# Copyright 2013-2020 Intranet AG and contributors
#
# avocado-i2n is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avocado-i2n is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with avocado-i2n. If not, see <http://www.gnu.org/licenses/>.
"""
Tunnel object for the vmnet utility.
SUMMARY
------------------------------------------------------
Copyright: Intra2net AG
CONTENTS
------------------------------------------------------
This class wraps up the utilities for managing tunnels.
The parameters parsed at each vm are used as an overwrite dictionary
and the missing ones are generated for the full configuration of the tunnel.
INTERFACE
------------------------------------------------------
"""
from typing import Any
import logging as log
from virttest.utils_params import Params
from .netconfig import VMNetconfig
from .interface import VMInterface
from .node import VMNode
logging = log.getLogger("avocado.job." + __name__)
[docs]
class VMTunnel(object):
"""Tunnel between a left and a right vm node with the parameters generated for both sides."""
"""Structural properties"""
def left(self, value: VMNode = None) -> VMNode | None:
    """
    Access the left node of the tunnel.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: node to store (setter) or None (getter)
    :returns: the stored left node when reading, None after storing
    """
    if value is None:
        return self._left
    self._left = value
    return None
left = property(left, left)
def right(self, value: VMNode = None) -> VMNode | None:
    """
    Access the right node of the tunnel.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: node to store (setter) or None (getter)
    :returns: the stored right node when reading, None after storing
    """
    if value is None:
        return self._right
    self._right = value
    return None
right = property(right, right)
def left_iface(self, value: VMInterface = None) -> VMInterface | None:
    """
    Access the left interface of the tunnel.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: interface to store (setter) or None (getter)
    :returns: the stored left interface when reading, None after storing
    """
    if value is None:
        return self._left_iface
    self._left_iface = value
    return None
left_iface = property(left_iface, left_iface)
def right_iface(self, value: VMInterface = None) -> VMInterface | None:
    """
    Access the right interface of the tunnel.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: interface to store (setter) or None (getter)
    :returns: the stored right interface when reading, None after storing
    """
    if value is None:
        return self._right_iface
    self._right_iface = value
    return None
right_iface = property(right_iface, right_iface)
def left_net(self, value: VMNetconfig = None) -> VMNetconfig | None:
    """
    Access the left netconfig of the tunnel.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: netconfig to store (setter) or None (getter)
    :returns: the stored left netconfig when reading, None after storing
    """
    if value is None:
        return self._left_net
    self._left_net = value
    return None
left_net = property(left_net, left_net)
def right_net(self, value: VMNetconfig = None) -> VMNetconfig | None:
    """
    Access the right netconfig of the tunnel.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: netconfig to store (setter) or None (getter)
    :returns: the stored right netconfig when reading, None after storing
    """
    if value is None:
        return self._right_net
    self._right_net = value
    return None
right_net = property(right_net, right_net)
@property
def left_params(self) -> Params:
    """
    Left node parameters as seen by this tunnel.

    :returns: result of `object_params` on the left node parameters for the tunnel name
    """
    left_node = self.left
    return left_node.params.object_params(self.name)
@property
def right_params(self) -> Params:
    """
    Right node parameters as seen by this tunnel.

    :returns: result of `object_params` on the right node parameters for the tunnel name
    """
    right_node = self.right
    return right_node.params.object_params(self.name)
@property
def params(self) -> Params:
    """
    Test parameters generated for the tunnel (read-only).

    :returns: the parameters object stored on the tunnel as `_params`
    """
    generated = self._params
    return generated
"""Connection properties"""
def name(self, value: str = None) -> str | None:
    """
    Access the name of the connection.

    The same function backs both the getter and the setter of the property,
    therefore assigning None is treated as a read and changes nothing.

    :param value: name to store (setter) or None (getter)
    :returns: the stored name when reading, None after storing
    """
    if value is None:
        return self._name
    self._name = value
    return None
