# Source code for avocado_i2n.plugins.runner

# Copyright 2013-2020 Intra2net AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n.  If not, see <http://www.gnu.org/licenses/>.

"""
Specialized test runner for the plugin.

SUMMARY
------------------------------------------------------

Copyright: Intra2net AG

INTERFACE
------------------------------------------------------

"""

from __future__ import annotations

import os
import time
import json
from typing import Any
import logging as log

import asyncio

# Re-parent the "asyncio" logger under "avocado.job" so that asyncio's log
# records propagate to that logger's handlers instead of the root logger's
# (presumably so they are captured in the job logs).
log.getLogger("asyncio").parent = log.getLogger("avocado.job")

from avocado.core.job import Job
from avocado.core.nrunner.task import TASK_DEFAULT_CATEGORY, Task
from avocado.core.messages import MessageHandler
from avocado.core.plugin_interfaces import SuiteRunner as RunnerInterface
from avocado.core.status.repo import StatusRepo
from avocado.core.status.server import StatusServer
from avocado.core.suite import TestSuite
from avocado.core.teststatus import STATUSES_MAPPING
from avocado.core.task.runtime import RuntimeTask, PreRuntimeTask, PostRuntimeTask
from avocado.core.task.statemachine import TaskStateMachine, Worker
from avocado.core.dispatcher import SpawnerDispatcher
from virttest.utils_params import Params

from ..cartgraph import TestGraph, TestWorker, TestNode


# Module-level logger nested in the "avocado.job" hierarchy; it takes the
# name `logging` because the stdlib module itself is imported as `log`.
logging = log.getLogger(f"avocado.job.{__name__}")


