# Source code for avocado_i2n.cartgraph.worker
# Copyright 2013-2023 Intranet AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n. If not, see <http://www.gnu.org/licenses/>.
"""
Utility for the main test suite substructures like test environments (workers and swarms).
SUMMARY
------------------------------------------------------
Copyright: Intra2net AG
INTERFACE
------------------------------------------------------
"""
from __future__ import annotations
import logging as log
import aexpect
from aexpect.exceptions import ShellTimeoutError
from aexpect import remote
from aexpect.client import RemoteSession
from virttest.utils_params import Params
from . import NetObject
logging = log.getLogger("avocado.job." + __name__)
[docs]
class TestEnvironment:
    """Base class for an environment that isolates a given test."""

    def __init__(self, id: str) -> None:
        """
        Create a test environment usable by any test nodes (tests).

        :param id: identifier of the test environment, stored as-is in `self.id`
        """
        self.id = id
[docs]
class TestSwarm(TestEnvironment):
    """A wrapper for a test swarm of workers traversing the graph."""

    # Class-level registry shared by all swarm instances; nothing in this
    # class writes to it (presumably keyed by swarm id - verify against callers).
    run_swarms = {}

    def __init__(self, id: str, workers: list[TestWorker] | None = None) -> None:
        """
        Construct a test swarm (of sub-environments for execution) for any test nodes (tests).

        :param id: ID of the test swarm, passed on to the base class
        :param workers: workers that belong to the swarm; when omitted (or
                        empty) the swarm starts with a new empty list
        """
        super().__init__(id)
        self.workers = workers or []

    def __repr__(self) -> str:
        """
        Provide a representation of the object.

        :returns: a header line with the swarm id and worker count followed by
                  one tab-indented line per worker
        """
        header = f"[swarm] id='{self.id}', workers='{len(self.workers)}'"
        # build the dump in one pass instead of re-formatting a growing string
        return "\n\t".join([header, *(f"{worker}" for worker in self.workers)])