name = property(name, name)
def __init__(
    self,
    name: str,
    node1: VMNode,
    node2: VMNode,
    local1: dict[str, str] = None,
    remote1: dict[str, str] = None,
    peer1: dict[str, str] = None,
    auth: dict[str, str] = None,
) -> None:
    """
    Construct the full set of required tunnel parameters for a given tunnel left configuration.

    Only the parameters that are not already defined in the parameters of the
    two vms (left 'node1' with right 'node2') are taken from the generated set,
    i.e. the existing node parameters overwrite the generated ones. The
    parameters of both nodes are updated in place.

    :param name: name of the tunnel
    :param node1: left side node of the tunnel
    :param node2: right side node of the tunnel
    :param local1: left local configuration with at least one key 'type' with value 'nic'
                   for left-site (could be used for site-to-site or site-to-point tunnels)
                   or 'internetip' for left-point (for point-to-site or point-to-point tunnels)
                   or 'custom' for left-site or left-point that is not a LAN (e.g. for tunnel
                   forwarding of another tunneled remote net)
    :param remote1: left remote configuration with at least one key 'type' with value 'custom'
                    for right-site (could be used for site-to-site or point-to-site tunnels) or
                    'externalip' for right-point (for site-to-point or point-to-point tunnels)
                    or 'modeconfig' for special right-point (using a ModeConfig connection for
                    a right road warrior)
    :param peer1: left peer configuration with at least one key 'type' with value 'ip' for no
                  NAT along the tunnel (the peer having a public IP) or 'dynip' for a road
                  warrior right end point (the peer is behind NAT and its IP is changing)
    :param auth: authentication configuration with at least one key 'type' with value in
                 "pubkey", "psk", "none" and the rest of the keys providing type details
                 (omitting the argument is equivalent to the "none" type)
    :raises: :py:class:`ValueError` if some of the supplied configuration is not valid

    The right side `local2`, `remote2`, `peer2` configuration is determined from the left side.

    If a PSK (pre-shared secret) authentication type is specified, the relevant additional
    options are `psk` for the secret word, `left_id` and `right_id` for the identification
    type to be used on each side (either IP for empty id or any user-defined id).
    """
    logging.info(
        "Preparing tunnel parameters for each of %s and %s", node1.name, node2.name
    )

    if local1 is None:
        local1 = {"type": "nic", "nic": "lan_nic"}
    if remote1 is None:
        remote1 = {"type": "custom", "nic": "lan_nic"}
    if peer1 is None:
        peer1 = {"type": "ip", "nic": "internet_nic"}
    local2, remote2, peer2 = self._get_peer_variant(local1, remote1, peer1)

    params = Params()

    # main parameters
    params["vpnconn_%s_%s" % (name, node1.name)] = name
    params["vpnconn_%s_%s" % (name, node2.name)] = name
    params["vpn_side_%s_%s" % (name, node1.name)] = "left"
    params["vpn_side_%s_%s" % (name, node2.name)] = "right"

    params["vpnconn_lan_type_%s_%s" % (name, node1.name)] = local1["type"].upper()
    params["vpnconn_lan_type_%s_%s" % (name, node2.name)] = local2["type"].upper()
    params["vpnconn_remote_type_%s_%s" % (name, node1.name)] = remote1[
        "type"
    ].upper()
    params["vpnconn_remote_type_%s_%s" % (name, node2.name)] = remote2[
        "type"
    ].upper()

    # left local network (also the remote network from the right point of view)
    if local1["type"] == "nic":
        netconfig1 = node1.interfaces[
            node1.params[local1.get("nic", "lan_nic")]
        ].netconfig
        params["vpnconn_lan_net_%s_%s" % (name, node1.name)] = netconfig1.net_ip
        params["vpnconn_lan_netmask_%s_%s" % (name, node1.name)] = (
            netconfig1.netmask
        )
        params["vpnconn_remote_net_%s_%s" % (name, node2.name)] = netconfig1.net_ip
        params["vpnconn_remote_netmask_%s_%s" % (name, node2.name)] = (
            netconfig1.netmask
        )
    elif local1["type"] == "internetip":
        netconfig1 = None
    elif local1["type"] == "custom":
        # "custom" configuration does not guarantee pre-existing netconfig like "nic"
        # so create an address/netmask only netconfig to match against for compatibility
        netconfig1 = VMNetconfig()
        netconfig1.net_ip = local1["lnet"]
        netconfig1.netmask = local1["lmask"]
        params["vpnconn_lan_net_%s_%s" % (name, node1.name)] = local1["lnet"]
        params["vpnconn_lan_netmask_%s_%s" % (name, node1.name)] = local1["lmask"]
    else:
        raise ValueError(
            "Invalid choice of left local type '%s', must be one of"
            " 'nic', 'internetip', 'custom'" % local1["type"]
        )

    # left remote network (also the local network from the right point of view)
    if remote1["type"] == "custom":
        if local1["type"] == "custom":
            # the right net of a fully custom tunnel is read from the left local configuration
            netconfig2 = VMNetconfig()
            netconfig2.net_ip = local1["rnet"]
            netconfig2.netmask = local1["rmask"]
            params["vpnconn_lan_net_%s_%s" % (name, node2.name)] = local1["rnet"]
            params["vpnconn_lan_netmask_%s_%s" % (name, node2.name)] = local1[
                "rmask"
            ]
        else:
            netconfig2 = node2.interfaces[
                node2.params[remote1.get("nic", "lan_nic")]
            ].netconfig
            params["vpnconn_lan_net_%s_%s" % (name, node2.name)] = netconfig2.net_ip
            params["vpnconn_lan_netmask_%s_%s" % (name, node2.name)] = (
                netconfig2.netmask
            )
        params["vpnconn_remote_net_%s_%s" % (name, node1.name)] = netconfig2.net_ip
        params["vpnconn_remote_netmask_%s_%s" % (name, node1.name)] = (
            netconfig2.netmask
        )
    elif remote1["type"] == "externalip":
        netconfig2 = None
    elif remote1["type"] == "modeconfig":
        netconfig2 = None
        params["vpnconn_remote_modeconfig_ip_%s_%s" % (name, node1.name)] = remote1[
            "modeconfig_ip"
        ]
    else:
        raise ValueError(
            "Invalid choice of left remote type '%s', must be one of"
            " 'custom', 'externalip', or 'modeconfig'" % remote1["type"]
        )

    # road warrior parameters
    params["vpnconn_peer_type_%s_%s" % (name, node1.name)] = peer1["type"].upper()
    if peer1["type"] == "ip":
        interface2 = node2.interfaces[
            node2.params[peer1.get("nic", "internet_nic")]
        ]
        params["vpnconn_peer_ip_%s_%s" % (name, node1.name)] = interface2.ip
        params["vpnconn_activation_%s_%s" % (name, node1.name)] = "ALWAYS"
    elif peer1["type"] == "dynip":
        # no peer ip is generated for the left side here, the left side is passive
        interface2 = node2.interfaces[
            node2.params[peer1.get("nic", "internet_nic")]
        ]
        params["vpnconn_activation_%s_%s" % (name, node1.name)] = "PASSIVE"
    else:
        raise ValueError(
            "Invalid choice of left peer type '%s', must be one of"
            " 'ip', 'dynip'" % peer1["type"]
        )
    params["vpnconn_peer_type_%s_%s" % (name, node2.name)] = peer2["type"].upper()
    interface1 = node1.interfaces[node1.params[peer2.get("nic", "internet_nic")]]
    params["vpnconn_peer_ip_%s_%s" % (name, node2.name)] = interface1.ip
    params["vpnconn_activation_%s_%s" % (name, node2.name)] = "ALWAYS"

    # authentication parameters
    # an explicit "none" type is documented as valid and is handled like a missing auth
    auth_type = "none" if auth is None else auth["type"]
    if auth_type == "none":
        params["vpnconn_key_type_%s" % name] = "NONE"
    elif auth_type == "pubkey":
        params["vpnconn_key_type_%s" % name] = "PUBLIC"
    elif auth_type == "psk":
        params["vpnconn_key_type_%s" % name] = "PSK"
        psk = auth["psk"]
        left_id = auth["left_id"]
        left_id_type = "IP" if left_id == "" else "CUSTOM"
        right_id = auth["right_id"]
        right_id_type = "IP" if right_id == "" else "CUSTOM"
        params["vpnconn_psk_%s" % name] = psk
        # the own id of one side is the foreign id of the other side
        params["vpnconn_psk_foreign_id_%s_%s" % (name, node1.name)] = right_id
        params["vpnconn_psk_foreign_id_type_%s_%s" % (name, node1.name)] = (
            right_id_type
        )
        params["vpnconn_psk_own_id_%s_%s" % (name, node1.name)] = left_id
        params["vpnconn_psk_own_id_type_%s_%s" % (name, node1.name)] = left_id_type
        params["vpnconn_psk_foreign_id_%s_%s" % (name, node2.name)] = left_id
        params["vpnconn_psk_foreign_id_type_%s_%s" % (name, node2.name)] = (
            left_id_type
        )
        params["vpnconn_psk_own_id_%s_%s" % (name, node2.name)] = right_id
        params["vpnconn_psk_own_id_type_%s_%s" % (name, node2.name)] = right_id_type
    else:
        raise ValueError(
            "Invalid choice of authentication type '%s', must be one of"
            " 'pubkey', 'psk', or 'none'" % auth_type
        )

    # overwrite the base vpn parameters with other already defined tunnel parameters
    params1 = params.object_params(node1.name)
    params2 = params.object_params(node2.name)
    params1.update(node1.params)
    params2.update(node2.params)
    # refill the node parameters in place so that existing references to them stay valid
    node1.params.clear()
    node2.params.clear()
    node1.params.update(params1)
    node2.params.update(params2)

    self._params = params
    self._left = node1
    self._left_iface = interface1
    self._left_net = netconfig1
    self._right = node2
    self._right_iface = interface2
    self._right_net = netconfig2
    self._name = name
    logging.info("Produced tunnel from parameters is %s", self)