class TestRunner(RunnerInterface):
    """Test runner for Cartesian graph traversal."""

    name = "traverser"
    description = "Runs tests through a Cartesian graph traversal"

    def __init__(self) -> None:
        """Construct minimal attributes for the Cartesian runner."""
        # runtime tasks (pre, test and post) of every test run so far
        self.tasks: list[RuntimeTask] = []
        # status reporting is set up by run_suite or lazily by run_test_task
        self.status_repo: StatusRepo | None = None
        self.status_server: StatusServer | None = None
        # test result entries parsed from the results.json of replayed jobs
        self.previous_results: list[dict[str, Any]] = []

    # results functionality

    async def _update_status(self) -> None:
        """
        Forward status journal messages to an avocado message handler forever.

        Polls the status repo journal (sleeping briefly while it is empty) and
        passes each message, with the task it belongs to if known, on to a
        :py:class:`MessageHandler` together with the current job.
        """
        message_handler = MessageHandler()
        while True:
            try:
                (_, task_id, _, index) = self.status_repo.status_journal_summary_pop()
            except IndexError:
                # nothing new in the journal yet, poll again shortly
                await asyncio.sleep(0.05)
                continue

            message = self.status_repo.get_task_data(task_id, index)
            # run_test_task keeps appending to self.tasks while this coroutine
            # runs, so the lookup is rebuilt from the current task list
            tasks_by_id = {
                str(runtime_task.task.identifier): runtime_task.task
                for runtime_task in self.tasks
            }
            task = tasks_by_id.get(task_id)
            message_handler.process_message(message, task, self.job)

    def all_results_ok(self) -> bool:
        """
        Evaluate if all tests run under this runner have an ok status.

        Tests are matched by name, so a test that was repeated counts as ok
        as long as at least one of its runs ended with an ok status.

        :returns: whether all tests ended with acceptable status
        """
        tests = self.job.result.tests
        # names of tests with at least one run whose status maps to success
        ok_names = {
            test["name"].name for test in tests if STATUSES_MAPPING[test["status"]]
        }
        return all(test["name"].name in ok_names for test in tests)

    def results_from_previous_jobs(self) -> None:
        """
        Parse results from previous jobs to add to all traversed graph nodes.

        Every job ID in the space-separated ``replay`` parameter is looked up
        in the configured logs directory and the tests listed in its
        ``results.json`` are appended to ``self.previous_results``.

        :raises: :py:class:`RuntimeError` if a replay job has no results file
                 or its results do not contain any tests
        """
        params = self.job.config["param_dict"]
        # TODO: we could really benefit from using an appropriate params object here
        replay_jobs = params.get("replay", "").split(" ")
        replay_dir = self.job.config.get("datadir.paths.logs_dir", ".")
        for replay_job in replay_jobs:
            # consecutive or surrounding spaces produce empty job IDs
            if not replay_job:
                continue
            replay_results = os.path.join(replay_dir, replay_job, "results.json")
            if not os.path.isfile(replay_results):
                raise RuntimeError(
                    "Cannot find replay job results file %s" % replay_results
                )
            # JSON is UTF-8 so do not depend on the locale's default encoding
            with open(replay_results, encoding="utf-8") as json_file:
                logging.info(f"Parsing previous results to replay {replay_results}")
                data = json.load(json_file)
            if "tests" not in data:
                raise RuntimeError(
                    f"Cannot find tests to replay against in {replay_results}"
                )
            for test_details in data["tests"]:
                logging.info(f"Updating with previous test results {test_details}")
                self.previous_results.append(test_details)

    # running functionality

    async def run_test_task(self, node: TestNode) -> None:
        """
        Run a test instance as avocado runtime tasks using the node's worker spawner.

        The test task is surrounded by its pre- and post-tasks and all of them
        are executed by a dedicated task state machine.

        :param node: test node to run
        :raises: :py:class:`RuntimeError` if no worker was started for the node
                 or the started worker has no spawner
        """
        host = node.params["nets_host"] or "process"
        gateway = node.params["nets_gateway"] or "localhost"
        spawner = node.params["nets_spawner"]
        logging.debug(
            f"Running {node.id} on {gateway}/{host} using {spawner} isolation"
        )

        if node.started_worker is None:
            raise RuntimeError(f"No worker is running {node}")
        if node.started_worker.spawner is None:
            raise RuntimeError(f"Worker {node.started_worker} cannot spawn tasks")

        if not self.status_repo:
            # status reporting was not set up by run_suite, do it lazily here
            self.status_repo = StatusRepo(self.job.unique_id)
            self.status_server = StatusServer(
                self.job.config.get("run.status_server_listen"), self.status_repo
            )
            # make sure the server listens before any task can report to it
            # (run_suite likewise creates the server before serving)
            await self.status_server.create_server()
            asyncio.ensure_future(self.status_server.serve_forever())
            # TODO: this needs more customization
            asyncio.ensure_future(self._update_status())

        status_server_uri = self.job.config.get("run.status_server_uri")
        node.regenerate_vt_parameters()
        raw_task = Task(
            node,
            node.id_test,
            [status_server_uri],
            category=TASK_DEFAULT_CATEGORY,
            job_id=self.job.unique_id,
        )
        raw_task.runnable.output_dir = os.path.join(
            self.job.test_results_path, raw_task.identifier.str_filesystem
        )
        test_task = RuntimeTask(raw_task)
        config = (
            self.test_suite.config if hasattr(self, "test_suite") else self.job.config
        )
        pre_tasks = PreRuntimeTask.get_tasks_from_test_task(
            test_task,
            1,
            self.job.test_results_path,
            None,
            status_server_uri,
            self.job.unique_id,
            config,
        )
        post_tasks = PostRuntimeTask.get_tasks_from_test_task(
            test_task,
            1,
            self.job.test_results_path,
            None,
            status_server_uri,
            self.job.unique_id,
            config,
        )
        tasks = [*pre_tasks, test_task, *post_tasks]
        for runtime_task in tasks:
            if spawner == "lxc":
                runtime_task.spawner_handle = host
            elif spawner == "remote":
                runtime_task.spawner_handle = node.started_worker.get_session()
        self.tasks += tasks

        # TODO: use a single state machine for all test nodes when we are able
        # to at least add requested tasks to it safely (using its locks)
        await Worker(
            state_machine=TaskStateMachine(tasks, self.status_repo),
            spawner=node.started_worker.spawner,
            max_running=1,
            task_timeout=self.job.config.get("task.timeout.running"),
        ).run()

    async def run_test_node(self, node: TestNode, status_timeout: int = 10) -> bool:
        """
        Run a test node with a potential retry prefix modification.

        :param node: test node to run
        :param status_timeout: number of attempts (30 seconds apart) to find the
                               test result among the job results after the run
        :returns: whether the test succeeded as a simple boolean test result status
        :raises: :py:class:`AssertionError` if the run test node contains no objects
        """
        if node.is_flat():
            raise AssertionError(
                "Cannot run test nodes not using any test objects, here %s" % node
            )

        original_prefix = node.prefix
        # appending a suffix to retries so we can tell them apart
        run_times = len(node.shared_results)
        if run_times > 0:
            node.prefix = original_prefix + f"r{run_times}"

        # restore the prefix even if running the test raises
        try:
            uid = node.id_test.uid
            name = node.params["name"]
            # placeholder replaced by the actual job result once it is found
            node_result = {"name": name, "status": "UNKNOWN"}
            node.results += [node_result]

            await self.run_test_task(node)

            # default status if the result never shows up (also for a zero timeout)
            test_status = "error"
            for i in range(status_timeout):
                test_result = next(
                    (
                        x
                        for x in self.job.result.tests
                        if x["name"].name == name and x["name"].uid == uid
                    ),
                    None,
                )
                if test_result is None:
                    await asyncio.sleep(30)
                    logging.warning(
                        f"Test result {uid} wasn't yet found and could not be extracted ({i}/{status_timeout})"
                    )
                    continue

                duration = float(test_result["time_elapsed"])
                # the placeholder is still UNKNOWN so only earlier passing runs count
                max_allowed = max(
                    (
                        float(r["time_elapsed"])
                        for r in node.results
                        if r["status"] == "PASS"
                    ),
                    default=duration,
                )
                logging.info(
                    f"Validating test duration {duration} is within usual bounds ({max_allowed})"
                )
                if test_result["status"] == "PASS" and duration > 1.25 * max_allowed:
                    logging.warning(
                        f"Test result {uid} was obtained but test took much longer ({duration}) than usual"
                    )
                    # TODO: could we replace with WARN before the status is announced to the status server?
                    test_result["status"] = "WARN"

                # job and local results as interpreted by us have only serializable easy to use data
                job_result = dict(test_result)
                job_result["name"] = test_result["name"].name
                node.results += [job_result]
                node.results.remove(node_result)
                test_status = test_result["status"].lower()
                break
            else:
                logging.error(
                    f"Test result {uid} for {name} could not be found and extracted, defaulting to ERROR"
                )
        finally:
            node.prefix = original_prefix

        logging.info(f"Finished running test with status {test_status.upper()}")
        # no need to log when test was not repeated
        if run_times > 0:
            logging.info(f"Finished running test {run_times + 1} times")

        # FIX: as VT's retval is broken (always True), we fix its handling here
        return test_status not in ("error", "fail")

    def run_workers(self, test_suite: TestSuite | TestGraph, params: Params) -> None:
        """
        Run all workers in parallel traversing the graph for each.

        :param test_suite: test suite to traverse as graph or a custom test graph to traverse
        :param params: runtime parameters used for extra customization
        :raises: TypeError if the provided test suite is of unknown type
        :raises: RuntimeError if a worker environment fails to start
        """
        if isinstance(test_suite, TestSuite):
            graph = TestGraph()
            graph.restrs.update(self.job.config["vm_strs"])
            for node in test_suite.tests:
                assert isinstance(
                    node, TestNode
                ), f"Invalid test type for test suite to run workers on for {node}"
                # apply default_only or user overwritten restriction
                node.update_restrs(self.job.config["vm_strs"])
            graph.new_nodes(test_suite.tests)
            graph.parse_shared_root_from_object_roots(params)
            graph.new_workers(TestGraph.parse_workers(params))
        elif isinstance(test_suite, TestGraph):
            graph = test_suite
        else:
            raise TypeError(
                f"Unknown test suite type for {type(test_suite)}, must be a Cartesian graph or an Avocado test suite"
            )

        graph.visualize(self.job.logdir)
        self.results_from_previous_jobs()
        graph.runner = self

        for worker in graph.workers.values():
            if not worker.spawner:
                worker.spawner = SpawnerDispatcher(self.job.config, self.job)[
                    worker.params["nets_spawner"]
                ].obj
            if not worker.start():
                raise RuntimeError(f"Failed to start environment {worker.id}")

        slot_workers = sorted(graph.workers.values(), key=lambda x: x.params["name"])
        to_traverse = [graph.traverse_object_trees(s, params) for s in slot_workers]
        # a job timeout of zero/None means no overall timeout
        asyncio.get_event_loop().run_until_complete(
            asyncio.wait_for(asyncio.gather(*to_traverse), self.job.timeout or None)
        )

    def run_suite(self, job: Job, test_suite: TestSuite) -> set[str]:
        """
        Run one or more tests and report with test result.

        :param job: job that includes the test suite
        :param test_suite: test suite with some tests to run
        :returns: a set of upper-case status names summarizing the run, e.g.
                  FAIL or INTERRUPTED and the statuses of all test tasks
        """
        summary = set()

        if not test_suite.enabled:
            job.interrupted_reason = f"Suite {test_suite.name} is disabled."
            return summary

        job.result.tests_total = len(test_suite.tests)

        self.job = job
        self.test_suite = test_suite
        self.tasks = []

        self.status_repo = StatusRepo(self.job.unique_id)
        self.status_server = StatusServer(
            self.job.config.get("run.status_server_listen"), self.status_repo
        )
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.status_server.create_server())
        asyncio.ensure_future(self.status_server.serve_forever())
        # TODO: this needs more customization
        asyncio.ensure_future(self._update_status())

        params = self.job.config["param_dict"]
        try:
            self.run_workers(test_suite, params)
            if not self.all_results_ok():
                # the summary is a set so only a single failed test is enough
                summary.add("FAIL")
        except (KeyboardInterrupt, asyncio.TimeoutError) as error:
            # both exceptions are typically raised without a message (e.g. by
            # asyncio.wait_for) so fall back to the exception type name
            reason = str(error) or type(error).__name__
            logging.info(reason)
            self.job.interrupted_reason = reason
            summary.add("INTERRUPTED")

        # clean up any test node session cache
        for session in TestWorker._session_cache.values():
            session.close()

        # TODO: The avocado implementation needs a workaround here:
        # Wait until all messages may have been processed by the
        # status_updater. This should be replaced by a mechanism
        # that only waits if there are missing status messages to
        # be processed, and, only for a given amount of time.
        # Tests with non received status will always show as SKIP
        # because of result reconciliation.
        time.sleep(0.05)

        self.job.result.end_tests()
        # the status server does not provide a way to verify it is fully initialized
        # so zero test runs need to access an internal attribute before closing
        if self.status_server._server_task:
            self.status_server.close()

        # Update the overall summary with found test statuses, which will
        # determine the Avocado command line exit status
        test_ids = [
            runtime_task.task.identifier
            for runtime_task in self.tasks
            if runtime_task.task.category == "test"
        ]
        summary.update(
            status.upper()
            for status in self.status_repo.get_result_set_for_tasks(test_ids)
        )
        return summary