Source code for avocado_i2n.states.setup

# Copyright 2013-2021 Intranet AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n.  If not, see <http://www.gnu.org/licenses/>.

"""
Utility to manage off and on test object states.

SUMMARY
------------------------------------------------------

Copyright: Intra2net AG

INTERFACE
------------------------------------------------------

"""

import os
from typing import Any
from typing import Generator
import logging as log

from avocado.core import exceptions
from avocado_vt.test import VirtTest
from virttest.utils_env import Env
from virttest.utils_params import Params

logging = log.getLogger("avocado.job." + __name__)


#: list of all available state backends and operations
__all__ = [
    "BACKENDS",
    "ROOTS",
    "show_states",
    "check_states",
    "get_states",
    "set_states",
    "unset_states",
    "push_states",
    "pop_states",
]


class StateBackend:
    """A general backend implementing state manipulation."""

    @classmethod
    def show(cls, params: dict[str, str], object: Any = None) -> list[str]:
        """
        Return a list of available states of a specific type.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        :returns: list of detected states
        """
        raise NotImplementedError("Cannot use abstract state backend")

    @classmethod
    def get(cls, params: dict[str, str], object: Any = None) -> None:
        """
        Retrieve a state disregarding the current changes.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        """
        raise NotImplementedError("Cannot use abstract state backend")

    @classmethod
    def set(cls, params: dict[str, str], object: Any = None) -> None:
        """
        Store a state saving the current changes.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        """
        raise NotImplementedError("Cannot use abstract state backend")

    @classmethod
    def unset(cls, params: dict[str, str], object: Any = None) -> None:
        """
        Remove a state with previous changes.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        """
        raise NotImplementedError("Cannot use abstract state backend")

    @classmethod
    def check_root(cls, params: dict[str, str], object: Any = None) -> bool:
        """
        Check whether a root state or essentially the object exists.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        :returns: whether the object (root state) is exists
        """
        raise NotImplementedError("Cannot use abstract state backend")

    @classmethod
    def get_root(cls, params: dict[str, str], object: Any = None) -> None:
        """
        Get a root state or essentially due to pre-existence do nothing.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        """
        pass

    @classmethod
    def set_root(cls, params: dict[str, str], object: Any = None) -> None:
        """
        Set a root state to provide object existence.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        """
        raise NotImplementedError("Cannot use abstract state backend")

    @classmethod
    def unset_root(cls, params: dict[str, str], object: Any = None) -> None:
        """
        Unset a root state to prevent object existence.

        :param params: configuration parameters
        :param object: object whose states are manipulated
        """
        raise NotImplementedError("Cannot use abstract state backend")


#: available state backend implementations
BACKENDS = {}
#: keywords reserved for root states
ROOTS = ["root", "0root", "boot", "0boot"]


def _parametric_object_iteration(
    params: dict[str, str], composites: list[tuple[str, str]] = None
) -> Generator[Params, None, None]:
    """
    Iterate over a hierarchy of stateful parametric objects.

    :param params: parameters of the parametric object is processed
    :param composites: current composite parametric object
    :raises: :py:class:`ValueError` if no hierarchy is configured
    """
    object_composition = params.objects("states_chain")
    if len(object_composition) == 0:
        raise ValueError(
            "Have to specify at least one parametric object type "
            "or an overall composition through `states_chain`"
        )
    if composites is None:
        composites = []
    params_obj_type = object_composition[len(composites)]
    composites.append(None)
    for params_obj_name in params.objects(params_obj_type):
        composites[-1] = (params_obj_name, params_obj_type)
        obj_params = params.object_params(params_obj_name)
        obj_params[params_obj_type] = params_obj_name
        obj_params["object_name"] = "/".join([c[0] for c in composites])
        obj_params["object_type"] = "/".join([c[1] for c in composites])
        if params_obj_type != object_composition[-1]:
            yield from _parametric_object_iteration(obj_params, composites)
        # object type parameters don't propagate downwards in the hierarchy
        obj_type_params = obj_params.object_params(params_obj_type)
        yield obj_type_params
    composites.pop()


def _state_check_chain(
    do: str,
    env: Env,
    params_obj_type: str,
    params_obj_name: str,
    state_params: dict[str, str],
) -> bool:
    """
    State chain from set/set/unset states to check states.

    :param do: get, set, or unset
    :param env: test environment
    :param params_obj_type: type of the parametric object to check
    :param params_obj_name: name of the parametric object to check
    :param state_params: image parameters of the vm's image which is processed
    """
    state_params["check_state"] = state_params[f"{do}_state"]
    if state_params.get(f"{do}_location"):
        state_params["show_location"] = state_params[f"{do}_location"]
    if do == "set":
        state_params["check_opts"] = "soft_boot=yes"
        state_params["soft_boot"] = "yes"
    else:
        state_params["check_opts"] = "soft_boot=no"
        state_params["soft_boot"] = "no"

    # restrict inner call parameteric object types and names
    composite_types = params_obj_type.split("/")
    composite_names = params_obj_name.split("/")
    for composite_type, composite_name in zip(composite_types, composite_names):
        state_params[composite_type] = composite_name
    state_params["states_chain"] = composite_types[-1]
    state_exists = check_states(state_params, env)

    return state_exists