def __repr__(self) -> str:
    """
    Describe the tunnel by its name, end points, and connected networks.

    :returns: one-line description where a missing netconfig is shown as 'none'
    """
    if self.left_net is None:
        left_net = "none"
    else:
        left_net = self.left_net.net_ip
    if self.right_net is None:
        right_net = "none"
    else:
        right_net = self.right_net.net_ip
    return (
        "[tunnel] name='%s', left='%s(%s)', right='%s(%s)', lnet='%s', rnet='%s'"
        % (
            self.name,
            self.left.name,
            self.left_iface.ip,
            self.right.name,
            self.right_iface.ip,
            left_net,
            right_net,
        )
    )
[docs]
def connects_nodes(self, node1: VMNode, node2: VMNode) -> bool:
"""
Check whether a tunnel connects two vm nodes.
The vm nodes can be connected directly as tunnel
peers or indirectly in tunnel connected LANs (netconfigs).
:param node1: one side vm of the tunnel
:param node2: another side vm of the tunnel
:returns: whether the tunnel connects the two nodes
"""
def on_the_left(node: VMNode) -> bool:
if node == self.left:
logging.debug(
"The node %s is the left end point of the tunnel %s", node, self
)
return True
if self.left_net is not None and node.check_interface(
self.left_net.has_interface
):
logging.debug(
"The node %s is in the left end site of the tunnel %s", node, self
)
return True
if self.left_params["vpnconn_lan_type"] == "CUSTOM":
if node.check_interface(self.left_net.can_add_interface):
logging.debug(
"The node %s is forwarded from the left end of the tunnel %s",
node,
self,
)
return True
return False
def on_the_right(node: VMNode) -> bool:
if node == self.right:
logging.debug(
"The node %s is the right end point of the tunnel %s", node, self
)
return True
if self.right_net is not None and node.check_interface(
self.right_net.has_interface
):
logging.debug(
"The node %s is in the right end site of the tunnel %s", node, self
)
return True
if self.right_params["vpnconn_lan_type"] == "CUSTOM":
if node.check_interface(self.right_net.can_add_interface):
logging.debug(
"The node %s is forwarded from the right end of the tunnel %s",
node,
self,
)
return True
return False
logging.debug(
"Checking if the tunnel %s connects %s and %s", self, node1, node2
)
if on_the_left(node1) and on_the_right(node2):
return True
elif on_the_right(node1) and on_the_left(node2):
return True
else:
return False
def _get_peer_variant(
    self,
    left_local: dict[str, str],
    left_remote: dict[str, str],
    left_peer: dict[str, str],
) -> tuple[dict[str, str], dict[str, str], dict[str, str]]:
    """
    Convert triple of parameters according to ipsec rules.

    Returns a triple of parameters for the peer. Return default
    parameter where the left variant has used a more "exotic" value.

    Only the 'type' key is required in each of the left dictionaries. A
    missing 'nic' key falls back to 'lan_nic' for the local and remote
    configuration and to 'internet_nic' for the peer configuration.

    :param left_local: left local configuration
    :param left_remote: left remote configuration
    :param left_peer: left peer configuration
    :returns: right local, right remote, and right peer configuration
    :raises: :py:class:`KeyError` if some configuration has no 'type' key
    """
    right_local = {"type": "nic"}
    right_remote = {"type": "custom"}
    right_peer = {"type": "ip"}

    # what is local on the left is remote on the right
    if left_local["type"] == "nic":
        right_remote["nic"] = left_local.get("nic", "lan_nic")
    elif left_local["type"] == "internetip":
        right_remote["type"] = "externalip"

    # what is remote on the left is local on the right
    if left_remote["type"] == "custom":
        if left_local["type"] == "custom":
            right_local["type"] = "custom"
        else:
            right_local["nic"] = left_remote.get("nic", "lan_nic")
    elif left_remote["type"] == "externalip":
        right_local["type"] = "internetip"

    # road warriors are always assumed to be on the left side, hence the
    # right peer type remains "ip" for both of the left peer types
    if left_peer["type"] in ("dynip", "ip"):
        right_peer["nic"] = left_peer.get("nic", "internet_nic")

    return right_local, right_remote, right_peer