[docs]
class TestWorker(TestEnvironment):
    """A wrapper for a test worker traversing the graph."""

    # Remote sessions shared by all workers, keyed by "<shell host>:<shell port>"
    # (filled and read only by :py:meth:`get_session`).
    _session_cache = {}

    @property
    def params(self) -> Params:
        """Parameters (cache) property, taken from the worker's net object."""
        return self.net.params

    @property
    def restrs(self) -> dict[str, str]:
        """Restrictions property, taken from the worker's net object."""
        return self.net.restrs

    def __init__(self, id_net: NetObject) -> None:
        """
        Construct a test worker (execution environment) for any test nodes (tests).

        The worker ID is the "shortname" parameter of the net object.

        :param id_net: flat test net object to get configuration from
        :raises: :py:class:`ValueError` if the "name" parameter does not consist
                 of exactly three dot-separated parts
        """
        super().__init__(id_net.params["shortname"])
        self.net = id_net
        # the middle part of the three-part dotted name identifies the swarm
        _, self.swarm_id, _ = self.params["name"].split(".")
        # no spawner is assigned at construction time
        self.spawner = None

    def __repr__(self) -> str:
        """Provide a representation of the object."""
        return f"[worker] id='{self.id}', spawner='{self.params['nets_spawner']}'"

    def overwrite_with_slot(self, slot: str) -> None:
        """
        Overwrite worker parameters with configuration extrapolated from a slot string.

        Supported slot formats:

        * ``""`` - no isolation: "process" spawner with "localhost" as shell
          host and the already configured shell port
        * ``"<host>"`` - "lxc" spawner for a container named ``c<host>`` whose
          shell host is ``<nets_ip_prefix>.<host>`` and whose shell port is
          the already configured one
        * ``"<gateway>/<host>"`` - "remote" spawner where the gateway is used
          as shell host and ``22<host>`` as shell port

        The parameters "nets_gateway", "nets_host", "nets_spawner",
        "nets_shell_host" and "nets_shell_port" are only written once the
        slot was parsed successfully.

        :param slot: slot string in the format "gateway/host"
        :raises: :py:class:`RuntimeError` if the host of a "gateway/host" slot
                 is not made of digits only
        :raises: :py:class:`ValueError` if the slot has more than two
                 "/"-separated parts
        """
        env_tuple = tuple(slot.split("/"))
        if len(env_tuple) == 1:
            env_net = ""
            env_name = "c" + env_tuple[0] if env_tuple[0] else ""
            if env_name != "":
                prefix = self.params["nets_ip_prefix"]
                # drop the leading "c" again to obtain the last IP component
                ip = f"{prefix}.{env_name[1:]}"
            else:
                ip = "localhost"
            port = self.params["nets_shell_port"]
            # NOTE: at present handle empty environment id (lack of slots) as an indicator
            # of using non-isolated serial runs via the old process environment spawner
            env_type = "lxc" if env_name else "process"
        elif len(env_tuple) == 2:
            env_net = env_tuple[0]
            env_name = env_tuple[1]
            if not env_name.isdigit():
                raise RuntimeError(
                    f"Invalid remote host '{env_name}', "
                    "only numbers (as forwarded ports) accepted"
                )
            env_type = "remote"
            # the host number is appended to "22" to form the shell port
            port = f"22{env_name}"
            ip = env_net
        else:
            raise ValueError(f"Slot string {slot} could not be parsed")
        self.params["nets_gateway"] = env_net
        self.params["nets_host"] = env_name
        self.params["nets_spawner"] = env_type
        self.params["nets_shell_host"] = ip
        self.params["nets_shell_port"] = port

    def start(self) -> bool:
        """
        Start the environment for executing a test node.

        :returns: whether the environment is available after current or previous start
        :raises: :py:class:`RuntimeError` if the isolation type ("nets_spawner"
                 parameter) is not one of "process", "lxc", or "remote"
        """
        logging.info(f"Starting worker {self.id} environment")
        isolation_type = self.params["nets_spawner"]
        if isolation_type == "process":
            logging.debug("Serial runs do not have any startable environment")
            return True
        elif isolation_type == "lxc":
            # imported lazily so that the bindings are only needed for lxc isolation
            import lxc

            cid = self.params["nets_host"]
            container = lxc.Container(cid)
            if not container.running:
                logging.info(f"Starting container environment {cid}")
                return container.start()
            return container.running
        elif isolation_type == "remote":
            # TODO: send wake-on-lan package to start remote host (assuming routable)
            logging.warning("Assuming the remote host is running for now")
            return True
        else:
            raise RuntimeError(f"Unsupported isolation type {isolation_type}")

    def stop(self) -> bool:
        """
        Stop the environment for executing a test node.

        :returns: whether the environment is stopped after current or previous stop
        :raises: :py:class:`RuntimeError` if the isolation type ("nets_spawner"
                 parameter) is not one of "process", "lxc", or "remote"
        """
        logging.info(f"Stopping worker {self.id} environment")
        isolation_type = self.params["nets_spawner"]
        if isolation_type == "process":
            logging.debug("Serial runs do not have any stoppable environment")
            return True
        elif isolation_type == "lxc":
            # imported lazily so that the bindings are only needed for lxc isolation
            import lxc

            cid = self.params["nets_host"]
            container = lxc.Container(cid)
            if container.running:
                logging.info(f"Stopping container environment {cid}")
                return container.stop()
            # an already stopped container is a success, mirroring how start()
            # reports an already running container as available
            return True
        elif isolation_type == "remote":
            # TODO: send shutdown via session to stop remote host (assuming routable)
            logging.warning("Assuming the remote host is not running for now")
            return True
        else:
            raise RuntimeError(f"Unsupported isolation type {isolation_type}")

    def get_session(self) -> RemoteSession:
        """
        Get a remote session to the current slot for the given test node.

        Sessions are cached per "<shell host>:<shell port>" address for all
        workers. A cached session is reused only if it answers a simple
        command without timing out, otherwise a new login is performed and
        the cache entry is replaced.

        :returns: remote session to the slot determined from current node environment
        """
        # route the aexpect logging into the avocado job log
        log.getLogger("aexpect").parent = log.getLogger("avocado.job")
        address = self.params["nets_shell_host"] + ":" + self.params["nets_shell_port"]
        cache = type(self)._session_cache
        session = cache.get(address)
        if session:
            # check for corrupted sessions
            try:
                logging.debug(
                    "Remote session health check: " + session.cmd_output("date")
                )
            except ShellTimeoutError:
                logging.warning(f"Bad remote session health for {address}!")
                session = None
        if not session:
            session = remote.wait_for_login(
                self.params["nets_shell_client"],
                self.params["nets_shell_host"],
                self.params["nets_shell_port"],
                self.params["nets_username"],
                self.params["nets_password"],
                self.params["nets_shell_prompt"],
            )
            cache[address] = session
        return session