Source code for avocado_i2n.cartgraph.node

# Copyright 2013-2020 Intranet AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n.  If not, see <http://www.gnu.org/licenses/>.

"""
Utility for the main test suite substructures like test nodes.

SUMMARY
------------------------------------------------------

Copyright: Intra2net AG

INTERFACE
------------------------------------------------------

"""

from __future__ import annotations

import os
import re
from functools import cmp_to_key
from typing import Generator
from typing import Any
import logging as log

from aexpect.exceptions import ShellCmdError
from aexpect import remote_door as door
from avocado.core.test_id import TestID
from avocado.core.nrunner.runnable import Runnable
from virttest.utils_params import Params

from . import TestSwarm, TestWorker, TestObject, NetObject
from .. import params_parser as param

logging = log.getLogger("avocado.job." + __name__)


door.DUMP_CONTROL_DIR = "/tmp"


[docs] class PrefixTreeNode(object): """A node of a prefix tree.""" def __init__(self, variant: str = None, parent: str = None) -> None: """Construct a prefix tree node.""" self.variant = variant self.parent = parent self.end_test_node = None self.children = {}
[docs] def check_child(self, variant: str) -> bool: """Check child prefix tree node.""" return variant in self.children
def get_child(self, variant: str) -> str: """Get child prefix tree node.""" return self.children[variant] def set_child(self, variant: str, child: str) -> None: """Set child prefix tree node.""" self.children[variant] = child
[docs] def unset_child(self, variant: str) -> None: """Unset child prefix tree node.""" del self.children[variant]
[docs] def traverse(self) -> Generator[None, None, None]: """Traverse the current node.""" yield self for child in self.children.values(): yield from child.traverse()
[docs] class PrefixTree(object): """A trie structure used for faster prefix lookup.""" def __init__(self) -> None: """Construct a prefix tree.""" self.variant_nodes = {} def __contains__(self, name: str) -> bool: """Check whether the prefix tree contains a given name.""" variants = name.split(".") if variants[0] not in self.variant_nodes: return False for current in self.variant_nodes[variants[0]]: for variant in variants[1:]: if not current.check_child(variant): break current = current.get_child(variant) else: return True return False
[docs] def insert(self, test_node: "TestNode") -> None: """Insert a test node name in the prefix tree.""" variants = test_node.params["name"].split(".") if variants[0] not in self.variant_nodes.keys(): self.variant_nodes[variants[0]] = [PrefixTreeNode(variants[0])] for current in self.variant_nodes[variants[0]]: for variant in variants[1:]: if not current.check_child(variant): new_child = PrefixTreeNode(variant) current.set_child(variant, new_child) if variant not in self.variant_nodes: self.variant_nodes[variant] = [] self.variant_nodes[variant] += [new_child] current = current.get_child(variant) current.end_test_node = test_node
[docs] def get(self, name: str) -> list["TestNode"]: """Get all the names of the prefix tree.""" variants = name.split(".") if variants[0] not in self.variant_nodes: return [] test_nodes = [] for current in self.variant_nodes[variants[0]]: for variant in variants[1:]: if not current.check_child(variant): break current = current.get_child(variant) else: for node in current.traverse(): if node.end_test_node is not None: test_nodes.append(node.end_test_node) return test_nodes
[docs] class EdgeRegister: """A register for the Cartesian graph edges allowing counter and worker stats extraction.""" def __init__(self) -> None: """Construct an edge register.""" self._registry = {} def __repr__(self) -> str: """Provide a representation of the object.""" return f"[edge] registry='{self._registry}'" def get_workers(self, node: "TestNode" = None) -> set[str]: """ Get all worker visits for the given (possibly bridged) test node are all nodes. :param node: possibly registered test node to get visits for :returns: all visits by all workers as worker references (allowing repetitions) """ worker_keys = set() node_keys = [node.bridged_form] if node else self._registry.keys() for node_key in node_keys: worker_keys |= {*self._registry.get(node_key, {}).keys()} return worker_keys def get_counters(self, node: "TestNode" = None, worker: TestWorker = None) -> int: """ Get all workers in the current register. :param node: optional test node to get counters for :param worker: optional worker to get counters for :returns: counter for a given node or worker (typically both) """ counter = 0 node_keys = [node.bridged_form] if node else self._registry.keys() for node_key in node_keys: worker_keys = ( [worker.id] if worker else self._registry.get(node_key, {}).keys() ) for worker_key in worker_keys: counter += self._registry.get(node_key, {}).get(worker_key, 0) return counter
[docs] def register(self, node: "TestNode", worker: TestWorker) -> None: """ Register a worker visit for the given (possibly bridged) test node. :param node: possibly registered test node to register visits for :param worker: worker that visited the test node """ if node.bridged_form not in self._registry: self._registry[node.bridged_form] = {} if worker.id not in self._registry[node.bridged_form]: self._registry[node.bridged_form][worker.id] = 0 self._registry[node.bridged_form][worker.id] += 1
[docs] class TestNode(Runnable): """ A wrapper for all test relevant parts. These include parameters, parser, used objects and dependencies to/from other test nodes (setup/cleanup). """
[docs] class ReadOnlyDict(dict[Any, Any]): """Custom implementation of a read-only attribute of dictionary type.""" def _readonly(self, *args: tuple[type, ...], **kwargs: dict[str, type]) -> None: raise RuntimeError("Cannot modify read-only dictionary") __setitem__ = _readonly __delitem__ = _readonly pop = _readonly popitem = _readonly clear = _readonly update = _readonly setdefault = _readonly
#: digit: 0 for object root, >0 for everything else #: letter: "a" (autosetup), "b" (byproduct), "c" (cleanup), "d" (duplicate) prefix_pattern = re.compile(r"^(\d+)([abcd]?)(.+)") @property def params(self) -> Params: """Parameters (cache) property.""" if self._params_cache is None: self.regenerate_params() return self._params_cache @property def shared_started_workers(self) -> set[TestWorker]: """Workers that have previously started traversing this node (incl. leaves and others).""" workers = set() if self.started_worker is not None: workers.add(self.started_worker) for bridged_node in self.bridged_nodes: if bridged_node.started_worker is not None: workers.add(bridged_node.started_worker) return workers @property def shared_finished_workers(self) -> set[TestWorker]: """Workers that have previously finished traversing this node (incl. leaves and others).""" workers = set() if self.finished_worker is not None: workers.add(self.finished_worker) for bridged_node in self.bridged_nodes: if bridged_node.finished_worker is not None: workers.add(bridged_node.finished_worker) return workers @property def shared_involved_workers(self) -> set[TestWorker]: """Workers that picked up the node and possibly have continued to either its setup or cleanup.""" worker_ids = ( self._picked_by_setup_nodes.get_workers() | self._picked_by_cleanup_nodes.get_workers() ) workers = [ w for s in TestSwarm.run_swarms for w in TestSwarm.run_swarms[s].workers if w.id in worker_ids ] return set(workers) @property def shared_results(self) -> list[dict[str, str]]: """Test results shared across all bridged nodes.""" results = list(self.results) for bridged_node in self.bridged_nodes: results += bridged_node.results return results @property def shared_filtered_results(self) -> list[dict[str, str]]: """Test results shared across all bridged nodes.""" all_results = self.shared_results if ( self.started_worker and "swarm" not in self.params["pool_scope"] and self.params.get("nets_spawner") == "lxc" ): # has separate results for each worker (doesn't matter eager of full) scope_filter = self.started_worker.swarm_id + "." + self.started_worker.id elif ( self.started_worker and "cluster" not in self.params["pool_scope"] and self.params.get("nets_spawner") == "remote" ): # has results for an entire swarm by at least N of its workers scope_filter = self.started_worker.swarm_id else: # has fully globally shared results scope_filter = "" results = [] for result in all_results: if scope_filter in result["name"]: results += [result] return results @property def shared_result_worker_ids(self) -> set[str]: """ID-s of workers that produced the shared results.""" workers = set() for result in self.shared_results: if result["status"] != "PASS": continue worker_ids = [ w.id for s in TestSwarm.run_swarms.values() for w in s.workers ] for worker_id in worker_ids: if worker_id in result["name"]: workers.add(worker_id) break return workers @property def bridged_nodes(self) -> tuple["TestNode"]: """Read-only list of bridged nodes.""" return tuple(self._bridged_nodes) @property def cloned_nodes(self) -> tuple["TestNode"]: """Read-only list of cloned nodes.""" return tuple(self._cloned_nodes) @property def setup_nodes(self) -> dict[TestNode, set[TestObject]]: """Read-only dict of setup nodes.""" return TestNode.ReadOnlyDict(self._setup_nodes) @property def cleanup_nodes(self) -> dict[TestNode, set[TestObject]]: """Read-only dict of cleanup nodes.""" return TestNode.ReadOnlyDict(self._cleanup_nodes) @property def setless_form(self) -> Params: """Test set invariant form of the test node name.""" max_restr = "" for main_restr in self.params.objects("main_restrictions"): if self.params["name"].startswith(main_restr): max_restr = ( main_restr if len(main_restr) > len(max_restr) else max_restr ) return self.params["name"].replace(max_restr + ".", "", 1) @property def bridged_form(self) -> Params: """Test worker invariant form of the test node name.""" # TODO: the order of parsing nets and vms has to be improved if len(self.objects) == 0: return self.setless_form # TODO: the long suffix does not contain anything reasonable # suffix = self.objects[0].long_suffix suffix = self.params["_name_map_file"].get("nets.cfg", "") # since this doesn't use the prefix tree a regex could match part of a variant return r"\." + self.setless_form.replace(suffix, ".+") + r"$" @property def long_prefix(self) -> Params: """Sufficiently unique prefix to identify a diagram test node.""" nets = self.params.get("nets", "").replace(" ", ".") vms = self.params.get("vms", "").replace(" ", ".") return self.prefix + "-" + nets + "." + vms @property def id(self) -> Params: """Use unique ID to identify a test node.""" return self.prefix + "-" + self.params["name"] @property def id_test(self) -> Params: """Use a unique test ID to identify a test node.""" return TestID(self.prefix, self.params["name"]) def __init__(self, prefix: str, recipe: param.Reparsable) -> None: """ Construct a test node (test) for any test objects (vms). :param recipe: variant parsing recipe for the test node """ super().__init__("avocado-vt", prefix, {}) self.prefix = prefix self.recipe = recipe self._params_cache = None self.restrs = {} self.should_run = self.default_run_decision self.should_clean = self.default_clean_decision self.finished_worker = None self.started_worker = None self._bridged_nodes = [] self._cloned_nodes = [] self.incompatible_workers = set() self.objects = [] self.results = [] # lists of parent and children test nodes self._setup_nodes = {} self._cleanup_nodes = {} self._picked_by_setup_nodes = EdgeRegister() self._picked_by_cleanup_nodes = EdgeRegister() self._dropped_setup_nodes = EdgeRegister() self._dropped_cleanup_nodes = EdgeRegister() def __repr__(self) -> str: """Provide a representation of the object.""" shortname = self.params.get("shortname", "<unknown>") return f"[node] longprefix='{self.long_prefix}', shortname='{shortname}'" def set_objects_from_net(self, net: NetObject) -> None: """ Set all node's objects from a provided test net. :param net: test net to use as first and top object """ # flattened list of objects (in composition) involved in the test self.objects = [net] # TODO: only three nesting levels from a test net are supported for test_object in net.components: self.objects += [test_object] self.objects += test_object.components # TODO: dynamically added additional images will not be detected here from . import ImageObject from .. import params_parser as param vm_name = test_object.suffix parsed_images = [c.suffix for c in test_object.components] for image_name in self.params.object_params(vm_name).objects("images"): if image_name not in parsed_images: image_suffix = f"{image_name}_{vm_name}" config = param.Reparsable() config.parse_next_dict(test_object.params.object_params(image_name)) config.parse_next_dict( {"object_suffix": image_suffix, "object_type": "images"} ) image = ImageObject(image_suffix, config) image.composites.append(test_object) self.objects += [image]
[docs] def is_occupied(self, worker: TestWorker = None) -> bool: """ Check if the test node is sufficiently occupied with respect to a given worker in various scopes. :param worker: test worker with respect to which to consider various scopes """ # by default only reentrancy of 1 is allowed independently of previous results max_concurrent_tries = self.params.get_numeric( "max_concurrent_tries", self.params.get_numeric("max_tries", 1) ) return self.is_started(worker, max(max_concurrent_tries, 1))
[docs] def is_flat(self) -> bool: """Check if the test node is flat and does not yet have objects and dependencies to evaluate.""" return len(self.objects) == 0
[docs] def is_shared_root(self) -> bool: """Check if the test node is the root of all test nodes for all test objects.""" return self.params.get_boolean("shared_root", False)
[docs] def is_object_root(self) -> bool: """Check if the test node is the root of all test nodes for some test object.""" return "object_root" in self.params
[docs] def is_unrolled(self, worker: TestWorker = None) -> bool: """ Check if the test is unrolled as composite node with dependencies. :param worker: worker a flat node is unrolled for :raises: :py:class:`RuntimeError` if the current node is not flat (cannot be unrolled) """ if self.is_shared_root(): return True elif not self.is_flat(): raise RuntimeError(f"Only flat nodes can be unrolled, {self} is not flat") elif worker and worker.net.long_suffix in self.incompatible_workers: return True elif worker is None and len(self.incompatible_workers) > 0: return True for node in self.cleanup_nodes: if self.setless_form in node.id: if worker and worker.id in node.id: return True # whether the node is unrolled for any worker if no worker specified elif worker is None: return True return False
[docs] def is_setup_ready(self, worker: TestWorker) -> bool: """ Check if all dependencies of the test were run or there were none. :param worker: relative setup readiness with respect to a worker ID """ for node in self.setup_nodes: if not node.is_flat() and worker.id not in node.params["name"]: continue if worker.id not in self._dropped_setup_nodes.get_workers(node): return False return True
[docs] def is_cleanup_ready(self, worker: TestWorker) -> bool: """ Check if all dependent tests were run or there were none. :param str worker: relative setup readiness with respect to a worker ID """ for node in self.cleanup_nodes: if not node.is_flat() and worker.id not in node.params["name"]: continue if worker.id not in self._dropped_cleanup_nodes.get_workers(node): return False return True
[docs] def is_started(self, worker: TestWorker = None, threshold: int = 1) -> bool: """ Check if the test is currently traversed by at least N (-1 for all) workers of all or some scopes. :param worker: evaluate with respect to an optional worker ID scope or globally if none given :param threshold: how eagerly the node is considered started in terms of number of required workers to use as a threshold (1 for most eagerly, -1 for most fully) :returns: whether the test was run by at least N workers of all or some scopes (N=threshold) """ if self.is_flat(): return False if ( worker and "swarm" not in self.params["pool_scope"] and self.params.get("nets_spawner") == "lxc" ): # is started separately by each worker (doesn't matter eager of full) return worker in self.shared_started_workers elif ( worker and "cluster" not in self.params["pool_scope"] and self.params.get("nets_spawner") == "remote" ): own_cluster = worker.swarm_id own_cluster_started_hosts = { w for w in self.shared_started_workers if w.swarm_id == own_cluster } if threshold == -1: # is started for an entire swarm by all of its workers that have already picked that node own_cluster_all_hosts = self.shared_involved_workers & { *TestSwarm.run_swarms[own_cluster].workers } return own_cluster_started_hosts == own_cluster_all_hosts # is started for an entire swarm by at least N of its workers return len(own_cluster_started_hosts) >= threshold else: if threshold == -1: # is started globally by all workers that have already picked that node return self.shared_started_workers == self.shared_involved_workers # is started globally by at least N workers (down to at least one worker) return len(self.shared_started_workers) >= threshold
[docs] def is_finished(self, worker: TestWorker = None, threshold: int = 1) -> bool: """ Check if the test was ever traversed by at least N (-1 for all) workers of all or some scopes. :param worker: evaluate with respect to an optional worker ID scope or globally if none given :param threshold: how eagerly the node is considered started in terms of number of required workers to use as a threshold (1 for most eagerly, -1 for most fully) :returns: whether the test was run by at least one worker of all or some scopes Threshold of 1 is the most eager manner so that any already available setup nodes are considered finished. If we instead wait for this setup to be cleaned up or synced, this would count most of the setup as finished in the very end of the traversal. Threshold of -1 is for fully traversed node by all workers unless restricted within some scope of setup reuse. """ if self.is_flat(): return True if ( worker and "swarm" not in self.params["pool_scope"] and self.params.get("nets_spawner") == "lxc" ): # is finished separately by each worker (doesn't matter eager of full) return worker in self.shared_finished_workers elif ( worker and "cluster" not in self.params["pool_scope"] and self.params.get("nets_spawner") == "remote" ): own_cluster = worker.swarm_id own_cluster_finished_hosts = { w for w in self.shared_finished_workers if w.swarm_id == own_cluster } if threshold == -1: # is finished for an entire swarm by all of its workers that have already picked that node own_cluster_all_hosts = self.shared_involved_workers & { *TestSwarm.run_swarms[own_cluster].workers } return own_cluster_finished_hosts == own_cluster_all_hosts # is finished for an entire swarm by at least N of its workers return len(own_cluster_finished_hosts) >= threshold else: if threshold == -1: # is finished globally by all workers that have already picked that node return self.shared_finished_workers == self.shared_involved_workers # is finished globally by at least N workers (down to at least one worker) return len(self.shared_finished_workers) >= threshold
def get_terminal_object(self, key: str = "object_root") -> TestObject | None: """ Determine any object that this node is a root of. :param key: parameter key to use to determine the object root :returns: object that this node is a root of if any or None otherwise """ object_root = self.params.get(key) if not object_root: return None for test_object in self.objects: if test_object.id == object_root: return test_object return None def get_stateful_objects(self, do: str = "set") -> list[TestObject]: """ Check if the test node produces any reusable setup state. :param do: state reuse or creation, one of "get" or "set" :returns: any test objects that this node produces setup for """ setup_objects = [] for test_object in self.objects: object_params = test_object.object_typed_params(self.params) object_state = object_params.get(f"{do}_state") if object_state: setup_objects += [test_object] return setup_objects def get_dependency( self, restriction: str, test_object: TestObject ) -> "TestNode | None": """ Check if the test node has a dependency parsed and available. :param restriction: name of the dependency (state or parent test set) :param test_object: object used for the dependency :returns: whether the dependency was already found among the setup nodes ..todo:: Type annotation does not support "|" with string type hint. """ # TODO: use new attribute for test_node in self.setup_nodes: # TODO: direct object comparison will not work for dynamically # (within node) created objects like secondary images node_object_suffices = [t.long_suffix for t in test_node.objects] if ( test_object in test_node.objects or test_object.long_suffix in node_object_suffices ): # search is done here to not match repeating restriction for a different object if re.search( r"(\.|^)" + restriction + r"(\.|$)", test_node.params.get("name") ): return test_node setup_object_params = test_object.object_typed_params(test_node.params) if restriction == setup_object_params.get("set_state"): return test_node return None
[docs] def should_parse(self, worker: TestWorker = None) -> bool: """ Parse if node has been dropped in all its setup nodes by at least one worker. :param worker: evaluate with respect to an optional worker ID scope or globally if none given :returns: whether the test node should be parsed """ parse_by = f" by {worker}" if worker else "" for picked_worker in self.shared_involved_workers: if ( self.is_unrolled(picked_worker) and self.is_cleanup_ready(picked_worker) and len(picked_worker.restrs) == 0 ): logging.debug( f"Should not parse {self}{parse_by} which is cleanup ready from worker {picked_worker}" ) return False logging.debug( f"Should parse {self}{parse_by} which is not cleanup ready from any worker" ) return True
[docs] def should_rerun(self, worker: TestWorker = None) -> bool: """ Check if the test node should be rerun based on some retry criteria. :param worker: evaluate with respect to an optional worker ID scope or globally if none given :returns: whether the test node should be retried The retry parameters are `max_tries` and `rerun_status` or `stop_status`. The first is the maximum number of tries, and the second two indicate when to continue or stop retrying in terms of encountered test status and can be a list of statuses. """ if self.params.get("dry_run", "no") == "yes": logging.info(f"Should not rerun via dry test run {self}") return False elif self.is_flat(): logging.debug(f"Should not rerun a flat node {self}") return False elif len(self.cloned_nodes) > 0: logging.debug(f"Should not rerun a cloned node {self}") return False elif worker and worker.id not in self.params["name"]: raise RuntimeError( f"Worker {worker.id} should not consider rerunning {self}" ) all_statuses = [ "fail", "error", "pass", "warn", "skip", "cancel", "interrupted", "unknown", ] if self.params.get("replay"): rerun_status = self.params.get_list( "rerun_status", "fail,error,warn", delimiter="," ) else: rerun_status = self.params.get_list("rerun_status", []) or all_statuses stop_status = self.params.get_list("stop_status", []) for status, status_type in [(rerun_status, "rerun"), (stop_status, "stop")]: disallowed_status = {*status} - {*all_statuses} if len(disallowed_status) > 0: raise ValueError( f"Value of {status_type} status must be a valid test status," f" found {', '.join(disallowed_status)}" ) # ignore the retry parameters for nodes that cannot be re-run (need to run at least once) max_tries = self.params.get_numeric( "max_tries", 2 if self.params.get("replay") else 1 ) # do not log when the user is not using the retry feature if max_tries > 1: stop_condition = ", ".join(stop_status) if stop_status else "NONE" rerun_condition = ", ".join(rerun_status) if rerun_status else "NONE" logging.debug( f"Could rerun {self} with stop condition {stop_condition}, a rerun condition " f"{rerun_condition}, and a maximum of {max_tries} tries" ) if max_tries < 0: raise ValueError("Number of max_tries cannot be less than zero") # analyzing rerun and stop status conditions if len(self.get_stateful_objects()) == 0: test_statuses = [r["status"].lower() for r in self.shared_results] else: # TODO: the started worker method is implicit and we need a proper function old_started_worker = self.started_worker self.started_worker = old_started_worker or worker # setup tests can be filtered across swarms test_statuses = [r["status"].lower() for r in self.shared_filtered_results] self.started_worker = old_started_worker rerun_statuses_violated = {*test_statuses} - {*rerun_status} if len(rerun_statuses_violated) > 0: logging.debug( f"Stopping test tries due to violated rerun test statuses: {rerun_status}" ) return False stop_statuses_found = {*stop_status} & {*test_statuses} if len(stop_statuses_found) > 0: logging.info( f"Stopping test tries due to obtained stop test statuses: {', '.join(stop_statuses_found)}" ) return False # the runs total also considers UNKNOWN statuses from currently occupied test nodes minus currently traversed/evaluated case total_runs = len(test_statuses) # implicitly this means that setting >1 retries will be done on tests actually collecting results (no flat nodes, dry runs, etc.) reruns_left = 0 if max_tries == 1 else max_tries - total_runs if reruns_left > 0: logging.debug( f"Still have {reruns_left} allowed reruns left and should rerun {self}" ) return True logging.debug(f"Should not rerun {self}") return False
[docs] def default_run_decision(self, worker: TestWorker) -> bool: """ Set default decision policy on whether a test node should be run or skipped. :param worker: worker which makes the run decision :returns: whether the worker should run the test node """ if self.params.get("dry_run", "no") == "yes": logging.info(f"Should not run via dry test run {self}") return False elif self.is_flat(): logging.debug(f"Should not run a flat node {self}") return False elif len(self.cloned_nodes) > 0: logging.debug(f"Should not run a cloned node {self}") return False elif worker.id not in self.params["name"]: raise RuntimeError(f"Worker {worker.id} should not try to run {self}") if len(self.get_stateful_objects()) == 0: # most standard stateless behavior is to run each test node once then rerun if needed should_run = len(self.shared_results) == 0 or self.should_rerun(worker) else: should_scan = not self.is_finished(worker, 1) should_run_from_scan = self.scan_states() if should_scan else False # rerunning of test from previous jobs is never intended if len(self.shared_filtered_results) == 0 and not should_run_from_scan: self.should_rerun = lambda _: False should_run = should_run_from_scan if should_scan else False should_run = should_run or self.should_rerun(worker) return should_run
[docs] def default_clean_decision(self, worker: TestWorker) -> bool: """ Set default decision policy on whether a test node should be cleaned or skipped. :param worker: worker which makes the clean decision :returns: whether the worker should clean the test node """ if self.params.get("dry_run", "no") == "yes": logging.info(f"Should not clean via dry test run {self}") return False elif self.is_flat(): logging.debug(f"Should not clean a flat node {self}") return False elif len(self.cloned_nodes) > 0: logging.debug(f"Should not clean a cloned node {self}") return False elif worker.id not in self.params["name"]: raise RuntimeError(f"Worker {worker.id} should not try to clean {self}") # no support for parallelism within reversible nodes since we might hit a race condition # whereby a node will be run for missing setup but its parent will be reversed before it # gets any parent-provided states for test_object in self.objects: object_params = test_object.object_typed_params(self.params) is_reversible = ( object_params.get("unset_mode_images", object_params["unset_mode"])[0] == "f" ) is_reversible |= ( object_params.get("unset_mode_vms", object_params["unset_mode"])[0] == "f" ) if is_reversible: break else: is_reversible = False if not is_reversible: return True else: # last worker should "close the door" for all workers that opened it and left for picked_worker in self.shared_involved_workers: # TODO: provide swarm filtering not just here but universally wherever needed if ( worker.swarm_id != "localhost" and worker.swarm_id not in picked_worker.id ): continue if self.is_flat() or picked_worker.id in self.params["name"]: picked_node = self else: for node in self.bridged_nodes: if picked_worker.id in node.params["name"]: picked_node = node break else: raise ValueError( f"Cannot identify picked node for involved worker {picked_worker} " f"instead of the composite {self} to consider for cleanup" ) if not picked_node.is_cleanup_ready(picked_worker): logging.debug(f"Node is not cleanup ready for {picked_worker.id}") return False # if any worker is still running this test it cannot be reversed test_statuses = [r["status"].lower() for r in picked_node.results] if "unknown" in test_statuses: logging.debug( f"A worker {picked_worker.id} is still running node which cannot yet be reversed" ) return False # all involved workers should have also flagged the generalized node as finished return self.is_finished(worker, -1)
[docs] @classmethod def prefix_priority(cls, prefix1: str, prefix2: str) -> int: """ Class method for secondary prioritization using test prefixes. :param prefix1: first prefix to use for the priority comparison :param prefix2: second prefix to use for the priority comparison :returns: negative integer if prefix1 < prefix2, positive if prefix1 > prefix2, 0 otherwise (lower is better in our standard sorting) This function also does recursive calls of sub-prefixes. """ if prefix1 == prefix2: # identical prefixes detected, nothing we can do but choose a default return 0 match1, match2 = re.match(cls.prefix_pattern, prefix1), re.match( cls.prefix_pattern, prefix2 ) digit1, alpha1, else1 = ( (prefix1, "", "") if match1 is None else match1.group(1, 2, 3) ) digit2, alpha2, else2 = ( (prefix2, "", "") if match2 is None else match2.group(1, 2, 3) ) # compare order of parsing if simple leaf nodes if digit1.isdigit() and digit2.isdigit(): digit1, digit2 = int(digit1), int(digit2) if digit1 != digit2: return digit1 - digit2 # we no longer match and are at the end of the prefix else: if digit1 != digit2: return 1 if digit1 > digit2 else -1 # compare the node type flags next if alpha1 is not None and alpha2 is not None and alpha1 != alpha2: if alpha1 == "": return -1 if alpha2 == "": return 1 # priority to lower alphas (from a down to e) return 1 if alpha1 > alpha2 else -1 # redo the comparison for the next prefix part else: if else1 == "": raise ValueError( f"could not match test prefix part {prefix1} to choose priority" ) if else2 == "": raise ValueError( f"could not match test prefix part {prefix2} to choose priority" ) # priority to the prefix that didn't terminate yet if else1.startswith("-"): return 1 elif else2.startswith("-"): return -1 # retry on next step return cls.prefix_priority(else1, else2)
[docs] def pick_parent(self, worker: TestWorker) -> "TestNode": """ Pick the next available parent based on some priority. :param worker: worker for which the parent is selected :returns: the next parent node :raises: :py:class:`RuntimeError` The current order will prioritize less traversed test paths. """ available_nodes = [ n for n in self.setup_nodes if worker.id in n.params["name"] or n.is_flat() ] available_nodes = [ n for n in available_nodes if worker.id not in self._dropped_setup_nodes.get_workers(n) ] if len(available_nodes) == 0: raise RuntimeError( f"Picked a parent of a node without remaining parents for {self}" ) sorted_nodes = sorted( available_nodes, key=cmp_to_key( lambda x, y: TestNode.prefix_priority(x.long_prefix, y.long_prefix) ), ) sorted_nodes = sorted( sorted_nodes, key=lambda n: n._picked_by_cleanup_nodes.get_counters() ) sorted_nodes = sorted(sorted_nodes, key=lambda n: int(not n.is_flat())) test_node = sorted_nodes[0] test_node._picked_by_cleanup_nodes.register(self, worker) return test_node
[docs] def pick_child(self, worker: TestWorker) -> "TestNode": """ Pick the next available child based on some priority. :param worker: worker for which the child is selected :returns: the next child node :raises: :py:class:`RuntimeError` The current order will prioritize less traversed test paths. """ available_nodes = [ n for n in self.cleanup_nodes if worker.id in n.params["name"] or n.is_flat() ] available_nodes = [ n for n in available_nodes if worker.id not in self._dropped_cleanup_nodes.get_workers(n) ] if len(available_nodes) == 0: raise RuntimeError( f"Picked a child of a node without remaining children for {self}" ) sorted_nodes = sorted( available_nodes, key=cmp_to_key( lambda x, y: TestNode.prefix_priority(x.long_prefix, y.long_prefix) ), ) sorted_nodes = sorted( sorted_nodes, key=lambda n: n._picked_by_setup_nodes.get_counters() ) sorted_nodes = sorted(sorted_nodes, key=lambda n: int(not n.is_flat())) test_node = sorted_nodes[0] test_node._picked_by_setup_nodes.register(self, worker) return test_node
[docs] def drop_parent(self, test_node: "TestNode", worker: TestWorker) -> None: """ Add a parent node to the set of visited nodes for this test. :param test_node: visited node :param worker: worker visiting the node :raises: :py:class:`ValueError` if visited node is not directly dependent """ if test_node not in self.setup_nodes: raise ValueError( f"Invalid parent to drop: {test_node} not a parent of {self}" ) self._dropped_setup_nodes.register(test_node, worker)
[docs] def drop_child(self, test_node: "TestNode", worker: TestWorker) -> None: """ Add a child node to the set of visited nodes for this test. :param test_node: visited node :param worker: worker visiting the node :raises: :py:class:`ValueError` if visited node is not directly dependent """ if test_node not in self.cleanup_nodes: raise ValueError( f"Invalid child to drop: {test_node} not a child of {self}" ) self._dropped_cleanup_nodes.register(test_node, worker)
[docs] def descend_from_node(self, test_node: "TestNode", test_object: TestObject) -> None: """ Turn the current node into a child of a parent node for a given object. :param test_node: parent node the current node is a child of :param test_object: test object via which the dependency is determined """ self._setup_nodes[test_node] = self._setup_nodes.get(test_node, set()) | { test_object } test_node._cleanup_nodes[self] = test_node._cleanup_nodes.get(self, set()) | { test_object }
[docs] def bridge_with_node(self, test_node: "TestNode") -> None: """ Bridge current node with equivalent node for a different worker. :param test_node: equivalent node for a different worker :raises: :py:class:`ValueError` if bridged node is not equivalent """ if test_node == self: return # TODO: cannot do simpler comparison due to current limitations in the bridged form elif not re.search(test_node.bridged_form, self.params["name"]): raise ValueError(f"Cannot bridge {self} with non-equivalent {test_node}") if test_node not in self._bridged_nodes: logging.info( f"Bridging {self.params['shortname']} to {test_node.params['shortname']}" ) self._bridged_nodes.append(test_node) test_node._bridged_nodes.append(self) self._picked_by_setup_nodes = test_node._picked_by_setup_nodes self._dropped_setup_nodes = test_node._dropped_setup_nodes self._picked_by_cleanup_nodes = test_node._picked_by_cleanup_nodes self._dropped_cleanup_nodes = test_node._dropped_cleanup_nodes
[docs] def clone_as_source(self, test_nodes: list["TestNode"]) -> None: """ Convert the node to a clone source for a list of its clones. :param test_nodes: clones to register as a clone source to """ self.prefix = "0" + self.prefix self._cloned_nodes = test_nodes
[docs] def pull_locations(self) -> None: """Update all setup locations for the current node.""" if self.is_flat(): return setup_path = self.params.get("swarm_pool", self.params["vms_base_dir"]) for node in self.setup_nodes: setup_locations = [":" + self.params.get("shared_pool", ".")] for net_suffix in node.shared_result_worker_ids: setup_locations += [net_suffix + ":" + setup_path] # update test parameters at runtime with worker parameters of its setup for setup_location in setup_locations: wid, _ = setup_location.split(":") for component in node.cleanup_nodes[self]: # discard parameters if we are not talking about any specific non-net object if component.key == "nets": continue object_suffix = "_" + component.long_suffix if setup_location in self.params.get( f"get_location{object_suffix}", "" ): continue if self.params.get(f"get_location{object_suffix}"): self.params[f"get_location{object_suffix}"] += ( " " + setup_location ) else: self.params[f"get_location{object_suffix}"] = setup_location # no additional parameters needed for shared (local) locations if not wid: continue # we might have results from previous jobs with non-traversed workers workers = [w for s in TestSwarm.run_swarms.values() for w in s.workers] for worker in workers: if worker.id == wid: source_suffix = "_" + wid for key in worker.params: # only provide access-related parameters from the worker if not key.startswith("nets_"): continue self.params[f"{key}{source_suffix}"] = worker.params[key] break else: raise RuntimeError( f"Could not pull setup location {setup_location} for {self}" )
[docs] def update_restrs(self, object_restrs: dict[str, str]) -> None: """ Update any restrictions with further filters. :param object_restrs: multi-line object restrictions to append """ for suffix, restriction in object_restrs.items(): self.restrs[suffix] = self.restrs.get(suffix, "") if restriction != "": if restriction.rstrip() not in self.restrs[suffix].splitlines(): self.restrs[suffix] += restriction
[docs] def regenerate_params(self, verbose: bool = False) -> None: """ Regenerate all parameters from the current reparsable config. :param bool verbose: whether to show generated parameter dictionaries """ self._params_cache = self.recipe.get_params(show_dictionaries=verbose) for key, value in list(self._params_cache.items()): if key.startswith("only_") or key.startswith("no_"): restr_type, suffix = key.split("_", maxsplit=1) restr_line = restr_type + " " + value + "\n" if value != "" else "" self.update_restrs({suffix: restr_line}) del self._params_cache[key] self.regenerate_vt_parameters()
[docs] def regenerate_vt_parameters(self) -> None: """Regenerate the parameters provided to the VT runner.""" uri = self.params.get("name") vt_params = self.params.copy() # Flatten the vt_params, discarding the attributes that are not # scalars, and will not be used in the context of nrunner for key in ("_name_map_file", "_short_name_map_file", "dep"): if key in self.params: del vt_params[key] super().__init__("avocado-vt", uri, **vt_params)
[docs] def scan_states(self) -> bool: """ Scan for present object states to reuse the test from previous runs. :returns: whether all required states are available """ should_run = True node_params = self.params.copy() is_leaf = True for test_object in self.objects: object_params = test_object.object_typed_params(self.params) object_state = object_params.get("set_state") # the test leaves an object undefined so it cannot be reused for this object if object_state is None or object_state == "": continue else: is_leaf = False # the object state has to be defined to reach this stage if object_state == "install" and test_object.is_permanent(): should_run = False break # ultimate consideration of whether the state is actually present object_suffix = f"_{test_object.key}_{test_object.long_suffix}" node_params[f"check_state{object_suffix}"] = object_state node_params[f"show_location{object_suffix}"] = ( ":" + object_params["shared_pool"] ) node_params[f"check_mode{object_suffix}"] = object_params.get( "check_mode", "rf" ) # TODO: unfortunately we need env object with pre-processed vms in order # to provide ad-hoc root vm states so we use the current advantage that # all vm state backends can check for states without a vm boot (root) if test_object.key == "vms": node_params[f"use_env{object_suffix}"] = "no" node_params[f"soft_boot{object_suffix}"] = "no" if not is_leaf: session = self.started_worker.get_session() control_path = os.path.join( self.params["suite_path"], "controls", "pre_state.control" ) mod_control_path = door.set_subcontrol_parameter( control_path, "action", "check" ) mod_control_path = door.set_subcontrol_parameter_dict( mod_control_path, "params", node_params ) try: door.run_subcontrol(session, mod_control_path) should_run = False except ShellCmdError as error: if "AssertionError" in error.output: should_run = True else: raise RuntimeError( "Could not complete state scan due to control file error" ) logging.info( f"Should{' ' if should_run else ' not '}run from scan {self} by {self.started_worker.id}" ) return should_run
[docs] def sync_states(self, params: Params) -> None: """Sync or drop present object states to clean or later skip tests from previous runs.""" node_params = self.params.copy() for key in list(node_params.keys()): if key.startswith("get_state") or key.startswith("unset_state"): del node_params[key] # the sync cleanup will be performed if at least one selected object has a cleanable state should_clean = False for test_object in self.objects: object_params = test_object.object_typed_params(self.params) object_state = object_params.get("set_state") if not object_state: continue # avoid running any test unless the user really requires cleanup or setup is reusable unset_policy = object_params.get("unset_mode", "ri") if unset_policy[0] not in ["f", "r"]: continue # avoid running any test for unselected vms if test_object.key == "nets": logging.warning("Net state cleanup is not supported") continue # the object state has to be defined to reach this stage if object_state == "install" and test_object.is_permanent(): should_clean = False break vm_name = ( test_object.suffix if test_object.key == "vms" else test_object.composites[0].suffix ) # TODO: is this needed? from .. import params_parser as param if vm_name in params.get("vms", param.all_objects("vms")): should_clean = True else: continue # TODO: cannot remove ad-hoc root states, is this even needed? if test_object.key == "vms": vm_params = object_params node_params["images_" + vm_name] = vm_params["images"] for image_name in vm_params.objects("images"): image_params = vm_params.object_params(image_name) node_params[f"image_name_{image_name}_{vm_name}"] = image_params[ "image_name" ] node_params[f"image_format_{image_name}_{vm_name}"] = image_params[ "image_format" ] if image_params.get_boolean("create_image", False): node_params[f"remove_image_{image_name}_{vm_name}"] = "yes" node_params["skip_image_processing"] = "no" suffixes = f"_{test_object.key}_{test_object.suffix}" suffixes += f"_{vm_name}" if test_object.key == "images" else "" # spread the state setup for the given test object location = ":" + object_params["shared_pool"] if unset_policy[0] == "f": # reverse the state setup for the given test object # NOTE: we are forcing the unset_mode to be the one defined for the test node because # the unset manual step behaves differently now (all this extra complexity starts from # the fact that it has different default value which is noninvasive node_params.update( { f"unset_state{suffixes}": object_state, f"unset_location{suffixes}": location, f"unset_mode{suffixes}": object_params.get("unset_mode", "ri"), f"pool_scope": "own", } ) do = "unset" logging.info(f"Need to clean up {self} by {self.started_worker.id}") else: # spread the state setup for the given test object if node_params.get("pool_filter", "reuse") in ["reuse", "block"]: logging.info( f"No need to sync {self} from {self.started_worker.id}" ) should_clean = False break else: # TODO: actual state copy support is almost fully lacking at present # and the sync operation by itself cannot guarantee equalized setup if node_params.get("pool_filter", "reuse") != "copy": raise ValueError( "Pool filtering can only be one of: reuse, copy, block" ) logging.info( f"Need to sync {self} from {location.join(',')} to {self.started_worker.id}" ) node_params.update( { f"get_state{suffixes}": object_state, f"get_location{suffixes}": location, } ) sync_scopes = set( object_params.get_list("pool_scope", ["swarm", "cluster", "shared"]) ) sync_scopes.remove("own") node_params[f"pool_scope{suffixes}"] = " ".join(sync_scopes) do = "get" # TODO: unfortunately we need env object with pre-processed vms in order # to provide ad-hoc root vm states so we use the current advantage that # all vm state backends can check for states without a vm boot (root) if test_object.key == "vms": node_params[f"use_env_{test_object.key}_{test_object.suffix}"] = "no" if should_clean: action = "Cleaning up" if unset_policy[0] == "f" else "Syncing" logging.info(f"{action} {self} for {self.started_worker.id}") session = self.started_worker.get_session() control_path = os.path.join( self.params["suite_path"], "controls", "pre_state.control" ) mod_control_path = door.set_subcontrol_parameter(control_path, "action", do) mod_control_path = door.set_subcontrol_parameter_dict( mod_control_path, "params", node_params ) try: door.run_subcontrol(session, mod_control_path) except ShellCmdError as error: logging.warning( f"{action} {self} for {self.started_worker.id} could not be completed " f"due to control file error: {error}" ) else: logging.info( f"No need to clean up or sync {self} for {self.started_worker.id}" )
[docs] def validate(self) -> None: """Validate the test node for sane attribute-parameter correspondence.""" logging.info(f"Validating {self}") if self in self.setup_nodes or self in self.cleanup_nodes: raise ValueError("Detected reflexive dependency of %s to itself" % self) if self.is_flat(): return param_nets = self.params.objects("nets") attr_nets = list(o.suffix for o in self.objects if o.key == "nets") if len(attr_nets) > 1 or len(param_nets) > 1: raise AssertionError( f"Test node {self} can have only one net ({attr_nets}/{param_nets}" ) param_net_name, attr_net_name = attr_nets[0], param_nets[0] if self.objects and self.objects[0].suffix != attr_net_name: raise AssertionError( f"The net {attr_net_name} must be the first node object {self.objects[0]}" ) if param_net_name != attr_net_name: raise AssertionError( f"Parametric and attribute nets differ {param_net_name} != {attr_net_name}" ) param_vms = set(self.params.objects("vms")) attr_vms = set(o.suffix for o in self.objects if o.key == "vms") if len(param_vms - attr_vms) > 0: raise ValueError( "Additional parametric objects %s not in %s" % (param_vms, attr_vms) ) if len(attr_vms - param_vms) > 0: raise ValueError( "Missing parametric objects %s from %s" % (param_vms, attr_vms) ) # TODO: images can currently be ad-hoc during run and thus cannot be validated for node in self.setup_nodes: if node.is_flat(): continue object_set = self.setup_nodes[node] spurious_objects = object_set - set(self.objects) if len(spurious_objects) > 0: raise ValueError( f"Detected spurious objects {spurious_objects} for dependency {node}" ) for dependency_object in object_set: object_params = dependency_object.object_typed_params(node.params) object_state = object_params.get("set_state") if not object_state: raise ValueError( f"Detected stateless dependency via {dependency_object} of {self}" ) object_params = dependency_object.object_typed_params(self.params) dependency_state = object_params["get_state"] # cloned nodes don't have an explicit get_state parameter for the object if dependency_state == "0root": continue if object_state != dependency_state: raise ValueError( f"Detected incompatible dependency {object_state}!={dependency_state} via {dependency_object} of {self}" )