From: jspewock@iol.unh.edu
To: thomas@monjalon.net, juraj.linkes@pantheon.tech, Honnappa.Nagarahalli@arm.com, wathsala.vithanage@arm.com, paul.szczepanek@arm.com, npratte@iol.unh.edu, probb@iol.unh.edu, yoan.picchi@foss.arm.com, Luca.Vizzarro@arm.com
Cc: dev@dpdk.org, Jeremy Spewock
Subject: [PATCH v2 1/1] dts: Remove XML-RPC server for Scapy TG and instead use PythonShell
Date: Tue, 25 Jun 2024 17:11:14 -0400
Message-ID: <20240625211114.886-2-jspewock@iol.unh.edu>
In-Reply-To: <20240625211114.886-1-jspewock@iol.unh.edu>
References: <20240605175227.7003-1-jspewock@iol.unh.edu> <20240625211114.886-1-jspewock@iol.unh.edu>
List-Id: DPDK patches and discussions

From: Jeremy Spewock

Previously all scapy commands were handled using an XML-RPC server that ran on the TGNode.
This unnecessarily enforces a minimum Python version of 3.10 on the server that is being used as a traffic generator and complicates the implementation of scapy methods. This patch removes the XML-RPC server completely and instead allows the Scapy TG to extend from the PythonShell to implement the functionality of a traffic generator. This is done by importing the Scapy library in the PythonShell and sending commands directly to the interactive session on the TG Node.

Bugzilla ID: 1374

depends-on: series-32242 ("Improve interactive shell output gathering and logging")

Signed-off-by: Jeremy Spewock
---
One thing I would like to note about this patch is that the use of **kwargs without proper typing is unintuitive. One way to solve this would be to use Unpack (as is done in other places) to create a more useful type for the parameters that are allowed/expected, but I couldn't find a good way to do this without refactoring the parameters of interactive shells to be denoted using TypedDicts, which I felt would be a little out of scope for this patch and what it is trying to do.

However, I think using TypedDicts and an Unpack for InteractiveShells could be an interesting approach that might allow us to save some copy-pasting when sub-classing the shell. Instead of restating all of the existing parameters and then adding more to the subclasses, you could simply subclass a parameters dict and add more as needed. Then, the method signatures for interactive shells would just become `**kwargs: Unpack[InteractiveShellParams]` and the signature for multiple inheritance using the interactive shells would be much more useful since you could unpack the same TypedDict of parameters.

Additionally, I believe that the TestPmdShell is incompatible with this approach to multiple-inheritance due to how it uses key-word arguments.
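For illustration, the TypedDict idea described above might look roughly like the sketch below. This is not part of the patch: `InteractiveShellParams` is the only name taken from the note, while `ScapyShellParams`, `import_scapy`, the two stand-in classes, the field list and the placeholder default timeout are all hypothetical. `Unpack` is imported only for type checkers, so the snippet runs without `typing_extensions` installed; I have not checked how strictly mypy treats the forwarded `**kwargs`.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, TypedDict

if TYPE_CHECKING:
    # Only needed by type checkers; assumes typing_extensions is available there.
    from typing_extensions import Unpack


class InteractiveShellParams(TypedDict, total=False):
    """Hypothetical parameter set shared by every interactive shell."""

    privileged: bool
    timeout: float
    start_on_init: bool
    name: str | None


class ScapyShellParams(InteractiveShellParams, total=False):
    """Hypothetical subclass: adds a parameter instead of restating the inherited ones."""

    import_scapy: bool


class SketchShell:
    """Stand-in for InteractiveShell, only here to show the signature."""

    def __init__(self, node: str, **kwargs: Unpack[InteractiveShellParams]) -> None:
        self.node = node
        self.privileged = kwargs.get("privileged", False)
        self.timeout = kwargs.get("timeout", 15.0)  # placeholder default
        self.name = kwargs.get("name")


class SketchScapyShell(SketchShell):
    """Stand-in for a shell subclass that accepts one extra parameter."""

    def __init__(self, node: str, **kwargs: Unpack[ScapyShellParams]) -> None:
        # Consume the subclass-only key, then forward the rest to the base class.
        self.import_scapy = kwargs.pop("import_scapy", True)
        super().__init__(node, **kwargs)


shell = SketchScapyShell("tg_node", privileged=True, import_scapy=False)
```

The point of the sketch is only that the subclass signature stays one line long no matter how many parameters the base shell grows.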
 .../remote_session/interactive_shell.py | 11 +- .../traffic_generator/__init__.py | 2 +- .../testbed_model/traffic_generator/scapy.py | 425 ++++++------------ .../traffic_generator/traffic_generator.py | 15 +- dts/framework/utils.py | 15 + 5 files changed, 177 insertions(+), 291 deletions(-) diff --git a/dts/framework/remote_session/interactive_shell.py b/dts/framework/remote_session/interactive_shell.py index c92fdbfcdf..b723a50b48 100644 --- a/dts/framework/remote_session/interactive_shell.py +++ b/dts/framework/remote_session/interactive_shell.py @@ -29,16 +29,18 @@ from framework.params import Params from framework.settings import SETTINGS from framework.testbed_model.node import Node +from framework.utils import MultiInheritanceBaseClass -class InteractiveShell(ABC): +class InteractiveShell(MultiInheritanceBaseClass, ABC): """The base class for managing interactive shells. This class shouldn't be instantiated directly, but instead be extended. It contains methods for starting interactive shells as well as sending commands to these shells and collecting input until reaching a certain prompt. All interactive applications will use the same SSH connection, but each will create their own channel on that - session. + session. This class also extends from :class:`framework.utils.MultiInheritanceBaseClass` to + allow for both single- and multiple-inheritance. """ _node: Node @@ -74,9 +76,13 @@ def __init__( start_on_init: bool = True, app_params: Params = Params(), name: str | None = None, + **kwargs, ) -> None: """Create an SSH channel during initialization. + Additional key-word arguments can be passed through `kwargs` if needed to fulfill other + constructors in the case of multiple-inheritance. + Args: node: The node on which to run start the interactive shell. privileged: Enables the shell to run as superuser. 
@@ -100,6 +106,7 @@ def __init__( if start_on_init: self.start_application() + super().__init__(node, **kwargs) def _setup_ssh_channel(self): self._ssh_channel = self._node.main_session.interactive_session.session.invoke_shell() diff --git a/dts/framework/testbed_model/traffic_generator/__init__.py b/dts/framework/testbed_model/traffic_generator/__init__.py index 6dac86a224..a319fa5320 100644 --- a/dts/framework/testbed_model/traffic_generator/__init__.py +++ b/dts/framework/testbed_model/traffic_generator/__init__.py @@ -36,7 +36,7 @@ def create_traffic_generator( """ match traffic_generator_config: case ScapyTrafficGeneratorConfig(): - return ScapyTrafficGenerator(tg_node, traffic_generator_config) + return ScapyTrafficGenerator(tg_node, traffic_generator_config, privileged=True) case _: raise ConfigurationError( f"Unknown traffic generator: {traffic_generator_config.traffic_generator_type}" diff --git a/dts/framework/testbed_model/traffic_generator/scapy.py b/dts/framework/testbed_model/traffic_generator/scapy.py index ca0ea6aca3..b3a8b188a1 100644 --- a/dts/framework/testbed_model/traffic_generator/scapy.py +++ b/dts/framework/testbed_model/traffic_generator/scapy.py @@ -6,309 +6,179 @@ A traffic generator used for functional testing, implemented with `the Scapy library `_. -The traffic generator uses an XML-RPC server to run Scapy on the remote TG node. +The traffic generator uses an interactive shell to run Scapy on the remote TG node. -The traffic generator uses the :mod:`xmlrpc.server` module to run an XML-RPC server -in an interactive remote Python SSH session. The communication with the server is facilitated -with a local server proxy from the :mod:`xmlrpc.client` module. +The traffic generator extends :class:`framework.remote_session.python_shell.PythonShell` to +implement the methods for handling packets by sending commands into the interactive shell. 
""" -import inspect -import marshal + +import re import time -import types -import xmlrpc.client -from xmlrpc.server import SimpleXMLRPCServer +from typing import ClassVar -import scapy.all # type: ignore[import-untyped] +from scapy.compat import base64_bytes # type: ignore[import-untyped] from scapy.layers.l2 import Ether # type: ignore[import-untyped] from scapy.packet import Packet # type: ignore[import-untyped] from framework.config import OS, ScapyTrafficGeneratorConfig from framework.remote_session.python_shell import PythonShell -from framework.settings import SETTINGS from framework.testbed_model.node import Node from framework.testbed_model.port import Port - -from .capturing_traffic_generator import ( - CapturingTrafficGenerator, +from framework.testbed_model.traffic_generator.capturing_traffic_generator import ( PacketFilteringConfig, - _get_default_capture_name, ) +from framework.utils import REGEX_FOR_BASE64_ENCODING -""" -========= BEGIN RPC FUNCTIONS ========= - -All of the functions in this section are intended to be exported to a python -shell which runs a scapy RPC server. These functions are made available via that -RPC server to the packet generator. To add a new function to the RPC server, -first write the function in this section. Then, if you need any imports, make sure to -add them to SCAPY_RPC_SERVER_IMPORTS as well. After that, add the function to the list -in EXPORTED_FUNCTIONS. Note that kwargs (keyword arguments) do not work via xmlrpc, -so you may need to construct wrapper functions around many scapy types. -""" - -""" -Add the line needed to import something in a normal python environment -as an entry to this array. It will be imported before any functions are -sent to the server. 
-""" -SCAPY_RPC_SERVER_IMPORTS = [ - "from scapy.all import *", - "import xmlrpc", - "import sys", - "from xmlrpc.server import SimpleXMLRPCServer", - "import marshal", - "import pickle", - "import types", - "import time", -] - - -def scapy_send_packets_and_capture( - xmlrpc_packets: list[xmlrpc.client.Binary], - send_iface: str, - recv_iface: str, - duration: float, - sniff_filter: str, -) -> list[bytes]: - """The RPC function to send and capture packets. - - This function is meant to be executed on the remote TG node via the server proxy. - - Args: - xmlrpc_packets: The packets to send. These need to be converted to - :class:`~xmlrpc.client.Binary` objects before sending to the remote server. - send_iface: The logical name of the egress interface. - recv_iface: The logical name of the ingress interface. - duration: Capture for this amount of time, in seconds. - - Returns: - A list of bytes. Each item in the list represents one packet, which needs - to be converted back upon transfer from the remote node. - """ - scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets] - sniffer = scapy.all.AsyncSniffer( - iface=recv_iface, - store=True, - started_callback=lambda *args: scapy.all.sendp(scapy_packets, iface=send_iface), - filter=sniff_filter, - ) - sniffer.start() - time.sleep(duration) - return [scapy_packet.build() for scapy_packet in sniffer.stop(join=True)] - - -def scapy_send_packets(xmlrpc_packets: list[xmlrpc.client.Binary], send_iface: str) -> None: - """The RPC function to send packets. - - This function is meant to be executed on the remote TG node via the server proxy. - It only sends `xmlrpc_packets`, without capturing them. - - Args: - xmlrpc_packets: The packets to send. These need to be converted to - :class:`~xmlrpc.client.Binary` objects before sending to the remote server. - send_iface: The logical name of the egress interface. 
- """ - scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets] - scapy.all.sendp(scapy_packets, iface=send_iface, realtime=True, verbose=True) - - -""" -Functions to be exposed by the scapy RPC server. -""" -RPC_FUNCTIONS = [ - scapy_send_packets, - scapy_send_packets_and_capture, -] - -""" -========= END RPC FUNCTIONS ========= -""" - - -class QuittableXMLRPCServer(SimpleXMLRPCServer): - r"""Basic XML-RPC server. +from .capturing_traffic_generator import CapturingTrafficGenerator - The server may be augmented by functions serializable by the :mod:`marshal` module. - Example: - :: +class ScapyTrafficGenerator(PythonShell, CapturingTrafficGenerator): + """Provides access to scapy functions on a traffic generator node. - def hello_world(): - # to be sent to the XML-RPC server - print("Hello World!") + This class extends the base with remote execution of scapy functions. All methods for + processing packets are implemented using an underlying + :class:`framework.remote_session.python_shell.PythonShell` which imports the Scapy library. - # start the XML-RPC server on the remote node - # this is done by starting a Python shell on the remote node - from framework.remote_session import PythonShell - # the example assumes you're already connected to a tg_node - session = tg_node.create_interactive_shell(PythonShell, timeout=5, privileged=True) + Note that the order of inheritance is important for this class. In order to instantiate this + class, the abstract methods of :class:`~.capturing_traffic_generator.CapturingTrafficGenerator` + must be implemented. Since some of these methods are implemented in the underlying interactive + shell, according to Python's Method Resolution Order (MRO), the interactive shell must come + first. 
+ """ - # then importing the modules needed to run the server - # and the modules for any functions later added to the server - session.send_command("import xmlrpc") - session.send_command("from xmlrpc.server import SimpleXMLRPCServer") + _config: ScapyTrafficGeneratorConfig - # sending the source code of this class to the Python shell - from xmlrpc.server import SimpleXMLRPCServer - src = inspect.getsource(QuittableXMLRPCServer) - src = "\n".join([l for l in src.splitlines() if not l.isspace() and l != ""]) - spacing = "\n" * 4 - session.send_command(spacing + src + spacing) + #: Name of sniffer to ensure the same is used in all places + _sniffer_name: ClassVar[str] = "sniffer" + #: Name of variable that points to the list of packets inside the scapy shell. + _send_packet_list_name: ClassVar[str] = "packets" + #: Padding to add to the start of a line for python syntax compliance. + _python_indentation: ClassVar[str] = " " * 4 - # then starting the server with: - command = "s = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));s.serve_forever()" - session.send_command(command, "XMLRPC OK") + def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig, **kwargs): + """Extend the constructor with Scapy TG specifics. - # now the server is running on the remote node and we can add functions to it - # first connect to the server from the execution node - import xmlrpc.client - server_url = f"http://{tg_node.config.hostname}:8000" - rpc_server_proxy = xmlrpc.client.ServerProxy(server_url) + Initializes both the traffic generator and the interactive shell used to handle Scapy + functions. The interactive shell will be started on `tg_node`. The additional key-word + arguments in `kwargs` are used to pass into the constructor for the interactive shell. 
- # get the function bytes to send - import marshal - function_bytes = marshal.dumps(hello_world.__code__) - rpc_server_proxy.add_rpc_function(hello_world.__name__, function_bytes) + Args: + tg_node: The node where the traffic generator resides. + config: The traffic generator's test run configuration. + """ + assert ( + tg_node.config.os == OS.linux + ), "Linux is the only supported OS for scapy traffic generation" - # now we can execute the function on the server - xmlrpc_binary_recv: xmlrpc.client.Binary = rpc_server_proxy.hello_world() - print(str(xmlrpc_binary_recv)) - """ + super().__init__(tg_node, config=config, **kwargs) - def __init__(self, *args, **kwargs): - """Extend the XML-RPC server initialization. + def start_application(self) -> None: + """Extends :meth:`framework.remote_session.interactive_shell.start_application`. - Args: - args: The positional arguments that will be passed to the superclass's constructor. - kwargs: The keyword arguments that will be passed to the superclass's constructor. - The `allow_none` argument will be set to :data:`True`. + Adds a command that imports everything from the scapy library immediately after starting + the shell for usage in later calls to the methods of this class. """ - kwargs["allow_none"] = True - super().__init__(*args, **kwargs) - self.register_introspection_functions() - self.register_function(self.quit) - self.register_function(self.add_rpc_function) + super().start_application() + self.send_command("from scapy.all import *") - def quit(self) -> None: - """Quit the server.""" - self._BaseServer__shutdown_request = True - return None + def _create_packet_list(self, packets: list[Packet]) -> None: + """Build a list of packets to send later. - def add_rpc_function(self, name: str, function_bytes: xmlrpc.client.Binary) -> None: - """Add a function to the server from the local server proxy. 
+ Sends the string that represents the Python command that was used to create each packet in + `packets` into the underlying Python session. The purpose behind doing this is to create a + list that is identical to `packets` inside the shell. This method should only be called by + methods for sending packets immediately prior to sending. The list of packets will continue + to exist in the scope of the shell until subsequent calls to this method, so failure to + rebuild the list prior to sending packets could lead to undesired "stale" packets being + sent. Args: - name: The name of the function. - function_bytes: The code of the function. + packets: The list of packets to recreate in the shell. """ - function_code = marshal.loads(function_bytes.data) - function = types.FunctionType(function_code, globals(), name) - self.register_function(function) + self._logger.info("Building a list of packets to send.") + self.send_command( + f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, packets))}]" + ) - def serve_forever(self, poll_interval: float = 0.5) -> None: - """Extend the superclass method with an additional print. + def _send_packets(self, packets: list[Packet], port: Port) -> None: + """Implementation for sending packets without capturing any received traffic. - Once executed in the local server proxy, the print gives us a clear string to expect - when starting the server. The print means this function was executed on the XML-RPC server. + Provides a "fire and forget" method of sending packets. """ - print("XMLRPC OK") - super().serve_forever(poll_interval) - - -class ScapyTrafficGenerator(CapturingTrafficGenerator): - """Provides access to scapy functions via an RPC interface. - - This class extends the base with remote execution of scapy functions. - - Any packets sent to the remote server are first converted to bytes. They are received as - :class:`~xmlrpc.client.Binary` objects on the server side. 
When the server sends the packets - back, they are also received as :class:`~xmlrpc.client.Binary` objects on the client side, are - converted back to :class:`~scapy.packet.Packet` objects and only then returned from the methods. - - Attributes: - session: The exclusive interactive remote session created by the Scapy - traffic generator where the XML-RPC server runs. - rpc_server_proxy: The object used by clients to execute functions - on the XML-RPC server. - """ - - session: PythonShell - rpc_server_proxy: xmlrpc.client.ServerProxy - _config: ScapyTrafficGeneratorConfig - - def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig): - """Extend the constructor with Scapy TG specifics. - - The traffic generator first starts an XML-RPC on the remote `tg_node`. - Then it populates the server with functions which use the Scapy library - to send/receive traffic: - - * :func:`scapy_send_packets_and_capture` - * :func:`scapy_send_packets` - - To enable verbose logging from the xmlrpc client, use the :option:`--verbose` - command line argument or the :envvar:`DTS_VERBOSE` environment variable. + self._create_packet_list(packets) + send_command = [ + "sendp(", + f"{self._send_packet_list_name},", + f"iface='{port.logical_name}',", + "realtime=True,", + "verbose=True", + ")", + ] + self.send_command(f"\n{self._python_indentation}".join(send_command)) + + def _create_sniffer( + self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, filter_config: str + ) -> None: + """Create an asynchronous sniffer in the shell. + + A list of packets to send is added to the sniffer inside of a callback function so that + they are immediately sent at the time sniffing is started. Args: - tg_node: The node where the traffic generator resides. - config: The traffic generator's test run configuration. + packets_to_send: A list of packets to send when sniffing is started. + send_port: The port to send the packets on when sniffing is started. 
+ recv_port: The port to collect the traffic from. + filter_config: An optional BPF format filter to use when sniffing for packets. Omitted + when set to an empty string. """ - super().__init__(tg_node, config) + self._create_packet_list(packets_to_send) + sniffer_commands = [ + f"{self._sniffer_name} = AsyncSniffer(", + f"iface='{recv_port.logical_name}',", + "store=True,", + "started_callback=lambda *args: sendp(", + ( + f"{self._python_indentation}{self._send_packet_list_name}," + f" iface='{send_port.logical_name}')," + ), + ")", + ] + if filter_config: + sniffer_commands.insert(-1, f"filter='{filter_config}'") + + self.send_command(f"\n{self._python_indentation}".join(sniffer_commands)) + + def _start_and_stop_sniffing(self, duration: float) -> list[Packet]: + """Start asynchronous sniffer, run for a set `duration`, then collect received packets. + + This method expects that you have first created an asynchronous sniffer inside the shell + and will fail if you haven't. Received packets are collected by printing the base64 + encoding of each packet in the shell and then harvesting these encodings using regex to + convert back into packet objects. - assert ( - self._tg_node.config.os == OS.linux - ), "Linux is the only supported OS for scapy traffic generation" - - self.session = PythonShell( - self._tg_node, timeout=5, privileged=True, name="ScapyXMLRPCServer" - ) - - # import libs in remote python console - for import_statement in SCAPY_RPC_SERVER_IMPORTS: - self.session.send_command(import_statement) - - # start the server - xmlrpc_server_listen_port = 8000 - self._start_xmlrpc_server_in_remote_python(xmlrpc_server_listen_port) + Args: + duration: The amount of time in seconds to sniff for received packets. 
- # connect to the server - server_url = f"http://{self._tg_node.config.hostname}:{xmlrpc_server_listen_port}" - self.rpc_server_proxy = xmlrpc.client.ServerProxy( - server_url, allow_none=True, verbose=SETTINGS.verbose + Returns: + A list of all packets that were received while the sniffer was running. + """ + sniffed_packets_name = "gathered_packets" + self.send_command(f"{self._sniffer_name}.start()") + time.sleep(duration) + self.send_command(f"{sniffed_packets_name} = {self._sniffer_name}.stop(join=True)") + # An extra newline is required here due to the nature of interactive Python shells + packet_strs = self.send_command( + f"for pakt in {sniffed_packets_name}: print(bytes_base64(pakt.build()))\n" ) - - # add functions to the server - for function in RPC_FUNCTIONS: - # A slightly hacky way to move a function to the remote server. - # It is constructed from the name and code on the other side. - # Pickle cannot handle functions, nor can any of the other serialization - # frameworks aside from the libraries used to generate pyc files, which - # are even more messy to work with. 
- function_bytes = marshal.dumps(function.__code__) - self.rpc_server_proxy.add_rpc_function(function.__name__, function_bytes) - - def _start_xmlrpc_server_in_remote_python(self, listen_port: int) -> None: - # load the source of the function - src = inspect.getsource(QuittableXMLRPCServer) - # Lines with only whitespace break the repl if in the middle of a function - # or class, so strip all lines containing only whitespace - src = "\n".join([line for line in src.splitlines() if not line.isspace() and line != ""]) - - # execute it in the python terminal - self.session.send_command(src + "\n") - self.session.send_command( - f"server = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));server.serve_forever()", - "XMLRPC OK", + # In the string of bytes "b'XXXX'", we only want the contents ("XXXX") + list_of_packets_base64 = re.findall( + f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_strs, re.MULTILINE ) - - def _send_packets(self, packets: list[Packet], port: Port) -> None: - packets = [packet.build() for packet in packets] - self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name) + return [Ether(base64_bytes(pakt)) for pakt in list_of_packets_base64] def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str: - """Combines filter settings from `filter_config` into a BPF that scapy can use. + """Combine filter settings from `filter_config` into a BPF that scapy can use. Scapy allows for the use of Berkeley Packet Filters (BPFs) to filter what packets are collected based on various attributes of the packet. 
@@ -333,32 +203,19 @@ def _send_packets_and_capture( self, packets: list[Packet], send_port: Port, - receive_port: Port, + recv_port: Port, filter_config: PacketFilteringConfig, duration: float, - capture_name: str = _get_default_capture_name(), ) -> list[Packet]: - binary_packets = [packet.build() for packet in packets] - - xmlrpc_packets: list[ - xmlrpc.client.Binary - ] = self.rpc_server_proxy.scapy_send_packets_and_capture( - binary_packets, - send_port.logical_name, - receive_port.logical_name, - duration, - self._create_packet_filter(filter_config), - ) # type: ignore[assignment] - - scapy_packets = [Ether(packet.data) for packet in xmlrpc_packets] - return scapy_packets - - def close(self) -> None: - """Close the traffic generator.""" - try: - self.rpc_server_proxy.quit() - except ConnectionRefusedError: - # Because the python instance closes, we get no RPC response. - # Thus, this error is expected - pass - self.session.close() + """Implementation for sending packets and capturing any received traffic. + + This method first creates an asynchronous sniffer that holds the packets to send, then + starts and stops said sniffer. + + Returns: + A list of packets received after sending `packets`. 
+ """ + self._create_sniffer( + packets, send_port, recv_port, self._create_packet_filter(filter_config) + ) + return self._start_and_stop_sniffing(duration) diff --git a/dts/framework/testbed_model/traffic_generator/traffic_generator.py b/dts/framework/testbed_model/traffic_generator/traffic_generator.py index 4ce1148706..176d5e9065 100644 --- a/dts/framework/testbed_model/traffic_generator/traffic_generator.py +++ b/dts/framework/testbed_model/traffic_generator/traffic_generator.py @@ -16,23 +16,29 @@ from framework.logger import DTSLogger, get_dts_logger from framework.testbed_model.node import Node from framework.testbed_model.port import Port -from framework.utils import get_packet_summaries +from framework.utils import MultiInheritanceBaseClass, get_packet_summaries -class TrafficGenerator(ABC): +class TrafficGenerator(MultiInheritanceBaseClass, ABC): """The base traffic generator. Exposes the common public methods of all traffic generators and defines private methods - that must implement the traffic generation logic in subclasses. + that must implement the traffic generation logic in subclasses. This class also extends from + :class:`framework.utils.MultiInheritanceBaseClass` to allow subclasses the ability to inherit + from multiple classes to fulfil the traffic generating functionality without breaking + single-inheritance. """ _config: TrafficGeneratorConfig _tg_node: Node _logger: DTSLogger - def __init__(self, tg_node: Node, config: TrafficGeneratorConfig): + def __init__(self, tg_node: Node, config: TrafficGeneratorConfig, **kwargs): """Initialize the traffic generator. + Additional key-word arguments can be passed through `kwargs` if needed for fulfilling other + constructors in the case of multiple-inheritance. + Args: tg_node: The traffic generator node where the created traffic generator will be running. config: The traffic generator's test run configuration. 
@@ -40,6 +46,7 @@ def __init__(self, tg_node: Node, config: TrafficGeneratorConfig): self._config = config self._tg_node = tg_node self._logger = get_dts_logger(f"{self._tg_node.name} {self._config.traffic_generator_type}") + super().__init__(tg_node, **kwargs) def send_packet(self, packet: Packet, port: Port) -> None: """Send `packet` and block until it is fully sent. diff --git a/dts/framework/utils.py b/dts/framework/utils.py index 6b5d5a805f..1370ca1fe5 100644 --- a/dts/framework/utils.py +++ b/dts/framework/utils.py @@ -27,6 +27,7 @@ from .exception import ConfigurationError REGEX_FOR_PCI_ADDRESS: str = "/[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.[0-9]{1}/" +REGEX_FOR_BASE64_ENCODING: str = "[-a-zA-Z0-9+\\/]*={0,3}" def expand_range(range_str: str) -> list[int]: @@ -244,3 +245,17 @@ def _delete_tarball(self) -> None: def __fspath__(self) -> str: """The os.PathLike protocol implementation.""" return str(self._tarball_path) + + +class MultiInheritanceBaseClass: + """A base class for classes utilizing multiple-inheritance. + + This class enables its subclasses to support both single and multiple inheritance by acting as + a stopping point in the tree of calls to the constructors of super-classes. This class is able + to exist at the end of the Method Resolution Order (MRO) so that sub-classes can call super + without repercussion. + """ + + def __init__(self, *args, **kwargs) -> None: + """Call the init method of :class:`object`.""" + super().__init__() -- 2.45.2
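As a rough illustration of why a stopping class at the end of the MRO is useful, the sketch below mimics the constructor chain from this patch with simplified stand-ins. `Stop`, `Shell`, `Generator` and `ScapyLike` are hypothetical names chosen for the example and are not the DTS classes; only the shape of the `super().__init__()` calls follows the patch.

```python
class Stop:
    """Stand-in for MultiInheritanceBaseClass: swallows leftover arguments."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__()


class Shell(Stop):
    """Stand-in for InteractiveShell."""

    def __init__(self, node: str, privileged: bool = False, **kwargs) -> None:
        self.node = node
        self.privileged = privileged
        # Forward whatever this class did not consume to the next class in the MRO.
        super().__init__(node, **kwargs)


class Generator(Stop):
    """Stand-in for TrafficGenerator."""

    def __init__(self, tg_node: str, config: str, **kwargs) -> None:
        self.tg_node = tg_node
        self.config = config
        super().__init__(tg_node, **kwargs)


class ScapyLike(Shell, Generator):
    """Shell comes first so that its methods win in the MRO, as in the patch."""

    def __init__(self, tg_node: str, config: str, **kwargs) -> None:
        super().__init__(tg_node, config=config, **kwargs)


# Multiple inheritance: Shell.__init__ runs first and hands `config` on to Generator.
tg = ScapyLike("tg0", "scapy", privileged=True)
mro_names = [cls.__name__ for cls in ScapyLike.__mro__]

# Single inheritance still works: the leftover positional argument stops at Stop.
solo = Shell("solo")
```

Without the stopping class, `Shell("solo")` would end up calling `object.__init__("solo")`, which raises `TypeError` because `object` accepts no extra arguments.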