Source code for avocado_i2n.states.pool

# Copyright 2013-2021 Intranet AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n.  If not, see <http://www.gnu.org/licenses/>.

"""
Module for the QCOW2 pool state management backend.

SUMMARY
------------------------------------------------------

Copyright: Intra2net AG

INTERFACE
------------------------------------------------------

"""

import os
import time
from typing import Any
import logging as log

import shutil
import contextlib
import fcntl
import errno
import json

from aexpect import remote, ops_linux as ops
from aexpect.client import RemoteSession
from avocado.utils import crypto
from virttest.utils_params import Params

from virttest.qemu_storage import QemuImg

from .setup import StateBackend


logging = log.getLogger("avocado.job." + __name__)


#: skip waiting on locks if we only read from the pool for all processes
#: WARNING: use it only if you know what you are doing
SKIP_LOCKS = False


[docs] class TransferOps: """A small namespace for pool transfer operations of multiple types.""" _session_cache = {} @classmethod def get_session(cls, host: str, params: Params) -> RemoteSession: """ Get a possibly reused session to the remote location. :param host: remote host name for the remote location :param params: configuration parameters :returns: a new session or previously cached session """ session = cls._session_cache.get(host) if not session: session = remote.remote_login( params["nets_shell_client"], params["nets_shell_host"], params["nets_shell_port"], params["nets_username"], params["nets_password"], params["nets_shell_prompt"], ) cls._session_cache[host] = session return session
[docs] @classmethod def list_paths(cls, pool_path: str, params: Params) -> list[str]: """ List all states in a path from the pool. :param pool_path: pool path to list pool states from :param params: configuration parameters """ hosts, path = pool_path.split(":") if hosts != "": return cls.list_remote(pool_path, params) elif ";" in path: return cls.list_link(path.replace(";", ""), params) else: return cls.list_local(path, params)
[docs] @classmethod def compare(cls, cache_path: str, pool_path: str, params: Params) -> bool: """ Compare cache and pool external state version. :param cache_path: cache path to compare with :param pool_path: pool path to compare with :param params: configuration parameters """ hosts, path = pool_path.split(":") if hosts != "": return cls.compare_remote(cache_path, pool_path, params) elif ";" in path: return cls.compare_link(cache_path, path.replace(";", ""), params) else: return cls.compare_local(cache_path, path, params)
[docs] @classmethod def download(cls, cache_path: str, pool_path: str, params: Params) -> None: """ Download a path from the pool depending on the pool location. :param cache_path: cache path to download to :param pool_path: pool path to download from :param params: configuration parameters """ hosts, path = pool_path.split(":") if hosts != "": cls.download_remote(cache_path, pool_path, params) elif ";" in path: cls.download_link(cache_path, path.replace(";", ""), params) else: cls.download_local(cache_path, path, params)
[docs] @classmethod def upload(cls, cache_path: str, pool_path: str, params: Params) -> None: """ Upload a path to the pool depending on the pool location. :param cache_path: cache path to upload from :param pool_path: pool path to upload to :param params: configuration parameters """ hosts, path = pool_path.split(":") if hosts != "": cls.upload_remote(cache_path, pool_path, params) elif ";" in path: cls.upload_link(cache_path, path.replace(";", ""), params) else: cls.upload_local(cache_path, path, params)
[docs] @classmethod def delete(cls, pool_path: str, params: Params) -> None: """ Delete a path in the pool depending on the pool location. :param pool_path: path in the pool to delete :param params: configuration parameters """ hosts, path = pool_path.split(":") if hosts != "": cls.delete_remote(pool_path, params) elif ";" in path: cls.delete_link(path.replace(";", ""), params) else: cls.delete_local(path, params)
[docs] @staticmethod def list_local(pool_path: str, params: Params) -> list[str]: """ List all states in a path from the pool. All arguments are identical to the main entry method. """ if not os.path.exists(pool_path): return [] return os.listdir(pool_path)
[docs] @staticmethod def compare_local(cache_path: str, pool_path: str, params: Params) -> bool: """ Compare cache and pool external state version. All arguments are identical to the main entry method. """ if os.path.exists(cache_path): local_hash = crypto.hash_file(cache_path, 1048576, "md5") else: local_hash = "" if os.path.exists(pool_path): remote_hash = crypto.hash_file(pool_path, 1048576, "md5") else: remote_hash = "" return local_hash == remote_hash
[docs] @staticmethod def download_local(cache_path: str, pool_path: str, params: Params) -> None: """ Download a path from the pool depending on the pool location. All arguments are identical to the main entry method. """ os.makedirs(os.path.dirname(cache_path), exist_ok=True) update_timeout = params.get_numeric("update_pool_timeout", 300) with image_lock(pool_path, update_timeout) as lock: if TransferOps.compare_local(cache_path, pool_path, params): logging.info(f"Skip download of an already available {cache_path}") return shutil.copy(pool_path, cache_path)
[docs] @staticmethod def upload_local(cache_path: str, pool_path: str, params: Params) -> None: """ Upload a path to the pool depending on the pool location. All arguments are identical to the main entry method. """ update_timeout = params.get_numeric("update_pool_timeout", 300) with image_lock(pool_path, update_timeout) as lock: if TransferOps.compare_local(cache_path, pool_path, params): logging.info(f"Skip upload of an already available {cache_path}") return os.makedirs(os.path.dirname(pool_path), exist_ok=True) shutil.copy(cache_path, pool_path)
[docs] @staticmethod def delete_local(pool_path: str, params: Params) -> None: """ Delete a path in the pool depending on the pool location. All arguments are identical to the main entry method. """ update_timeout = params.get_numeric("update_pool_timeout", 300) with image_lock(pool_path, update_timeout) as lock: os.unlink(pool_path)
[docs] @staticmethod def list_remote(pool_path: str, params: Params) -> list[str]: """ List all states in a path from the pool. All arguments are identical to the main entry method. """ host, path = pool_path.split(":") session = TransferOps.get_session(host, params) status, output = session.cmd_status_output(f"ls {path}") if status != 0: logging.debug(f"Path {path} not found: {output}") return [] return output.split()
[docs] @staticmethod def compare_remote(cache_path: str, pool_path: str, params: Params) -> bool: """ Compare cache and pool external state version. All arguments are identical to the main entry method. """ if os.path.exists(cache_path): local_hash = crypto.hash_file(cache_path, 1048576, "md5") else: local_hash = "" host, path = pool_path.split(":") session = TransferOps.get_session(host, params) remote_hash = ops.hash_file(session, path, "1M", "md5") return local_hash == remote_hash
[docs] @staticmethod def download_remote(cache_path: str, pool_path: str, params: Params) -> None: """ Download a path from the pool depending on the pool location. All arguments are identical to the main entry method. """ os.makedirs(os.path.dirname(cache_path), exist_ok=True) # TODO: no support for remote lock files yet host, path = pool_path.split(":") if TransferOps.compare_remote(cache_path, pool_path, params): logging.info( f"Skip download of an already available and valid {cache_path}" ) return if os.path.exists(cache_path): logging.info(f"Force download of an already available {cache_path}") remote.copy_files_from( params["nets_shell_host"], params["nets_file_transfer_client"], params["nets_username"], params["nets_password"], params["nets_file_transfer_port"], path, cache_path, timeout=params.get_numeric("update_pool_timeout", 300), )
[docs] @staticmethod def upload_remote(cache_path: str, pool_path: str, params: Params) -> None: """ Upload a path to the pool depending on the pool location. All arguments are identical to the main entry method. """ # TODO: need to create remote directory if not available # TODO: no support for remote lock files yet host, path = pool_path.split(":") if TransferOps.compare_remote(cache_path, pool_path, params): logging.info(f"Skip upload of an already available {pool_path}") return logging.info(f"Will possibly force upload to {pool_path}") remote.copy_files_to( params["nets_shell_host"], params["nets_file_transfer_client"], params["nets_username"], params["nets_password"], params["nets_file_transfer_port"], cache_path, path, timeout=params.get_numeric("update_pool_timeout", 300), )
[docs] @staticmethod def delete_remote(pool_path: str, params: Params) -> None: """ Delete a path in the pool depending on the pool location. All arguments are identical to the main entry method. """ host, path = pool_path.split(":") session = remote.remote_login( params["nets_shell_client"], params["nets_shell_host"], params["nets_shell_port"], params["nets_username"], params["nets_password"], params["nets_shell_prompt"], ) session.cmd(f"rm {path}")
[docs] class QCOW2ImageTransfer(StateBackend): """Backend manipulating root or external states from a shared pool of QCOW2 images.""" ops = TransferOps @staticmethod def get_image_path(params: Params) -> str: """ Get the absolute path to a QCOW2 image. :param params: configuration parameters :returns: absolute path to the QCOW2 image """ vm_name, image_name = params["vms"], params["images"] vm_dir = os.path.join(params["vms_base_dir"], vm_name) image_path, image_format = params["image_name"], params.get("image_format") if image_format is None: raise ValueError( f"Unspecified image format for {image_name} - " "must be qcow2 or raw" ) if image_format not in ["raw", "qcow2"]: raise ValueError( f"Incompatible image format {image_format} for" f" {image_name} - must be qcow2 or raw" ) if not os.path.isabs(image_path): image_path = os.path.join(vm_dir, image_path) image_format = "" if image_format == "raw" else "." + image_format image_path = image_path + image_format return image_path
[docs] @classmethod def check_root(cls, params: Params, object: Any = None) -> bool: """ Check whether a root state or essentially the object exists. All arguments match the base class. """ vm_name = params["vms"] image_name = params["image_name"] target_image = cls.get_image_path(params) shared_pool = ":" + params["shared_pool"] image_base_name = os.path.join(vm_name, os.path.basename(target_image)) logging.debug( f"Checking for shared {vm_name}/{image_name} existence" f" in the shared pool {shared_pool}" ) src_image_name = os.path.join(shared_pool, image_base_name) # it is possible that the the root state is partially provided pool_images = cls.ops.list_paths(os.path.join(shared_pool, vm_name), params) if image_name + ".qcow2" in pool_images: logging.info("The shared %s image exists", src_image_name) return True else: logging.info("The shared %s image doesn't exist", src_image_name) return False
@classmethod def get_root(cls, params: Params, object: Any = None) -> None: """ Get a root state or essentially due to pre-existence do nothing. All arguments match the base class. """ vm_name = params["vms"] image_name = params["image_name"] target_image = cls.get_image_path(params) shared_pool = ":" + params["shared_pool"] image_base_names = os.path.join(vm_name, os.path.basename(target_image)) logging.info( f"Downloading shared {vm_name}/{image_name} " f"from the shared pool {shared_pool}" ) src_image_name = os.path.join(shared_pool, image_base_names) cls.ops.download(target_image, src_image_name, params) @classmethod def set_root(cls, params: Params, object: Any = None) -> None: """ Set a root state to provide object existence. All arguments match the base class. """ vm_name = params["vms"] image_name = params["image_name"] target_image = cls.get_image_path(params) shared_pool = ":" + params["shared_pool"] image_base_names = os.path.join(vm_name, os.path.basename(target_image)) logging.info( f"Uploading shared {vm_name}/{image_name} " f"to the shared pool {shared_pool}" ) dst_image_name = os.path.join(shared_pool, image_base_names) cls.ops.upload(target_image, dst_image_name, params)
[docs] @classmethod def unset_root(cls, params: Params, object: Any = None) -> None: """ Unset a root state to prevent object existence. All arguments match the base class and in addition: """ vm_name = params["vms"] image_name = params["image_name"] target_image = cls.get_image_path(params) shared_pool = ":" + params["shared_pool"] image_base_names = os.path.join(vm_name, os.path.basename(target_image)) logging.info( f"Removing shared {vm_name}/{image_name} " f"from the shared pool {shared_pool}" ) dst_image_name = os.path.join(shared_pool, image_base_names) cls.ops.delete(dst_image_name, params)
@classmethod def get_dependency(cls, state: str, params: Params) -> str: """ Return a backing state that the current state depends on. :param state: state name to retrieve the backing dependency of The rest of the arguments match the signature of the other methods here. """ vm_id, image_name = params["object_id"], params["images"] vm_dir = os.path.join(params["swarm_pool"], vm_id) params["image_chain"] = f"snapshot {image_name}" params["image_name_snapshot"] = os.path.join(image_name, state) params["image_format_snapshot"] = "qcow2" # TODO: we might want to return the complete backing chain but in some # cases parts of it are stored in a remote location # params["backing_chain"] = "yes" qemu_img = QemuImg(params.object_params("snapshot"), vm_dir, "snapshot") image_info = qemu_img.info(force_share=True, output="json") image_file = json.loads(image_info).get("backing-filename", "") return os.path.basename(image_file.replace(".qcow2", ""))
[docs] @classmethod def compare_chain( cls, state: str, cache_dir: str, pool_dir: str, params: Params ) -> bool: """ Compare checksums for all dependencies states backing a given state. :param state: state name :param cache_dir: root cache directory to compare from/to :param pool_dir: root pool directory to compare from/to :param params: configuration parameters """ vm_id = params["object_id"] logging.debug(f"Comparing backing chain for {state}") next_state = state while next_state != "": for image_name in params.objects("images"): image_params = params.object_params(image_name) cache_path = os.path.join( cache_dir, vm_id, image_name, next_state + ".qcow2" ) pool_path = os.path.join( pool_dir, vm_id, image_name, next_state + ".qcow2" ) if not cls.ops.compare(cache_path, pool_path, image_params): logging.warning( f"The image {image_name} has different {next_state} between cache {cache_path} and pool {pool_path}" ) return False if next_state == state and params["object_type"] in ["vms", "nets/vms"]: cache_path = os.path.join(cache_dir, vm_id, next_state + ".state") pool_path = os.path.join(pool_dir, vm_id, next_state + ".state") if not cls.ops.compare(cache_path, pool_path, params): logging.warning( f"The vm {vm_id} has different {next_state} between cache {cache_path} and pool {pool_path}" ) return False # comparison of state chain is not yet complete if the state has backing dependencies next_state = cls.get_dependency(next_state, params) logging.debug( f"The backing chain for {state} is identical between cache {cache_dir} and pool {pool_dir}" ) return True
[docs] @classmethod def transfer_chain( cls, state: str, cache_dir: str, pool_dir: str, params: Params, down: bool = True, ) -> None: """ Repeat pool operation an all dependencies states backing a given state. :param state: state name :param cache_dir: root cache directory to transfer from/to :param pool_dir: root pool directory to transfer from/to :param params: configuration parameters :param down: whether the chain is downloaded or uploaded """ transfer_operation = cls.ops.download if down else cls.ops.upload vm_id = params["object_id"] logging.debug(f"Transferring backing chain for {state}") next_state = state while next_state != "": for image_name in params.objects("images"): image_params = params.object_params(image_name) cache_path = os.path.join( cache_dir, vm_id, image_name, next_state + ".qcow2" ) pool_path = os.path.join( pool_dir, vm_id, image_name, next_state + ".qcow2" ) # if only vm state is not available this would indicate image corruption transfer_operation(cache_path, pool_path, image_params) if next_state == state and params["object_type"] in ["vms", "nets/vms"]: cache_path = os.path.join(cache_dir, vm_id, next_state + ".state") pool_path = os.path.join(pool_dir, vm_id, next_state + ".state") transfer_operation(cache_path, pool_path, params) # transfer of state chain is not yet complete if the state has backing dependencies next_state = cls.get_dependency(next_state, params) logging.debug( f"The backing chain for {state} is fully transferred to cache {cache_dir} from pool {pool_dir}" )
[docs] @classmethod def show(cls, params: Params, object: Any = None) -> list[str]: """ Return a list of available states of a specific type. All arguments match the base class. """ vm_id, vm_name = params["object_id"], params["vms"] state_tag = f"{vm_name}" format = ".state" if params["object_type"] in ["images", "nets/vms/images"]: image_name = params["images"] state_tag += f"/{image_name}" format = ".qcow2" pool_dir = params["show_location"] path = os.path.join(pool_dir, state_tag.replace(vm_name, vm_id)) logging.debug( f"Showing shared {state_tag} states " f"in the pool location {pool_dir}" ) states = cls.ops.list_paths(path, params) states = [p.replace(format, "") for p in states] return states
[docs] @classmethod def get(cls, params: Params, object: Any = None) -> None: """ Get a state transferring its entire chain of dependencies. All arguments match the base class. """ cache_dir = params["swarm_pool"] pool_dir = params["get_location"] vm_name = params["vms"] state_tag = f"{vm_name}" format = "state" if params["object_type"] in ["images", "nets/vms/images"]: image_name = params["images"] state_tag += f"/{image_name}" format = "qcow2" state = params["get_state"] logging.info( f"Downloading shared {state_tag} state {state} " f"from the shared pool {pool_dir} to {cache_dir}" ) cls.transfer_chain(state, cache_dir, pool_dir, params, down=True)
[docs] @classmethod def set(cls, params: Params, object: Any = None) -> None: """ Set a state transferring its entire chain of dependencies. All arguments match the base class. """ cache_dir = params["swarm_pool"] pool_dir = params["set_location"] vm_name = params["vms"] state_tag = f"{vm_name}" if params["object_type"] in ["images", "nets/vms/images"]: image_name = params["images"] state_tag += f"/{image_name}" state = params["set_state"] logging.info( f"Uploading shared {state_tag} state {state} " f"to the shared pool {pool_dir} from {cache_dir}" ) cls.transfer_chain(state, cache_dir, pool_dir, params, down=False)
[docs] @classmethod def unset(cls, params: Params, object: Any = None) -> None: """ Unset a state preserving its entire chain of dependencies. All arguments match the base class and in addition: """ pool_dir = params["unset_location"] vm_id, vm_name = params["object_id"], params["vms"] state_tag = f"{vm_name}" if params["object_type"] in ["images", "nets/vms/images"]: image_name = params["images"] state_tag += f"/{image_name}" state = params["unset_state"] logging.info( f"Removing shared {state_tag} state {state} " f"from the shared pool {pool_dir}" ) for image_name in params.objects("images"): image_params = params.object_params(image_name) pool_path = os.path.join(pool_dir, vm_id, image_name, state + ".qcow2") cls.ops.delete(pool_path, image_params) if params["object_type"] in ["vms", "nets/vms"]: pool_path = os.path.join(pool_dir, vm_id, state + ".state") cls.ops.delete(pool_path, params)
[docs] class RootSourcedStateBackend(StateBackend): """Backend manipulating root states from a possibly shared source.""" transport = QCOW2ImageTransfer
[docs] @classmethod def check_root( cls, params: Params, object: Any = None ) -> list["TestObject"] | bool: """ Check whether a root state or essentially the object exists. All arguments match the base class. """ local_root_exists = cls._check_root(params, object) if params["pool_scope"] == "own": return local_root_exists pool_root_exists = cls.transport.check_root(params, object) # TODO: boot state has to be deprecated and it cannot be handled remotely return local_root_exists or ( pool_root_exists and params["object_type"] not in ["vms", "nets/vms"] )
@classmethod def get_root(cls, params: Params, object: Any = None) -> None: """ Get a root state or essentially due to pre-existence do nothing. All arguments match the base class. """ if "own" not in params["pool_scope"]: cls.transport.get_root(params, object) return elif params["pool_scope"] == "own": cls._get_root(params, object) return local_root_exists = cls._check_root(params, object) pool_root_exists = cls.transport.check_root(params, object) if pool_root_exists: if local_root_exists: cache_valid = True vm_name = params["vms"] for image_name in params.objects("images"): image_params = params.object_params(image_name) image_filename = image_params["image_name"] cache_path = os.path.join( image_params["vms_base_dir"], vm_name, image_filename + ".qcow2" ) pool_path = os.path.join( image_params.get("shared_pool", ""), vm_name, image_filename + ".qcow2", ) if not cls.transport.ops.compare( cache_path, ":" + pool_path, image_params ): logging.warning( f"The image {image_name} is different between cache {cache_path} and pool {pool_path}" ) cache_valid = False break else: cache_valid = False if not cache_valid: cls.transport.get_root(params, object) cls._get_root(params, object) @classmethod def set_root(cls, params: Params, object: Any = None) -> None: """ Set a root state to provide object existence. All arguments match the base class. """ # local and pool root setting are mutually exclusive as we usually want # to set the pool root from an existing local root with some states on it if params["pool_scope"] == "own": cls._set_root(params, object) elif params["pool_scope"] == "shared": local_root_exists = cls._check_root(params, object) if not local_root_exists: raise RuntimeError("Updating state pool requires local root states") cls.transport.set_root(params, object) else: raise RuntimeError(f"Invalid pool scope {params['pool_scope']}")
[docs] @classmethod def unset_root(cls, params: Params, object: Any = None) -> None: """ Unset a root state to prevent object existence. All arguments match the base class and in addition: """ # local and pool root setting are mutually exclusive as we usually want # to set the pool root from an existing local root with some states on it if params["pool_scope"] == "own": cls._unset_root(params, object) elif params["pool_scope"] == "shared": cls.transport.unset_root(params, object) else: raise RuntimeError(f"Invalid pool scope {params['pool_scope']}")
[docs] class SourcedStateBackend(StateBackend): """Backend manipulating states from a possibly shared source.""" transport = QCOW2ImageTransfer @classmethod def get_sources(cls, do: str, params: Params) -> list[str]: """ Get the currently permitted pool and state reuse scope. :param do: state operation to consider the location for :param params: parameters for the current state manipulation """ def proximity(source: str) -> int: score = 0 source_net, source_path = source.split(":") source_params = params.object_params(source_net) if source_net else params if params["nets_gateway"] == source_params["nets_gateway"]: score += 1000 if params["nets_host"] == source_params["nets_host"]: score += 100 if params["swarm_pool"] == source_path: score += 10 else: score += 1 return score return sorted(params.objects(f"{do}_location"), key=proximity, reverse=True) @classmethod def get_source_scope( cls, source_path: str, source_params: Params, own_params: Params ) -> str: """ Get the currently permitted pool and state reuse scope. :param source_path: source identifier inclusive of all scopes :param own_params: parameters for the current state manipulation """ if own_params["nets_gateway"] != source_params["nets_gateway"]: return "cluster" elif own_params["nets_host"] != source_params["nets_host"]: return "swarm" elif own_params["shared_pool"].lstrip(":") == source_path: return "shared" elif own_params["swarm_pool"] == source_path: return "own" else: return "shared"
[docs] @classmethod def show(cls, params: Params, object: Any = None) -> list[str]: """ Return a list of available states of a specific type. All arguments match the base class. """ sources = cls.get_sources("show", params) scopes = params.get_list("pool_scope") if "own" in scopes: cache_states = cls._show(params, object) else: cache_states = [] pool_states = set() for source in sources: logging.debug(f"Next show source to consider is {source}") source_net, source_path = source.split(":") source_params = ( params.object_params(source_net) if source_net else params.copy() ) source_params["show_location"] = source # filtering stage where we may disallow certain data transport source_scope = cls.get_source_scope(source_path, source_params, params) if source_scope == "own" or source_scope not in scopes: continue logging.debug(f"Choosing {source} as the show source to use") mirror_states = cls.transport.show(source_params, object) pool_states = ( set(mirror_states) if not pool_states else pool_states.intersection(mirror_states) ) return list(set(cache_states).union(pool_states))
[docs] @classmethod def get(cls, params: Params, object: Any = None) -> None: """ Get a state from the best possible mirror in a certain restricted scope. All arguments match the base class. """ sources = cls.get_sources("get", params) scopes = params.get_list("pool_scope") # get from best available source (assuming from best to worst until filters permit it, then breaking) for source in sources: logging.debug(f"Next get source to consider is {source}") source_net, source_path = source.split(":") source_params = ( params.object_params(source_net) if source_net else params.copy() ) source_params["get_location"] = source # filtering stage where we may disallow certain data transport source_scope = cls.get_source_scope(source_path, source_params, params) if source_scope == "own" or source_scope not in scopes: continue logging.debug(f"Choosing {source} as the get source to use") source_params["show_location"] = source local_state_exists = params["get_state"] in cls._show(params, object) pool_state_exists = params["get_state"] in cls.transport.show( source_params, object ) if pool_state_exists: if local_state_exists: cache_valid = cls.transport.compare_chain( params["get_state"], params["swarm_pool"], source_params["get_location"], source_params, ) else: cache_valid = False if not cache_valid: cls.transport.get(source_params, object) break if "own" in scopes: cls._get(params, object)
[docs] @classmethod def set(cls, params: Params, object: Any = None) -> None: """ Set a state to all mirrors in a certain restricted scope. All arguments match the base class. """ sources = cls.get_sources("set", params) scopes = params.get_list("pool_scope") if "own" in scopes: cls._set(params, object) else: local_state_exists = params["set_state"] in cls._show(params, object) if not local_state_exists: raise RuntimeError("Updating state pool requires local states") # set from best available source (assuming from best to worst until filters permit it, then breaking) for source in sources: logging.debug(f"Next set source to consider is {source}") source_net, source_path = source.split(":") source_params = ( params.object_params(source_net) if source_net else params.copy() ) source_params["set_location"] = source # filtering stage where we may disallow certain data transport source_scope = cls.get_source_scope(source_path, source_params, params) if source_scope == "own" or source_scope not in scopes: continue logging.debug(f"Choosing {source} as the set source to use") cls.transport.set(source_params, object)
[docs] @classmethod def unset(cls, params: Params, object: Any = None) -> None: """ Unset a state to all mirrors in a certain restricted scope. All arguments match the base class and in addition: """ sources = cls.get_sources("unset", params) scopes = params.get_list("pool_scope") if "own" in scopes: cls._unset(params, object) # unset from best available source (assuming from best to worst until filters permit it, then breaking) for source in sources: logging.debug(f"Next unset source to consider is {source}") source_net, source_path = source.split(":") source_params = ( params.object_params(source_net) if source_net else params.copy() ) source_params["unset_location"] = source # filtering stage where we may disallow certain data transport source_scope = cls.get_source_scope(source_path, source_params, params) if source_scope == "own" or source_scope not in scopes: continue logging.debug(f"Choosing {source} as the unset source to use") cls.transport.unset(source_params, object)
[docs] @contextlib.contextmanager def image_lock(resource_path: str, timeout: int = 300) -> None: """ Wait for a lock to free image for state pool operations. :param resource_path: path to the potentially locked resource :param timeout: timeout to wait before erroring out (default 5 mins) """ if SKIP_LOCKS: yield None return lockfile = resource_path + ".lock" os.makedirs(os.path.dirname(lockfile), exist_ok=True) with open(lockfile, "wb") as fd: for _ in range(timeout): try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError as error: # block here but still support a finite timeout if error.errno != errno.EACCES and error.errno != errno.EAGAIN: raise else: break logging.debug("Waiting for image to become available") time.sleep(1) else: raise RuntimeError( f"Waiting to acquire {lockfile} took more than " f"the allowed {timeout} seconds" ) try: yield fd finally: fcntl.lockf(fd, fcntl.LOCK_UN)