DPDK patches and discussions
From: jspewock@iol.unh.edu
To: juraj.linkes@pantheon.tech, Honnappa.Nagarahalli@arm.com,
	paul.szczepanek@arm.com, yoan.picchi@foss.arm.com,
	npratte@iol.unh.edu, probb@iol.unh.edu, thomas@monjalon.net,
	wathsala.vithanage@arm.com, Luca.Vizzarro@arm.com
Cc: dev@dpdk.org, Jeremy Spewock <jspewock@iol.unh.edu>
Subject: [PATCH v1 1/1] dts: Remove XML-RPC server for Scapy TG and instead use PythonShell
Date: Thu, 20 Jun 2024 19:11:59 -0400
Message-ID: <20240620231158.12008-3-jspewock@iol.unh.edu>
In-Reply-To: <20240620231158.12008-1-jspewock@iol.unh.edu>

From: Jeremy Spewock <jspewock@iol.unh.edu>

Previously, all Scapy commands were handled by an XML-RPC server that
ran on the TG node. This unnecessarily enforces a minimum Python version
of 3.10 on the server that is used as a traffic generator and
complicates the implementation of Scapy methods. This patch removes the
XML-RPC server completely; instead, the Scapy TG extends PythonShell to
implement the functionality of a traffic generator. It does so by
importing the Scapy library in the PythonShell and sending commands
directly to the interactive session on the TG node.

Bugzilla ID: 1374
depends-on: series-32242 ("Improve interactive shell output gathering
and logging")

Signed-off-by: Jeremy Spewock <jspewock@iol.unh.edu>
---
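A minimal sketch of the command-string approach described above, not
the patch's code: the helper names, the packet strings and the "eth0"
interface are hypothetical. In the patch the packet strings come from
Scapy's Packet.command(); here they are hard-coded so the sketch runs
without Scapy and only checks that the generated text is valid Python.

```python
import ast

PADDING = " " * 4  # mirrors the `_padding` class variable in the patch


def build_packet_list_command(list_name: str, packet_commands: list[str]) -> str:
    """Return the assignment that recreates a packet list inside the shell."""
    return f"{list_name} = [{', '.join(packet_commands)}]"


def build_sendp_command(list_name: str, iface: str) -> str:
    """Return a multi-line sendp() call with continuation lines indented."""
    lines = [
        "sendp(",
        f"{list_name},",
        f"iface='{iface}',",
        "realtime=True,",
        "verbose=True",
        ")",
    ]
    return f"\n{PADDING}".join(lines)


# Hypothetical inputs: in the patch these strings come from Packet.command().
list_cmd = build_packet_list_command("packets", ["Ether()/IP()", "Ether()"])
send_cmd = build_sendp_command("packets", "eth0")

# Both strings parse as Python source; nothing is executed here.
ast.parse(list_cmd)
ast.parse(send_cmd)
print(list_cmd)
print(send_cmd)
```

The strings are only parsed, never run, so this shows the shape of what
is sent to the interactive session rather than any traffic generation.
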
 .../testbed_model/traffic_generator/scapy.py  | 422 ++++++------------
 dts/framework/utils.py                        |   1 +
 2 files changed, 139 insertions(+), 284 deletions(-)
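
The class docstring in this patch notes that the interactive shell must
come first in the inheritance order. A stand-alone sketch of why that
matters, using hypothetical stand-in classes (Shell, Generator) rather
than the real DTS classes: an abstract method counts as implemented only
if the first definition found along the MRO is concrete.

```python
from abc import ABC, abstractmethod


class Generator(ABC):
    """Stand-in for an abstract traffic generator base."""

    @abstractmethod
    def close(self) -> None:
        """Release resources."""


class Shell:
    """Stand-in for an interactive shell that already implements close()."""

    def close(self) -> None:
        print("shell closed")


class ShellFirst(Shell, Generator):
    """Shell.close is found first, so close() is no longer abstract."""


class GeneratorFirst(Generator, Shell):
    """Generator.close is found first, so the class stays abstract."""


def can_instantiate(cls: type) -> bool:
    """Return True if cls() succeeds, False if it is still abstract."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(ShellFirst))
print(can_instantiate(GeneratorFirst))
```
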

diff --git a/dts/framework/testbed_model/traffic_generator/scapy.py b/dts/framework/testbed_model/traffic_generator/scapy.py
index ca0ea6aca3..0fd7e28522 100644
--- a/dts/framework/testbed_model/traffic_generator/scapy.py
+++ b/dts/framework/testbed_model/traffic_generator/scapy.py
@@ -6,309 +6,176 @@
 
 A traffic generator used for functional testing, implemented with
 `the Scapy library <https://scapy.readthedocs.io/en/latest/>`_.
-The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
+The traffic generator uses an interactive shell to run Scapy on the remote TG node.
 
-The traffic generator uses the :mod:`xmlrpc.server` module to run an XML-RPC server
-in an interactive remote Python SSH session. The communication with the server is facilitated
-with a local server proxy from the :mod:`xmlrpc.client` module.
+The traffic generator extends :class:`framework.remote_session.python_shell.PythonShell` to
+implement the methods for handling packets by sending commands into the interactive shell.
 """
 
-import inspect
-import marshal
+
+import re
 import time
-import types
-import xmlrpc.client
-from xmlrpc.server import SimpleXMLRPCServer
+from typing import ClassVar
 
-import scapy.all  # type: ignore[import-untyped]
+from scapy.compat import base64_bytes  # type: ignore[import-untyped]
 from scapy.layers.l2 import Ether  # type: ignore[import-untyped]
 from scapy.packet import Packet  # type: ignore[import-untyped]
 
 from framework.config import OS, ScapyTrafficGeneratorConfig
 from framework.remote_session.python_shell import PythonShell
-from framework.settings import SETTINGS
 from framework.testbed_model.node import Node
 from framework.testbed_model.port import Port
-
-from .capturing_traffic_generator import (
-    CapturingTrafficGenerator,
+from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
     PacketFilteringConfig,
-    _get_default_capture_name,
 )
+from framework.utils import REGEX_FOR_BASE64_ENCODING
 
-"""
-========= BEGIN RPC FUNCTIONS =========
-
-All of the functions in this section are intended to be exported to a python
-shell which runs a scapy RPC server. These functions are made available via that
-RPC server to the packet generator. To add a new function to the RPC server,
-first write the function in this section. Then, if you need any imports, make sure to
-add them to SCAPY_RPC_SERVER_IMPORTS as well. After that, add the function to the list
-in EXPORTED_FUNCTIONS. Note that kwargs (keyword arguments) do not work via xmlrpc,
-so you may need to construct wrapper functions around many scapy types.
-"""
-
-"""
-Add the line needed to import something in a normal python environment
-as an entry to this array. It will be imported before any functions are
-sent to the server.
-"""
-SCAPY_RPC_SERVER_IMPORTS = [
-    "from scapy.all import *",
-    "import xmlrpc",
-    "import sys",
-    "from xmlrpc.server import SimpleXMLRPCServer",
-    "import marshal",
-    "import pickle",
-    "import types",
-    "import time",
-]
-
-
-def scapy_send_packets_and_capture(
-    xmlrpc_packets: list[xmlrpc.client.Binary],
-    send_iface: str,
-    recv_iface: str,
-    duration: float,
-    sniff_filter: str,
-) -> list[bytes]:
-    """The RPC function to send and capture packets.
-
-    This function is meant to be executed on the remote TG node via the server proxy.
-
-    Args:
-        xmlrpc_packets: The packets to send. These need to be converted to
-            :class:`~xmlrpc.client.Binary` objects before sending to the remote server.
-        send_iface: The logical name of the egress interface.
-        recv_iface: The logical name of the ingress interface.
-        duration: Capture for this amount of time, in seconds.
-
-    Returns:
-        A list of bytes. Each item in the list represents one packet, which needs
-        to be converted back upon transfer from the remote node.
-    """
-    scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets]
-    sniffer = scapy.all.AsyncSniffer(
-        iface=recv_iface,
-        store=True,
-        started_callback=lambda *args: scapy.all.sendp(scapy_packets, iface=send_iface),
-        filter=sniff_filter,
-    )
-    sniffer.start()
-    time.sleep(duration)
-    return [scapy_packet.build() for scapy_packet in sniffer.stop(join=True)]
-
-
-def scapy_send_packets(xmlrpc_packets: list[xmlrpc.client.Binary], send_iface: str) -> None:
-    """The RPC function to send packets.
-
-    This function is meant to be executed on the remote TG node via the server proxy.
-    It only sends `xmlrpc_packets`, without capturing them.
-
-    Args:
-        xmlrpc_packets: The packets to send. These need to be converted to
-            :class:`~xmlrpc.client.Binary` objects before sending to the remote server.
-        send_iface: The logical name of the egress interface.
-    """
-    scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets]
-    scapy.all.sendp(scapy_packets, iface=send_iface, realtime=True, verbose=True)
-
-
-"""
-Functions to be exposed by the scapy RPC server.
-"""
-RPC_FUNCTIONS = [
-    scapy_send_packets,
-    scapy_send_packets_and_capture,
-]
-
-"""
-========= END RPC FUNCTIONS =========
-"""
-
+from .capturing_traffic_generator import CapturingTrafficGenerator
 
-class QuittableXMLRPCServer(SimpleXMLRPCServer):
-    r"""Basic XML-RPC server.
 
-    The server may be augmented by functions serializable by the :mod:`marshal` module.
+class ScapyTrafficGenerator(PythonShell, CapturingTrafficGenerator):
+    """Provides access to scapy functions on a traffic generator node.
 
-    Example:
-        ::
+    This class extends the base with remote execution of scapy functions. All methods for
+    processing packets are implemented using an underlying
+    :class:`framework.remote_session.python_shell.PythonShell` which imports the Scapy library.
 
-            def hello_world():
-                # to be sent to the XML-RPC server
-                print("Hello World!")
-
-            # start the XML-RPC server on the remote node
-            # this is done by starting a Python shell on the remote node
-            from framework.remote_session import PythonShell
-            # the example assumes you're already connected to a tg_node
-            session = tg_node.create_interactive_shell(PythonShell, timeout=5, privileged=True)
-
-            # then importing the modules needed to run the server
-            # and the modules for any functions later added to the server
-            session.send_command("import xmlrpc")
-            session.send_command("from xmlrpc.server import SimpleXMLRPCServer")
-
-            # sending the source code of this class to the Python shell
-            from xmlrpc.server import SimpleXMLRPCServer
-            src = inspect.getsource(QuittableXMLRPCServer)
-            src = "\n".join([l for l in src.splitlines() if not l.isspace() and l != ""])
-            spacing = "\n" * 4
-            session.send_command(spacing + src + spacing)
-
-            # then starting the server with:
-            command = "s = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));s.serve_forever()"
-            session.send_command(command, "XMLRPC OK")
-
-            # now the server is running on the remote node and we can add functions to it
-            # first connect to the server from the execution node
-            import xmlrpc.client
-            server_url = f"http://{tg_node.config.hostname}:8000"
-            rpc_server_proxy = xmlrpc.client.ServerProxy(server_url)
-
-            # get the function bytes to send
-            import marshal
-            function_bytes = marshal.dumps(hello_world.__code__)
-            rpc_server_proxy.add_rpc_function(hello_world.__name__, function_bytes)
-
-            # now we can execute the function on the server
-            xmlrpc_binary_recv: xmlrpc.client.Binary = rpc_server_proxy.hello_world()
-            print(str(xmlrpc_binary_recv))
+    Note that the order of inheritance is important for this class. In order to instantiate this
+    class, the abstract methods of :class:`~.capturing_traffic_generator.CapturingTrafficGenerator`
+    must be implemented. Since some of these methods are implemented in the underlying interactive
+    shell, according to Python's Method Resolution Order (MRO), the interactive shell must come
+    first.
     """
 
-    def __init__(self, *args, **kwargs):
-        """Extend the XML-RPC server initialization.
-
-        Args:
-            args: The positional arguments that will be passed to the superclass's constructor.
-            kwargs: The keyword arguments that will be passed to the superclass's constructor.
-                The `allow_none` argument will be set to :data:`True`.
-        """
-        kwargs["allow_none"] = True
-        super().__init__(*args, **kwargs)
-        self.register_introspection_functions()
-        self.register_function(self.quit)
-        self.register_function(self.add_rpc_function)
-
-    def quit(self) -> None:
-        """Quit the server."""
-        self._BaseServer__shutdown_request = True
-        return None
-
-    def add_rpc_function(self, name: str, function_bytes: xmlrpc.client.Binary) -> None:
-        """Add a function to the server from the local server proxy.
-
-        Args:
-              name: The name of the function.
-              function_bytes: The code of the function.
-        """
-        function_code = marshal.loads(function_bytes.data)
-        function = types.FunctionType(function_code, globals(), name)
-        self.register_function(function)
-
-    def serve_forever(self, poll_interval: float = 0.5) -> None:
-        """Extend the superclass method with an additional print.
-
-        Once executed in the local server proxy, the print gives us a clear string to expect
-        when starting the server. The print means this function was executed on the XML-RPC server.
-        """
-        print("XMLRPC OK")
-        super().serve_forever(poll_interval)
-
-
-class ScapyTrafficGenerator(CapturingTrafficGenerator):
-    """Provides access to scapy functions via an RPC interface.
-
-    This class extends the base with remote execution of scapy functions.
-
-    Any packets sent to the remote server are first converted to bytes. They are received as
-    :class:`~xmlrpc.client.Binary` objects on the server side. When the server sends the packets
-    back, they are also received as :class:`~xmlrpc.client.Binary` objects on the client side, are
-    converted back to :class:`~scapy.packet.Packet` objects and only then returned from the methods.
-
-    Attributes:
-        session: The exclusive interactive remote session created by the Scapy
-            traffic generator where the XML-RPC server runs.
-        rpc_server_proxy: The object used by clients to execute functions
-            on the XML-RPC server.
-    """
-
-    session: PythonShell
-    rpc_server_proxy: xmlrpc.client.ServerProxy
     _config: ScapyTrafficGeneratorConfig
 
+    #: Name of sniffer to ensure the same is used in all places
+    _sniffer_name: ClassVar[str] = "sniffer"
+    #: Name of variable that points to the list of packets inside the scapy shell.
+    _send_packet_list_name: ClassVar[str] = "packets"
+    #: Padding to add to the start of a line for python syntax compliance.
+    _padding: ClassVar[str] = " " * 4
+
     def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig):
         """Extend the constructor with Scapy TG specifics.
 
-        The traffic generator first starts an XML-RPC on the remote `tg_node`.
-        Then it populates the server with functions which use the Scapy library
-        to send/receive traffic:
-
-            * :func:`scapy_send_packets_and_capture`
-            * :func:`scapy_send_packets`
-
-        To enable verbose logging from the xmlrpc client, use the :option:`--verbose`
-        command line argument or the :envvar:`DTS_VERBOSE` environment variable.
+        Initializes both a traffic generator and an interactive shell to handle Scapy functions.
+        The interactive shell will be started on `tg_node`.
 
         Args:
             tg_node: The node where the traffic generator resides.
             config: The traffic generator's test run configuration.
         """
-        super().__init__(tg_node, config)
+        CapturingTrafficGenerator.__init__(self, tg_node, config)
+        PythonShell.__init__(self, tg_node, privileged=True)
 
         assert (
             self._tg_node.config.os == OS.linux
         ), "Linux is the only supported OS for scapy traffic generation"
 
-        self.session = PythonShell(
-            self._tg_node, timeout=5, privileged=True, name="ScapyXMLRPCServer"
-        )
+    def start_application(self) -> None:
+        """Extends :meth:`framework.remote_session.interactive_shell.start_application`.
 
-        # import libs in remote python console
-        for import_statement in SCAPY_RPC_SERVER_IMPORTS:
-            self.session.send_command(import_statement)
+        Adds a command that imports everything from the scapy library immediately after starting
+        the shell for usage in later calls to the methods of this class.
+        """
+        super().start_application()
+        self.send_command("from scapy.all import *")
 
-        # start the server
-        xmlrpc_server_listen_port = 8000
-        self._start_xmlrpc_server_in_remote_python(xmlrpc_server_listen_port)
+    def _create_packet_list(self, packets: list[Packet]) -> None:
+        """Build a list of packets to send later.
 
-        # connect to the server
-        server_url = f"http://{self._tg_node.config.hostname}:{xmlrpc_server_listen_port}"
-        self.rpc_server_proxy = xmlrpc.client.ServerProxy(
-            server_url, allow_none=True, verbose=SETTINGS.verbose
-        )
+        Sends the string that represents the Python command that was used to create each packet in
+        `packets` into the underlying Python session. The purpose behind doing this is to create a
+        list that is identical to `packets` inside the shell. This method should only be called by
+        methods for sending packets immediately prior to sending. The list of packets will continue
+        to exist in the scope of the shell until subsequent calls to this method, so failure to
+        rebuild the list prior to sending packets could lead to undesired "stale" packets being
+        sent.
 
-        # add functions to the server
-        for function in RPC_FUNCTIONS:
-            # A slightly hacky way to move a function to the remote server.
-            # It is constructed from the name and code on the other side.
-            # Pickle cannot handle functions, nor can any of the other serialization
-            # frameworks aside from the libraries used to generate pyc files, which
-            # are even more messy to work with.
-            function_bytes = marshal.dumps(function.__code__)
-            self.rpc_server_proxy.add_rpc_function(function.__name__, function_bytes)
-
-    def _start_xmlrpc_server_in_remote_python(self, listen_port: int) -> None:
-        # load the source of the function
-        src = inspect.getsource(QuittableXMLRPCServer)
-        # Lines with only whitespace break the repl if in the middle of a function
-        # or class, so strip all lines containing only whitespace
-        src = "\n".join([line for line in src.splitlines() if not line.isspace() and line != ""])
-
-        # execute it in the python terminal
-        self.session.send_command(src + "\n")
-        self.session.send_command(
-            f"server = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));server.serve_forever()",
-            "XMLRPC OK",
+        Args:
+            packets: The list of packets to recreate in the shell.
+        """
+        self._logger.info("Building a list of packets to send.")
+        self.send_command(
+            f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, packets))}]"
         )
 
     def _send_packets(self, packets: list[Packet], port: Port) -> None:
-        packets = [packet.build() for packet in packets]
-        self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name)
+        """Implementation for sending packets without capturing any received traffic.
+
+        Provides a "fire and forget" method of sending packets.
+        """
+        self._create_packet_list(packets)
+        send_command = [
+            "sendp(",
+            f"{self._send_packet_list_name},",
+            f"iface='{port.logical_name}',",
+            "realtime=True,",
+            "verbose=True",
+            ")",
+        ]
+        self.send_command(f"\n{self._padding}".join(send_command))
+
+    def _create_sniffer(
+        self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, filter_config: str
+    ) -> None:
+        """Create an asynchronous sniffer in the shell.
+
+        A list of packets to send is added to the sniffer inside of a callback function so that
+        they are immediately sent at the time sniffing is started.
+
+        Args:
+            packets_to_send: A list of packets to send when sniffing is started.
+            send_port: The port to send the packets on when sniffing is started.
+            recv_port: The port to collect the traffic from.
+            filter_config: An optional BPF format filter to use when sniffing for packets. Omitted
+                when set to an empty string.
+        """
+        self._create_packet_list(packets_to_send)
+        sniffer_commands = [
+            f"{self._sniffer_name} = AsyncSniffer(",
+            f"iface='{recv_port.logical_name}',",
+            "store=True,",
+            "started_callback=lambda *args: sendp(",
+            f"{self._padding}{self._send_packet_list_name}, iface='{send_port.logical_name}'),",
+            ")",
+        ]
+        if filter_config:
+            sniffer_commands.insert(-1, f"filter='{filter_config}'")
+
+        self.send_command(f"\n{self._padding}".join(sniffer_commands))
+
+    def _start_and_stop_sniffing(self, duration: float) -> list[Packet]:
+        """Start asynchronous sniffer, run for a set `duration`, then collect received packets.
+
+        This method expects that you have first created an asynchronous sniffer inside the shell
+        and will fail if you haven't. Received packets are collected by printing the base64
+        encoding of each packet in the shell and then harvesting these encodings using regex to
+        convert back into packet objects.
+
+        Args:
+            duration: The amount of time in seconds to sniff for received packets.
+
+        Returns:
+            A list of all packets that were received while the sniffer was running.
+        """
+        sniffed_packets_name = "gathered_packets"
+        self.send_command(f"{self._sniffer_name}.start()")
+        time.sleep(duration)
+        self.send_command(f"{sniffed_packets_name} = {self._sniffer_name}.stop(join=True)")
+        # An extra newline is required here due to the nature of interactive Python shells
+        packet_strs = self.send_command(
+            f"for pakt in {sniffed_packets_name}: print(bytes_base64(pakt.build()))\n"
+        )
+        # In the string of bytes "b'XXXX'", we only want the contents ("XXXX")
+        list_of_packets_base64 = re.findall(
+            f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_strs, re.MULTILINE
+        )
+        return [Ether(base64_bytes(pakt)) for pakt in list_of_packets_base64]
 
     def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
-        """Combines filter settings from `filter_config` into a BPF that scapy can use.
+        """Combine filter settings from `filter_config` into a BPF that scapy can use.
 
         Scapy allows for the use of Berkeley Packet Filters (BPFs) to filter what packets are
         collected based on various attributes of the packet.
@@ -333,32 +200,19 @@ def _send_packets_and_capture(
         self,
         packets: list[Packet],
         send_port: Port,
-        receive_port: Port,
+        recv_port: Port,
         filter_config: PacketFilteringConfig,
         duration: float,
-        capture_name: str = _get_default_capture_name(),
     ) -> list[Packet]:
-        binary_packets = [packet.build() for packet in packets]
-
-        xmlrpc_packets: list[
-            xmlrpc.client.Binary
-        ] = self.rpc_server_proxy.scapy_send_packets_and_capture(
-            binary_packets,
-            send_port.logical_name,
-            receive_port.logical_name,
-            duration,
-            self._create_packet_filter(filter_config),
-        )  # type: ignore[assignment]
-
-        scapy_packets = [Ether(packet.data) for packet in xmlrpc_packets]
-        return scapy_packets
-
-    def close(self) -> None:
-        """Close the traffic generator."""
-        try:
-            self.rpc_server_proxy.quit()
-        except ConnectionRefusedError:
-            # Because the python instance closes, we get no RPC response.
-            # Thus, this error is expected
-            pass
-        self.session.close()
+        """Implementation for sending packets and capturing any received traffic.
+
+        This method first creates an asynchronous sniffer that holds the packets to send, then
+        starts and stops said sniffer.
+
+        Returns:
+            A list of packets received after sending `packets`.
+        """
+        self._create_sniffer(
+            packets, send_port, recv_port, self._create_packet_filter(filter_config)
+        )
+        return self._start_and_stop_sniffing(duration)
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 6b5d5a805f..aa213c188f 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -27,6 +27,7 @@
 from .exception import ConfigurationError
 
 REGEX_FOR_PCI_ADDRESS: str = "/[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.[0-9]{1}/"
+REGEX_FOR_BASE64_ENCODING: str = "[-a-zA-Z0-9+\\/]*={0,3}"
 
 
 def expand_range(range_str: str) -> list[int]:
-- 
2.45.2
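
The capture path in _start_and_stop_sniffing prints each sniffed packet
as a base64 bytes literal and recovers it from the shell output with
REGEX_FOR_BASE64_ENCODING. The sketch below imitates that round trip
with the standard library only: base64.b64encode stands in for Scapy's
bytes_base64, the shell output is simulated, and harvest_packets is a
hypothetical helper name.

```python
import base64
import re

# Same pattern as the constant added to dts/framework/utils.py in this patch.
REGEX_FOR_BASE64_ENCODING = "[-a-zA-Z0-9+\\/]*={0,3}"


def harvest_packets(shell_output: str) -> list[bytes]:
    """Extract the base64 payloads printed as b'...' lines and decode them."""
    encoded = re.findall(
        f"^b'({REGEX_FOR_BASE64_ENCODING})'", shell_output, re.MULTILINE
    )
    return [base64.b64decode(item) for item in encoded]


# Simulated shell output: an echoed command, one b'...' line per packet,
# then the next prompt. Real output comes from the remote Python session.
raw_packets = [b"\x00\x01\x02\x03", b"hello packet"]
printed = "\n".join(repr(base64.b64encode(pkt)) for pkt in raw_packets)
shell_output = f">>> for pakt in gathered_packets: print(...)\n{printed}\n>>> "

recovered = harvest_packets(shell_output)
print(recovered == raw_packets)
```

Only lines that begin with b' are matched, so the echoed command and the
prompt are ignored. In the patch the decoded bytes are then passed to
Ether() to rebuild packet objects.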



Thread overview: 13+ messages
2024-06-05 17:52 [RFC PATCH v1 0/2] dts: replace XML-RPC server jspewock
2024-06-05 17:52 ` [RFC PATCH v1 1/2] dts: Add interactive shell for managing Scapy jspewock
2024-06-11 11:12   ` Juraj Linkeš
2024-06-17 19:45     ` Jeremy Spewock
2024-06-05 17:52 ` [RFC PATCH v1 2/2] dts: Remove XML-RPC server for Scapy TG and instead us ScapyShell jspewock
2024-06-11 10:46   ` Juraj Linkeš
2024-06-17 19:57     ` Jeremy Spewock
2024-06-20 23:11 ` [PATCH v1 0/1] dts: replace XML-RPC server jspewock
2024-06-20 23:11   ` jspewock [this message]
2024-06-21 14:14     ` [PATCH v1 1/1] dts: Remove XML-RPC server for Scapy TG and instead use PythonShell Juraj Linkeš
2024-06-24 20:54       ` Jeremy Spewock
2024-06-25 21:11 ` [PATCH v2 0/1] dts: replace XML-RPC server jspewock
2024-06-25 21:11   ` [PATCH v2 1/1] dts: Remove XML-RPC server for Scapy TG and instead use PythonShell jspewock