[docs]
def import_key_params(self, from_node: VMNode, to_node: VMNode) -> None:
    """
    Generate own key configuration at the source vm and foreign key configuration at the destination vm.

    The platform parameters of both nodes are updated before the method
    ends with the error below, since no guest OS support is implemented yet.

    :param from_node: source node to get the key from (and generate own key
                      configuration on it containing all relevant key information)
    :param to_node: destination node to import the key to (and generate foreign key
                    configuration on it containing all relevant key information)
    :raises: :py:class:`ValueError` if any of the nodes is not a tunnel end point
    :raises: :py:class:`NotImplementedError` in all remaining cases
    """
    assert from_node != to_node, "Cannot import key parameters from a vm node to itself"

    if not (from_node in (self.left, self.right)):
        source_names = (self.left.name, self.right.name, from_node.name)
        raise ValueError(
            "The keys are not imported from any of the tunnel end points %s and %s and "
            "%s is not one of them" % source_names
        )
    if not (to_node in (self.left, self.right)):
        target_names = (self.left.name, self.right.name, to_node.name)
        raise ValueError(
            "The keys are not imported to any of the tunnel end points %s and %s and "
            "%s is not one of them" % target_names
        )

    source_vm = from_node.platform
    target_vm = to_node.platform

    # the own key of the source vm
    own_key_params = Params({"vpnconn_own_key_name": "sample-key"})
    source_vm.params.update(own_key_params)

    # the same key is a foreign key from the point of view of the destination vm
    foreign_key_params = own_key_params.copy()
    foreign_key_params["vpnconn_foreign_key_name"] = own_key_params[
        "vpnconn_own_key_name"
    ]
    del foreign_key_params["vpnconn_own_key_name"]
    target_vm.params.update(foreign_key_params)

    raise NotImplementedError(
        "Public key authentication is not implemented for any guest OS"
    )