[docs] def show_states(run_params: Params, env: Env = None) -> list[str]: """ Return a list of available states of a specific type. :param run_params: configuration parameters :param env: test environment or nothing if not needed :returns: list of detected states """ states = [] for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if params_obj_type in state_params.objects("skip_types"): continue if params_obj_type == "nets/vms/images" and state_params.get_boolean( "image_readonly", False ): logging.warning( f"Incorrect configuration: cannot use any state " f"from readonly image {params_obj_name} - skipping" ) continue logging.debug( "Checking %s for available %s states using %s", params_obj_name, params_obj_type, state_params["states"], ) state_backend = BACKENDS[state_params["states"]] params_obj_states = state_backend.show(state_params, env) logging.info( "Detected %s states for %s: %s", params_obj_type, params_obj_name, ", ".join(params_obj_states), ) states += params_obj_states return states
[docs] def check_states(run_params: Params, env: Env = None) -> bool: """ Check whether a given state exits. :param run_params: configuration parameters :returns: whether the given state exists .. note:: We can check for multiple states of multiple objects at the same time through our choice of configuration. """ for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if params_obj_type in state_params.objects("skip_types"): continue if params_obj_type == "nets/vms/images" and state_params.get_boolean( "image_readonly", False ): logging.warning( f"Incorrect configuration: cannot use any state " f"from readonly image {params_obj_name} - skipping" ) continue # if the snapshot is not defined skip (leaf tests that are no setup) if not state_params.get("check_state"): logging.debug( f"Skip checking any {params_obj_type} state for {params_obj_name}" ) continue else: state = state_params["check_state"] # NOTE: there is no concept of "check_mode" here state_params["check_opts"] = state_params.get("check_opts", "soft_boot=yes") # TODO: document after experimental period state_params["check_mode"] = state_params.get("check_mode", "rf") state_backend = BACKENDS[state_params["states"]] # TODO: we don't support other parametric object instances vm = env.get_vm(state_params["vms"]) if env is not None else None # TODO: consider whether we need this with more advanced env handling # if vm is None and env is not None: # vm = env.create_vm(state_params.get('vm_type'), state_params.get('target'), # params_obj_name, state_params, None) state_object = env if params_obj_type == "nets" else vm action_if_root_exists = state_params["check_mode"][0] action_if_root_doesnt_exist = state_params["check_mode"][1] # always check the corresponding root state as a prerequisite root_exists = state_backend.check_root(state_params, state_object) root_params = state_params.copy() if not root_exists: if action_if_root_doesnt_exist == "f": root_params["pool_scope"] = "own" state_backend.set_root(root_params, state_object) root_exists = True elif action_if_root_doesnt_exist == "r": return False else: raise exceptions.TestError( f"Invalid policy {action_if_root_doesnt_exist}: The root " "nonexistence action can be either of 'reuse' or 'force'." ) elif action_if_root_exists == "f": root_params["pool_scope"] = "own" # TODO: implement unset root for all parametric object types if params_obj_type == "nets/vms": vm.destroy( gracefully=root_params.get_dict("check_opts").get( "soft_boot", "yes" ) == "yes" ) else: state_backend.unset_root(root_params, state_object) state_backend.set_root(root_params, state_object) root_exists = True else: state_backend.get_root(root_params, state_object) if state in ROOTS: state_exists = root_exists else: state_exists = state in state_backend.show(state_params, state_object) if not state_exists: return False return True
[docs] def get_states(run_params: Params, env: Env = None) -> None: """ Retrieve a state disregarding the current changes. :param run_params: configuration parameters :raises: :py:class:`exceptions.TestAbortError` if the retrieved state doesn't exist, the vm is unavailable from the env, or snapshot exists in passive mode (abort) :raises: :py:class:`exceptions.TestError` if invalid policy was used """ for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if params_obj_type in state_params.objects("skip_types"): logging.debug( f"Skip getting states of types {', '.join(state_params.objects('skip_types'))}" ) continue if params_obj_type == "nets/vms/images" and state_params.get_boolean( "image_readonly", False ): logging.warning( f"Incorrect configuration: cannot use any state " f"from readonly image {params_obj_name} - skipping" ) continue # if the state is not defined skip (leaf tests that are no setup) if not state_params.get("get_state"): logging.debug( f"Skip getting any {params_obj_type} state for {params_obj_name}" ) continue else: state = state_params["get_state"] state_params["get_mode"] = state_params.get("get_mode", "ra") logging.info(f"Getting {params_obj_type} state {state} for {params_obj_name}") state_exists = _state_check_chain( "get", env, params_obj_type, params_obj_name, state_params ) state_backend = BACKENDS[state_params["states"]] # TODO: we don't support other parametric object instances vm = env.get_vm(state_params["vms"]) if env is not None else None state_object = env if params_obj_type == "nets" else vm action_if_exists = state_params["get_mode"][0] action_if_doesnt_exist = state_params["get_mode"][1] if not state_exists and "a" == action_if_doesnt_exist: logging.info("Aborting because of missing snapshot for setup") raise exceptions.TestAbortError( "Snapshot '%s' of %s doesn't exist. Aborting " "due to passive mode." % (state_params["get_state"], params_obj_name) ) elif not state_exists and "i" == action_if_doesnt_exist: logging.warning("Ignoring missing snapshot for setup") continue elif not state_exists: raise exceptions.TestError( "Invalid policy %s: The start action on missing state can be " "either of 'abort', 'ignore'." % state_params["get_mode"] ) elif state_exists and "a" == action_if_exists: logging.info("Aborting because of unwanted snapshot for setup") raise exceptions.TestAbortError( "Snapshot '%s' of %s already exists. Aborting " "due to passive mode." % (state_params["get_state"], params_obj_name) ) elif state_exists and "r" == action_if_exists: pass elif state_exists and "i" == action_if_exists: logging.warning("Ignoring present snapshot for setup") continue elif state_exists: raise exceptions.TestError( "Invalid policy %s: The start action on present state can be " "either of 'abort', 'reuse', 'ignore'." % state_params["get_mode"] ) if state_params["get_state"] in ROOTS: state_backend.get_root(state_params, state_object) else: state_backend.get(state_params, state_object)
[docs] def set_states(run_params: Params, env: Env = None) -> None: """ Store a state saving the current changes. :param run_params: configuration parameters :raises: :py:class:`exceptions.TestAbortError` if unexpected/missing snapshot in passive mode (abort) :raises: :py:class:`exceptions.TestError` if invalid policy was used """ for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if params_obj_type in state_params.objects("skip_types"): logging.debug( f"Skip setting states of types {', '.join(state_params.objects('skip_types'))}" ) continue if params_obj_type == "nets/vms/images" and state_params.get_boolean( "image_readonly", False ): logging.warning( f"Incorrect configuration: cannot use any state " f"from readonly image {params_obj_name} - skipping" ) continue # if the state is not defined skip (leaf tests that are no setup) if not state_params.get("set_state"): logging.debug( f"Skip setting any {params_obj_type} state for {params_obj_name}" ) continue else: state = state_params["set_state"] state_params["set_mode"] = state_params.get("set_mode", "ff") logging.info(f"Setting {params_obj_type} state {state} for {params_obj_name}") state_exists = _state_check_chain( "set", env, params_obj_type, params_obj_name, state_params ) state_backend = BACKENDS[state_params["states"]] # TODO: we don't support other parametric object instances vm = env.get_vm(state_params["vms"]) if env is not None else None state_object = env if params_obj_type == "nets" else vm action_if_exists = state_params["set_mode"][0] action_if_doesnt_exist = state_params["set_mode"][1] if state_exists and "a" == action_if_exists: logging.info("Aborting because of unwanted snapshot for later cleanup") raise exceptions.TestAbortError( "Snapshot '%s' of %s already exists. Aborting " "due to passive mode." % (state_params["set_state"], params_obj_name) ) elif state_exists and "r" == action_if_exists: logging.info("Keeping the already existing snapshot untouched") continue elif state_exists and "f" == action_if_exists: logging.info("Overwriting the already existing snapshot") state_params["unset_state"] = state_params["set_state"] if state_params["set_state"] in ROOTS: state_backend.unset_root(state_params, state_object) else: from .pool import SourcedStateBackend if issubclass(state_backend, SourcedStateBackend): # overwriting arbitrary external states in the backing chain can result in invalid # derivative states when branching out and other problems, do this only manually if # you really know what you are doing which would depend on a case-by-case basis logging.warning( "Preserving the already existing snapshot due to overwrite dependency coupling" ) else: logging.info("Removing the already existing snapshot") state_backend.unset(state_params, state_object) elif state_exists: raise exceptions.TestError( "Invalid policy %s: The end action on present state can be " "either of 'abort', 'reuse', 'force'." % state_params["set_mode"] ) elif not state_exists and "a" == action_if_doesnt_exist: logging.info("Aborting because of missing snapshot for later cleanup") raise exceptions.TestAbortError( "Snapshot '%s' of %s doesn't exist. Aborting " "due to passive mode." % (state_params["set_state"], params_obj_name) ) elif not state_exists and "f" == action_if_doesnt_exist: if not state_params["set_state"] in ROOTS and not state_backend.check_root( state_params, state_object ): raise exceptions.TestError( "Cannot force set state without a root state, use enforcing check " "policy to also force root (existing stateful object) creation." ) elif not state_exists: raise exceptions.TestError( "Invalid policy %s: The end action on missing state can be " "either of 'abort', 'force'." % state_params["set_mode"] ) if state_params["set_state"] in ROOTS: state_backend.set_root(state_params, state_object) else: state_backend.set(state_params, state_object)
[docs] def unset_states(run_params: Params, env: Env = None) -> None: """ Remove a state with previous changes. :param run_params: configuration parameters :raises: :py:class:`exceptions.TestAbortError` if missing snapshot in passive mode (abort) :raises: :py:class:`exceptions.TestError` if invalid policy was used """ for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if params_obj_type in state_params.objects("skip_types"): logging.debug( f"Skip unsetting states of types {', '.join(state_params.objects('skip_types'))}" ) continue if params_obj_type == "nets/vms/images" and state_params.get_boolean( "image_readonly", False ): logging.warning( f"Incorrect configuration: cannot use any state " f"from readonly image {params_obj_name} - skipping" ) continue # if the state is not defined skip (leaf tests that are no setup) if not state_params.get("unset_state"): logging.debug( f"Skip unsetting any {params_obj_type} state for {params_obj_name}" ) continue else: state = state_params["unset_state"] state_params["unset_mode"] = state_params.get("unset_mode", "fi") logging.info(f"Unsetting {params_obj_type} state {state} for {params_obj_name}") state_exists = _state_check_chain( "unset", env, params_obj_type, params_obj_name, state_params ) state_backend = BACKENDS[state_params["states"]] # TODO: we don't support other parametric object instances vm = env.get_vm(state_params["vms"]) if env is not None else None state_object = env if params_obj_type == "nets" else vm action_if_exists = state_params["unset_mode"][0] action_if_doesnt_exist = state_params["unset_mode"][1] if not state_exists and "a" == action_if_doesnt_exist: logging.info("Aborting because of missing snapshot for final cleanup") raise exceptions.TestAbortError( "Snapshot '%s' of %s doesn't exist. Aborting " "due to passive mode." % (state_params["unset_state"], params_obj_name) ) elif not state_exists and "i" == action_if_doesnt_exist: logging.warning( "Ignoring missing snapshot for final cleanup (will not be removed)" ) continue elif not state_exists: raise exceptions.TestError( "Invalid policy %s: The unset action on missing state can be " "either of 'abort', 'ignore'." % state_params["unset_mode"] ) elif state_exists and "r" == action_if_exists: logging.info( "Preserving state '%s' of %s for later test runs", state_params["unset_state"], params_obj_name, ) continue elif state_exists and "f" == action_if_exists: pass elif state_exists: raise exceptions.TestError( "Invalid policy %s: The unset action on present state can be " "either of 'reuse', 'force'." % state_params["unset_mode"] ) if state_params["unset_state"] in ROOTS: state_backend.unset_root(state_params, state_object) else: state_backend.unset(state_params, state_object)
[docs] def push_states(run_params: Params, env: Env = None) -> None: """ Identical to the set operation but used within the push/pop pair. :param run_params: configuration parameters """ for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if not state_params.get("push_state"): continue else: state = state_params["push_state"] if state in ROOTS: # cannot be done with root states continue # restrict parametric objects of this type in the subroutine composite_types = params_obj_type.split("/") composite_names = params_obj_name.split("/") for composite_type, composite_name in zip(composite_types, composite_names): state_params[composite_type] = composite_name state_params["states_chain"] = composite_types[-1] state_params["set_state"] = state_params["push_state"] state_params["set_mode"] = state_params.get("push_mode", "af") set_states(state_params, env)
[docs] def pop_states(run_params: Params, env: Env = None) -> None: """ Retrieve and remove a state/snapshot. :param run_params: configuration parameters """ for state_params in _parametric_object_iteration(run_params): params_obj_name = state_params["object_name"] params_obj_type = state_params["object_type"] if not state_params.get("pop_state"): continue else: state = state_params["pop_state"] if state in ROOTS: # cannot be done with root states continue # restrict parametric objects of this type in the subroutine composite_types = params_obj_type.split("/") composite_names = params_obj_name.split("/") for composite_type, composite_name in zip(composite_types, composite_names): state_params[composite_type] = composite_name state_params["states_chain"] = composite_types[-1] state_params["get_state"] = state_params["pop_state"] state_params["get_mode"] = state_params.get("pop_mode", "ra") get_states(state_params, env) state_params["unset_state"] = state_params["pop_state"] state_params["unset_mode"] = state_params.get("pop_mode", "fa") unset_states(state_params, env)