Source code for avocado_i2n.states.qcow2

# Copyright 2013-2021 Intranet AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n.  If not, see <http://www.gnu.org/licenses/>.

"""
Module for the QCOW2 state management backends.

SUMMARY
------------------------------------------------------

Copyright: Intra2net AG

INTERFACE
------------------------------------------------------

"""

import os
import re
import json
import shutil
from typing import Any
import logging as log

from virttest import env_process
from virttest.qemu_storage import QemuImg
from virttest.utils_params import Params

from .pool import RootSourcedStateBackend, SourcedStateBackend

logging = log.getLogger("avocado.job." + __name__)


#: off qemu states regex (0 vm size)
QEMU_OFF_STATES_REGEX = re.compile(
    r"^\d+\s+([\w\.-]+)\s*(0 B)\s+\d{4}-\d\d-\d\d", flags=re.MULTILINE
)
#: on qemu states regex (>0 vm size)
QEMU_ON_STATES_REGEX = re.compile(
    r"^\d+\s+([\w\.-]+)\s*(?!0 B)(\d+e?[\-\+]?[\.\d]* \w+)\s+\d{4}-\d\d-\d\d",
    flags=re.MULTILINE,
)


[docs] class QCOW2Backend(RootSourcedStateBackend): """Backend manipulating image states as internal QCOW2 snapshots.""" _require_running_object = False
[docs] @classmethod def state_type(cls) -> str: """State type string representation depending used for logging.""" return "on/vm" if cls._require_running_object else "off/image"
[docs] @classmethod def show(cls, params: Params, object: Any = None) -> list[str]: """ Return a list of available states of a specific type. All arguments match the base class. """ qemu_img = QemuImg(params, params["images_base_dir"], params["images"]) logging.debug( "Showing %s internal states for image %s", cls.state_type(), params["images"], ) on_snapshots_dump = qemu_img.snapshot_list(force_share=True) pattern = ( QEMU_ON_STATES_REGEX if cls._require_running_object else QEMU_OFF_STATES_REGEX ) state_tuples = re.findall(pattern, on_snapshots_dump) states = [] for state_tuple in state_tuples: logging.debug( "Detected %s state '%s' of size %s", cls.state_type(), state_tuple[0], state_tuple[1], ) states.append(state_tuple[0]) return states
[docs] @classmethod def get(cls, params: Params, object: Any = None) -> None: """ Retrieve a state disregarding the current changes. All arguments match the base class. """ vm, vm_name = object, params["vms"] state, image = params["get_state"], params["images"] params["image_chain"] = f"{image} snapshot" params["image_raw_device_snapshot"] = "yes" params["image_name_snapshot"] = state qemu_img = QemuImg(params, params["images_base_dir"], image) logging.info( "Reusing %s state '%s' of %s/%s", cls.state_type(), state, vm_name, image ) qemu_img.snapshot_apply()
[docs] @classmethod def set(cls, params: Params, object: Any = None) -> None: """ Store a state saving the current changes. All arguments match the base class. """ vm, vm_name = object, params["vms"] state, image = params["set_state"], params["images"] params["image_chain"] = f"{image} snapshot" params["image_raw_device_snapshot"] = "yes" params["image_name_snapshot"] = state qemu_img = QemuImg(params, params["images_base_dir"], image) logging.info( "Creating %s state '%s' of %s/%s", cls.state_type(), state, vm_name, image ) qemu_img.snapshot_create()
[docs] @classmethod def unset(cls, params: Params, object: Any = None) -> None: """ Remove a state with previous changes. All arguments match the base class. """ vm, vm_name = object, params["vms"] state, image = params["unset_state"], params["images"] params["image_chain"] = f"{image} snapshot" params["image_raw_device_snapshot"] = "yes" params["image_name_snapshot"] = state qemu_img = QemuImg(params, params["images_base_dir"], image) logging.info( "Removing %s state '%s' of %s/%s", cls.state_type(), state, vm_name, image ) qemu_img.snapshot_del()
@classmethod def _check_root(cls, params: Params, object: Any = None) -> bool: """ Check whether a root state or essentially the object exists locally. All arguments match the base class. """ vm_name = params["vms"] image_name = params["image_name"] logging.debug( "Checking whether %s's %s exists (root state requested)", vm_name, image_name, ) if not os.path.isabs(image_name): image_name = os.path.join(params["images_base_dir"], image_name) image_format = params.get("image_format", "qcow2") logging.debug("Checking for %s image %s", image_format, image_name) image_format = "" if image_format in ["raw", ""] else "." + image_format if object is not None and object.is_alive(): logging.info( "The required virtual machine %s is alive and it shouldn't be", vm_name ) return False if os.path.exists(image_name + image_format): logging.info( "The required virtual machine %s's %s exists", vm_name, image_name ) return True else: logging.info( "The required virtual machine %s's %s doesn't exist", vm_name, image_name, ) return False @classmethod def _get_root(cls, params: Params, object: Any = None) -> None: """ Get a root state or essentially due to pre-existence do nothing. All arguments match the base class. """ pass @classmethod def _set_root(cls, params: Params, object: Any = None) -> None: """ Set a root state to provide object existence. All arguments match the base class. """ vm_name = params["vms"] if object is not None and object.is_alive(): object.destroy(gracefully=params.get_boolean("soft_boot", True)) image_name = params["image_name"] if not os.path.isabs(image_name): image_name = os.path.join(params["images_base_dir"], image_name) image_format = params.get("image_format") image_format = "" if image_format in ["raw", ""] else "." + image_format if not os.path.exists(image_name + image_format): os.makedirs(os.path.dirname(image_name), exist_ok=True) logging.info("Creating image %s for %s", image_name, vm_name) params.update({"create_image": "yes", "force_create_image": "yes"}) env_process.preprocess_image(None, params, image_name) @classmethod def _unset_root(cls, params: Params, object: Any = None) -> None: """ Unset a root state to prevent object existence. All arguments match the base class. """ vm_name = params["vms"] if object is not None and object.is_alive(): object.destroy(gracefully=params.get_boolean("soft_boot", True)) image_name = params["image_name"] if not os.path.isabs(image_name): image_name = os.path.join(params["images_base_dir"], image_name) logging.info("Removing image %s for %s", image_name, vm_name) params.update({"remove_image": "yes"}) env_process.postprocess_image(None, params, image_name) try: os.rmdir(os.path.dirname(image_name)) except OSError as error: logging.debug("Image directory not yet empty: %s", error)
[docs] class QCOW2ExtBackend(SourcedStateBackend, QCOW2Backend): """Backend manipulating image states as external QCOW2 snapshots.""" _require_running_object = False @classmethod def _show(cls, params: Params, object: Any = None) -> list[str]: """ Return a list of available states of a specific type. All arguments match the base class. """ vm_name, image_name = params["vms"], params["images"] vm_id = params["object_id"] state_dir = params["swarm_pool"] vm_dir = os.path.join(state_dir, vm_id) qemu_img = QemuImg(params, vm_dir, image_name) logging.debug( "Showing external states for %s image %s locally in %s", vm_name, image_name, state_dir, ) image_dir = os.path.join(os.path.dirname(qemu_img.image_filename), image_name) if not os.path.exists(image_dir): return [] snapshots = os.listdir(image_dir) states = [] for snapshot in snapshots: if not snapshot.endswith(".qcow2"): continue size = os.stat(os.path.join(image_dir, snapshot)).st_size state = snapshot[:-6] logging.debug( f"Detected {cls.state_type()} state '{state}' of size " f"{round(size / 1024**3, 3)} GB ({size})" ) states.append(state) return states @classmethod def _get(cls, params: Params, object: Any = None) -> None: """ Retrieve a state disregarding the current changes. All arguments match the base class. """ vm_name, image_name = params["vms"], params["images"] vm_id = params["object_id"] state_dir = params["swarm_pool"] vm_dir = os.path.join(state_dir, vm_id) image_dir = os.path.join(vm_dir, image_name) state = params["get_state"] params["image_chain"] = f"snapshot {image_name}" params["image_name_snapshot"] = os.path.join(image_dir, state) params["image_format_snapshot"] = "qcow2" qemu_img = QemuImg( params, os.path.join(params["vms_base_dir"], vm_name), image_name ) logging.info( "Reusing %s state '%s' of %s/%s", cls.state_type(), state, vm_name, image_name, ) qemu_img.create(params, ignore_errors=False) @classmethod def _set(cls, params: Params, object: Any = None) -> None: """ Store a state saving the current changes. All arguments match the base class. """ vm_name, image_name = params["vms"], params["images"] vm_id = params["object_id"] state_dir = params["swarm_pool"] vm_dir = os.path.join(state_dir, vm_id) image_dir = os.path.join(vm_dir, image_name) state = params["set_state"] logging.info( "Creating %s state '%s' of %s/%s", cls.state_type(), state, vm_name, image_name, ) state_file = os.path.join(image_dir, state + ".qcow2") # TODO: this does not follow a simple imperative boundary and has to be refactored # together with a more natural support for qcow2ext and general external state chains, # i.e. no conditionals allowed at the boundary qemu_img = QemuImg( params, os.path.join(params["vms_base_dir"], vm_name), image_name ) image_info = json.loads(qemu_img.info(output="json")) backing_file = image_info.get("backing-filename", "") inverse = params.copy() inverse["image_name"] = os.path.join(image_dir, state) if os.path.exists(state_file): qemu_img_inverse = QemuImg( inverse, os.path.join(params["vms_base_dir"], vm_name), image_name ) image_info = json.loads(qemu_img_inverse.info(output="json")) inverse_file = image_info.get("backing-filename", "") if state_file == backing_file: # disallow loops and circular backing references, assuming an ancestor and squashing back logging.info( f"Overwriting pre-existing backing state {state} via committing" ) qemu_img.commit(params) elif inverse_file == backing_file: logging.info( f"Overwriting pre-existing backing state {state} via forward replacement" ) os.makedirs(image_dir, exist_ok=True) os.unlink(state_file) shutil.copy(qemu_img.image_filename, state_file) else: raise RuntimeError( "Cannot perform nontrivial pre-existing state overwrite for qcow2ext" ) else: os.makedirs(image_dir, exist_ok=True) shutil.copy(qemu_img.image_filename, state_file) @classmethod def _unset(cls, params: Params, object: Any = None) -> None: """ Remove a state with previous changes. All arguments match the base class. """ vm_name, image_name = params["vms"], params["images"] vm_id = params["object_id"] state_dir = params["swarm_pool"] vm_dir = os.path.join(state_dir, vm_id) image_dir = os.path.join(vm_dir, image_name) state = params["unset_state"] # TODO: the qemu img could become useful if we implement the below todo # qemu_img = QemuImg(params, os.path.join(params["vms_base_dir"], vm_name), image_name) logging.info( "Removing %s state '%s' of %s/%s", cls.state_type(), state, vm_name, image_name, ) # TODO: should we move to pointer image in case removed state is in backing chain? os.unlink(os.path.join(image_dir, state + ".qcow2"))
[docs] @classmethod def check_root(cls, params: Params, object: Any = None) -> bool: """ Check whether a root state or essentially the object exists locally. All arguments match the base class. """ return QCOW2Backend._check_root(params, object)
@classmethod def get_root(cls, params: Params, object: Any = None) -> None: """ Get a root state or essentially due to pre-existence do nothing. All arguments match the base class. """ QCOW2Backend._get_root(params, object) @classmethod def set_root(cls, params: Params, object: Any = None) -> None: """ Set a root state to provide object existence. All arguments match the base class. """ QCOW2Backend._set_root(params, object)
[docs] @classmethod def unset_root(cls, params: Params, object: Any = None) -> None: """ Unset a root state to prevent object existence. All arguments match the base class. """ QCOW2Backend._unset_root(params, object)
[docs] class QCOW2VTBackend(QCOW2Backend): """Backend manipulating vm states as QCOW2 snapshots using VT's VM bindings.""" _require_running_object = True
[docs] @classmethod def show(cls, params: Params, object: Any = None) -> set[str]: """ Return a list of available states of a specific type. All arguments match the base class. """ logging.debug( f"Showing {cls.state_type()} internal states for vm {params['vms']}" ) states = set() for image_name in params.objects("images"): image_params = params.object_params(image_name) # TODO: refine method arguments by providing at least the image name directly image_params["images"] = image_name image_states = super().show(image_params, object=object) if len(states) == 0: states = image_states else: states = states.intersect(image_states) return states
[docs] @classmethod def get(cls, params: Params, object: Any = None) -> None: """ Retrieve a state disregarding the current changes. All arguments match the base class. """ vm, vm_name = object, params["vms"] logging.info("Reusing vm state '%s' of %s", params["get_state"], vm_name) vm.pause() vm.loadvm(params["get_state"]) vm.resume(timeout=3)
[docs] @classmethod def set(cls, params: Params, object: Any = None) -> None: """ Store a state saving the current changes. All arguments match the base class. """ vm, vm_name = object, params["vms"] logging.info("Setting vm state '%s' of %s", params["set_state"], vm_name) vm.pause() vm.savevm(params["set_state"]) vm.resume(timeout=3)
[docs] @classmethod def unset(cls, params: Params, object: Any = None) -> None: """ Remove a state with previous changes. All arguments match the base class. """ vm, vm_name = object, params["vms"] logging.info("Removing vm state '%s' of %s", params["unset_state"], vm_name) vm.pause() # NOTE: this was supposed to be implemented in the Qemu VM object but # it is not unlike savevm and loadvm, perhaps due to command availability vm.verify_status("paused") logging.debug("Deleting VM %s from %s", vm_name, params["unset_state"]) vm.monitor.send_args_cmd("delvm id=%s" % params["unset_state"]) vm.verify_status("paused") vm.resume(timeout=3)
@classmethod def _check_root(cls, params: Params, object: Any = None) -> bool: """ Check whether a root state or essentially the object is running. All arguments match the base class. """ vm_name = params["vms"] logging.debug("Checking whether %s's root state is fully available", vm_name) for image_name in params.objects("images"): image_params = params.object_params(image_name) image_path = image_params["image_name"] if not os.path.isabs(image_path): image_path = os.path.join(image_params["images_base_dir"], image_path) image_format = image_params.get("image_format", "qcow2") image_format = "" if image_format in ["raw", ""] else "." + image_format if not os.path.exists(image_path + image_format): logging.info( "The required virtual machine %s has a missing image %s", vm_name, image_path + image_format, ) return False if not params.get_boolean("use_env", True): return True logging.debug("Checking whether %s is on (boot state requested)", vm_name) vm = object if vm is not None and vm.is_alive(): logging.info("The required virtual machine %s is on", vm_name) return True else: logging.info("The required virtual machine %s is off", vm_name) return False @classmethod def _set_root(cls, params: Params, object: Any = None) -> None: """ Set a root state to provide running object. All arguments match the base class. ..todo:: Study better the environment pre/postprocessing details necessary for flawless vm destruction and creation to improve these. """ vm_name = params["vms"] for image_name in params.objects("images"): image_params = params.object_params(image_name) image_path = image_params["image_name"] if not os.path.isabs(image_path): image_path = os.path.join(image_params["images_base_dir"], image_path) image_format = image_params.get("image_format") image_format = "" if image_format in ["raw", ""] else "." + image_format if not os.path.exists(image_path + image_format): logging.info( "Creating image %s in order to boot %s", image_path + image_format, vm_name, ) os.makedirs(os.path.dirname(image_path), exist_ok=True) image_params.update( {"create_image": "yes", "force_create_image": "yes"} ) env_process.preprocess_image(None, image_params, image_path) if not params.get_boolean("use_env", True): return logging.info("Booting %s to provide boot state", vm_name) vm = object if vm is None: raise ValueError("Need an environmental object to boot") # vm = env.create_vm(params.get('vm_type'), params.get('target'), # vm_name, params, None) if not vm.is_alive(): vm.create() @classmethod def _unset_root(cls, params: Params, object: Any = None) -> None: """ Unset a root state to prevent object from running. All arguments match the base class. """ vm_name = params["vms"] logging.info("Shutting down %s to prevent boot state", vm_name) vm = object if vm is not None and vm.is_alive(): vm.destroy(gracefully=False)
[docs] def get_image_path(params: Params) -> str: """ Get the absolute path to a QCOW2 image. :param params: configuration parameters :returns: absolute path to the QCOW2 image """ image_name = params["image_name"] image_format = params.get("image_format") if image_format is None: raise ValueError( f"Unspecified image format for {image_name} - " "must be qcow2 or raw" ) if image_format not in ["raw", "qcow2"]: raise ValueError( f"Incompatible image format {image_format} for" f" {image_name} - must be qcow2 or raw" ) if not os.path.isabs(image_name): image_name = os.path.join(params["images_base_dir"], image_name) image_format = "" if image_format == "raw" else "." + image_format image_path = image_name + image_format return image_path
[docs] def convert_image(params: Params) -> None: """ Convert a raw img to a QCOW2 or other image usable for virtual machines. :param params: configuration parameters :raises: py:class:`FileNotFoundError` if the source image doesn't exist :raises: py:class:`AssertionError` when the source image is in use .. note:: This function could be used with qemu-img for more general images and not just the QCOW2 format. """ raw_directory = params.get("raw_image_dir", ".") raw_image = params["raw_image"] # allow the user to specify a path prefix for image files logging.info(f"Using image prefix {raw_directory}") source_image = os.path.join(raw_directory, raw_image) params.update( { "image_name_rawimg1": source_image, "image_format_rawimg1": "raw", "image_raw_device_rawimg1": "yes", } ) source_qemu_img = QemuImg(params, raw_directory, "rawimg1") if not os.path.isfile(source_image): raise FileNotFoundError(f"Source image {source_image} doesn't exist") target_qemu_img = QemuImg(params, params["images_base_dir"], params["images"]) target_image = get_image_path(params) # create the target directory if needed target_directory = os.path.dirname(target_image) os.makedirs(target_directory, exist_ok=True) if os.path.isfile(target_image): logging.debug(f"{target_image} already exists, checking if it's in use") result = target_qemu_img.check(params, params["images_base_dir"]) if result.exit_status == 0: logging.debug(f"{target_image} not in use, integrity asserted") else: if '"write" lock' in result.stderr_text: logging.error(f"{target_image} is in use, refusing to convert") raise logging.debug(f"{target_image} exists but cannot check integrity") logging.info(f"Overwriting existing {target_image}") params["convert_target"] = params["images"] params["convert_compressed"] = "yes" source_qemu_img.convert(params, params["images_base_dir"]) logging.debug("Conversion successful")