DPDK patches and discussions
From: "Juraj Linkeš" <juraj.linkes@pantheon.tech>
To: thomas@monjalon.net, Honnappa.Nagarahalli@arm.com,
	lijuan.tu@intel.com, jspewock@iol.unh.edu, probb@iol.unh.edu
Cc: dev@dpdk.org, "Juraj Linkeš" <juraj.linkes@pantheon.tech>
Subject: [PATCH v2 3/6] dts: traffic generator abstractions
Date: Mon, 17 Jul 2023 13:07:06 +0200
Message-ID: <20230717110709.39220-4-juraj.linkes@pantheon.tech>
In-Reply-To: <20230717110709.39220-1-juraj.linkes@pantheon.tech>

Add abstractions for all traffic generators and for
traffic generators that can capture (not just count) packets.

Also add related abstractions, such as TGNode, the node where the traffic
generators reside, and some related code.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 doc/guides/tools/dts.rst                      |  31 ++++
 dts/framework/dts.py                          |  61 ++++----
 dts/framework/remote_session/linux_session.py |  78 ++++++++++
 dts/framework/remote_session/os_session.py    |  15 ++
 dts/framework/test_suite.py                   |   4 +-
 dts/framework/testbed_model/__init__.py       |   1 +
 .../capturing_traffic_generator.py            | 135 ++++++++++++++++++
 dts/framework/testbed_model/hw/port.py        |  60 ++++++++
 dts/framework/testbed_model/node.py           |  15 ++
 dts/framework/testbed_model/scapy.py          |  74 ++++++++++
 dts/framework/testbed_model/tg_node.py        |  99 +++++++++++++
 .../testbed_model/traffic_generator.py        |  72 ++++++++++
 dts/framework/utils.py                        |  13 ++
 13 files changed, 632 insertions(+), 26 deletions(-)
 create mode 100644 dts/framework/testbed_model/capturing_traffic_generator.py
 create mode 100644 dts/framework/testbed_model/hw/port.py
 create mode 100644 dts/framework/testbed_model/scapy.py
 create mode 100644 dts/framework/testbed_model/tg_node.py
 create mode 100644 dts/framework/testbed_model/traffic_generator.py

diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index c7b31623e4..2f97d1df6e 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -153,6 +153,37 @@ There are two areas that need to be set up on a System Under Test:
 
       sudo usermod -aG sudo <sut_user>
 
+
+Setting up Traffic Generator Node
+---------------------------------
+
+These need to be set up on a Traffic Generator Node:
+
+#. **Traffic generator dependencies**
+
+   The traffic generator running on the traffic generator node must be installed beforehand.
+   For the Scapy traffic generator, only a few Python libraries need to be installed:
+
+   .. code-block:: console
+
+      sudo apt install python3-pip
+      sudo pip install --upgrade pip
+      sudo pip install scapy==2.5.0
+
+#. **Hardware dependencies**
+
+   The traffic generators, like DPDK, need a proper driver and firmware.
+   The Scapy traffic generator doesn't have strict requirements - the drivers that come
+   with most OS distributions will be satisfactory.
+
+
+#. **User with administrator privileges**
+
+   Similarly to the System Under Test, traffic generators need administrator privileges
+   to be able to use the devices.
+   Refer to the :ref:`System Under Test section <sut_admin_user>` for details.
+
+
 Running DTS
 -----------
 
diff --git a/dts/framework/dts.py b/dts/framework/dts.py
index 372bc72787..265ed7fd5b 100644
--- a/dts/framework/dts.py
+++ b/dts/framework/dts.py
@@ -15,7 +15,7 @@
 from .logger import DTSLOG, getLogger
 from .test_result import BuildTargetResult, DTSResult, ExecutionResult, Result
 from .test_suite import get_test_suites
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
 from .utils import check_dts_python_version
 
 dts_logger: DTSLOG = getLogger("DTSRunner")
@@ -33,29 +33,31 @@ def run_all() -> None:
     # check the python version of the server that run dts
     check_dts_python_version()
 
-    nodes: dict[str, SutNode] = {}
+    sut_nodes: dict[str, SutNode] = {}
+    tg_nodes: dict[str, TGNode] = {}
     try:
         # for all Execution sections
         for execution in CONFIGURATION.executions:
-            sut_node = None
-            if execution.system_under_test_node.name in nodes:
-                # a Node with the same name already exists
-                sut_node = nodes[execution.system_under_test_node.name]
-            else:
-                # the SUT has not been initialized yet
-                try:
+            sut_node = sut_nodes.get(execution.system_under_test_node.name)
+            tg_node = tg_nodes.get(execution.traffic_generator_node.name)
+
+            try:
+                if not sut_node:
                     sut_node = SutNode(execution.system_under_test_node)
-                    result.update_setup(Result.PASS)
-                except Exception as e:
-                    dts_logger.exception(
-                        f"Connection to node {execution.system_under_test_node} failed."
-                    )
-                    result.update_setup(Result.FAIL, e)
-                else:
-                    nodes[sut_node.name] = sut_node
-
-            if sut_node:
-                _run_execution(sut_node, execution, result)
+                    sut_nodes[sut_node.name] = sut_node
+                if not tg_node:
+                    tg_node = TGNode(execution.traffic_generator_node)
+                    tg_nodes[tg_node.name] = tg_node
+                result.update_setup(Result.PASS)
+            except Exception as e:
+                failed_node = execution.system_under_test_node.name
+                if sut_node:
+                    failed_node = execution.traffic_generator_node.name
+                dts_logger.exception(f"Creation of node {failed_node} failed.")
+                result.update_setup(Result.FAIL, e)
+
+            else:
+                _run_execution(sut_node, tg_node, execution, result)
 
     except Exception as e:
         dts_logger.exception("An unexpected error has occurred.")
@@ -64,7 +66,7 @@ def run_all() -> None:
 
     finally:
         try:
-            for node in nodes.values():
+            for node in [*sut_nodes.values(), *tg_nodes.values()]:
                 node.close()
             result.update_teardown(Result.PASS)
         except Exception as e:
@@ -81,7 +83,10 @@ def run_all() -> None:
 
 
 def _run_execution(
-    sut_node: SutNode, execution: ExecutionConfiguration, result: DTSResult
+    sut_node: SutNode,
+    tg_node: TGNode,
+    execution: ExecutionConfiguration,
+    result: DTSResult,
 ) -> None:
     """
     Run the given execution. This involves running the execution setup as well as
@@ -101,7 +106,9 @@ def _run_execution(
 
     else:
         for build_target in execution.build_targets:
-            _run_build_target(sut_node, build_target, execution, execution_result)
+            _run_build_target(
+                sut_node, tg_node, build_target, execution, execution_result
+            )
 
     finally:
         try:
@@ -114,6 +121,7 @@ def _run_execution(
 
 def _run_build_target(
     sut_node: SutNode,
+    tg_node: TGNode,
     build_target: BuildTargetConfiguration,
     execution: ExecutionConfiguration,
     execution_result: ExecutionResult,
@@ -134,7 +142,7 @@ def _run_build_target(
         build_target_result.update_setup(Result.FAIL, e)
 
     else:
-        _run_all_suites(sut_node, execution, build_target_result)
+        _run_all_suites(sut_node, tg_node, execution, build_target_result)
 
     finally:
         try:
@@ -147,6 +155,7 @@ def _run_build_target(
 
 def _run_all_suites(
     sut_node: SutNode,
+    tg_node: TGNode,
     execution: ExecutionConfiguration,
     build_target_result: BuildTargetResult,
 ) -> None:
@@ -161,7 +170,7 @@ def _run_all_suites(
     for test_suite_config in execution.test_suites:
         try:
             _run_single_suite(
-                sut_node, execution, build_target_result, test_suite_config
+                sut_node, tg_node, execution, build_target_result, test_suite_config
             )
         except BlockingTestSuiteError as e:
             dts_logger.exception(
@@ -177,6 +186,7 @@ def _run_all_suites(
 
 def _run_single_suite(
     sut_node: SutNode,
+    tg_node: TGNode,
     execution: ExecutionConfiguration,
     build_target_result: BuildTargetResult,
     test_suite_config: TestSuiteConfig,
@@ -205,6 +215,7 @@ def _run_single_suite(
         for test_suite_class in test_suite_classes:
             test_suite = test_suite_class(
                 sut_node,
+                tg_node,
                 test_suite_config.test_cases,
                 execution.func,
                 build_target_result,
diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py
index f13f399121..284c74795d 100644
--- a/dts/framework/remote_session/linux_session.py
+++ b/dts/framework/remote_session/linux_session.py
@@ -2,13 +2,47 @@
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
 # Copyright(c) 2023 University of New Hampshire
 
+import json
+from typing import TypedDict
+
+from typing_extensions import NotRequired
+
 from framework.exception import RemoteCommandExecutionError
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
 from framework.utils import expand_range
 
 from .posix_session import PosixSession
 
 
+class LshwConfigurationOutput(TypedDict):
+    link: str
+
+
+class LshwOutput(TypedDict):
+    """
+    A model of the relevant information from json lshw output, e.g.:
+    {
+    ...
+    "businfo" : "pci@0000:08:00.0",
+    "logicalname" : "enp8s0",
+    "version" : "00",
+    "serial" : "52:54:00:59:e1:ac",
+    ...
+    "configuration" : {
+      ...
+      "link" : "yes",
+      ...
+    },
+    ...
+    """
+
+    businfo: str
+    logicalname: NotRequired[str]
+    serial: NotRequired[str]
+    configuration: LshwConfigurationOutput
+
+
 class LinuxSession(PosixSession):
     """
     The implementation of non-Posix compliant parts of Linux remote sessions.
@@ -102,3 +136,47 @@ def _configure_huge_pages(
         self.send_command(
             f"echo {amount} | tee {hugepage_config_path}", privileged=True
         )
+
+    def update_ports(self, ports: list[Port]) -> None:
+        self._logger.debug("Gathering port info.")
+        for port in ports:
+            assert (
+                port.node == self.name
+            ), "Attempted to gather port info on the wrong node"
+
+        port_info_list = self._get_lshw_info()
+        for port in ports:
+            for port_info in port_info_list:
+                if f"pci@{port.pci}" == port_info.get("businfo"):
+                    self._update_port_attr(
+                        port, port_info.get("logicalname"), "logical_name"
+                    )
+                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
+                    port_info_list.remove(port_info)
+                    break
+            else:
+                self._logger.warning(f"No port at pci address {port.pci} found.")
+
+    def _get_lshw_info(self) -> list[LshwOutput]:
+        output = self.send_command("lshw -quiet -json -C network", verify=True)
+        return json.loads(output.stdout)
+
+    def _update_port_attr(
+        self, port: Port, attr_value: str | None, attr_name: str
+    ) -> None:
+        if attr_value:
+            setattr(port, attr_name, attr_value)
+            self._logger.debug(
+                f"Found '{attr_name}' of port {port.pci}: '{attr_value}'."
+            )
+        else:
+            self._logger.warning(
+                f"Attempted to get '{attr_name}' of port {port.pci}, "
+                f"but it doesn't exist."
+            )
+
+    def configure_port_state(self, port: Port, enable: bool) -> None:
+        state = "up" if enable else "down"
+        self.send_command(
+            f"ip link set dev {port.logical_name} {state}", privileged=True
+        )
diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index cc13b02f16..633d06eb5d 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -12,6 +12,7 @@
 from framework.remote_session.remote import InteractiveShell, TestPmdShell
 from framework.settings import SETTINGS
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
 from framework.utils import MesonArgs
 
 from .remote import (
@@ -255,3 +256,17 @@ def get_node_info(self) -> NodeInfo:
         """
         Collect information about the node
         """
+
+    @abstractmethod
+    def update_ports(self, ports: list[Port]) -> None:
+        """
+        Get additional information about ports:
+            Logical name (e.g. enp7s0) if applicable
+            MAC address
+        """
+
+    @abstractmethod
+    def configure_port_state(self, port: Port, enable: bool) -> None:
+        """
+        Enable/disable port.
+        """
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index de94c9332d..056460dd05 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -20,7 +20,7 @@
 from .logger import DTSLOG, getLogger
 from .settings import SETTINGS
 from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
 
 
 class TestSuite(object):
@@ -51,11 +51,13 @@ class TestSuite(object):
     def __init__(
         self,
         sut_node: SutNode,
+        tg_node: TGNode,
         test_cases: list[str],
         func: bool,
         build_target_result: BuildTargetResult,
     ):
         self.sut_node = sut_node
+        self.tg_node = tg_node
         self._logger = getLogger(self.__class__.__name__)
         self._test_cases_to_run = test_cases
         self._test_cases_to_run.extend(SETTINGS.test_cases)
diff --git a/dts/framework/testbed_model/__init__.py b/dts/framework/testbed_model/__init__.py
index f54a947051..5cbb859e47 100644
--- a/dts/framework/testbed_model/__init__.py
+++ b/dts/framework/testbed_model/__init__.py
@@ -20,3 +20,4 @@
 )
 from .node import Node
 from .sut_node import SutNode
+from .tg_node import TGNode
diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py b/dts/framework/testbed_model/capturing_traffic_generator.py
new file mode 100644
index 0000000000..1130d87f1e
--- /dev/null
+++ b/dts/framework/testbed_model/capturing_traffic_generator.py
@@ -0,0 +1,135 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator that can capture packets.
+
+In functional testing, we need to interrogate received packets to check their validity.
+Here we define the interface common to all
+traffic generators capable of capturing traffic.
+"""
+
+import uuid
+from abc import abstractmethod
+
+import scapy.utils  # type: ignore[import]
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.settings import SETTINGS
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+from .traffic_generator import TrafficGenerator
+
+
+def _get_default_capture_name() -> str:
+    """
+    This is the function used for the default implementation of capture names.
+    """
+    return str(uuid.uuid4())
+
+
+class CapturingTrafficGenerator(TrafficGenerator):
+    """
+    A mixin interface which enables a packet generator to declare that it can capture
+    packets and return them to the user.
+
+    The methods of capturing traffic generators obey the following workflow:
+        1. send packets
+        2. capture packets
+        3. write the capture to a .pcap file
+        4. return the received packets
+    """
+
+    @property
+    def is_capturing(self) -> bool:
+        return True
+
+    def send_packet_and_capture(
+        self,
+        packet: Packet,
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str | None = None,
+    ) -> list[Packet]:
+        """Send a packet, return received traffic.
+
+        Send a packet on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packet: The packet to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port on the TG node.
+            duration: Capture traffic for this amount of time after sending the packet.
+            capture_name: The name of the .pcap file in which to store the capture.
+
+        Returns:
+            A list of received packets. May be empty if no packets are captured.
+        """
+        return self.send_packets_and_capture(
+            [packet], send_port, receive_port, duration, capture_name
+        )
+
+    def send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str | None = None,
+    ) -> list[Packet]:
+        """Send packets, return received traffic.
+
+        Send packets on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packets: The packets to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port on the TG node.
+            duration: Capture traffic for this amount of time after sending the packets.
+            capture_name: The name of the .pcap file in which to store the capture.
+
+        Returns:
+            A list of received packets. May be empty if no packets are captured.
+        """
+        # Generate the default name per call; a default argument is evaluated once.
+        if capture_name is None:
+            capture_name = _get_default_capture_name()
+        self._logger.debug(get_packet_summaries(packets))
+        self._logger.debug(
+            f"Sending packet on {send_port.logical_name}, "
+            f"receiving on {receive_port.logical_name}."
+        )
+        received_packets = self._send_packets_and_capture(
+            packets, send_port, receive_port, duration
+        )
+
+        self._logger.debug(
+            f"Received packets: {get_packet_summaries(received_packets)}"
+        )
+        self._write_capture_from_packets(capture_name, received_packets)
+        return received_packets
+
+    @abstractmethod
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+    ) -> list[Packet]:
+        """
+        The extended classes must implement this method which
+        sends packets on send_port and receives packets on the receive_port
+        for the specified duration. It must be able to handle no received packets.
+        """
+
+    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
+        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
+        self._logger.debug(f"Writing packets to {file_name}.")
+        scapy.utils.wrpcap(file_name, packets)
diff --git a/dts/framework/testbed_model/hw/port.py b/dts/framework/testbed_model/hw/port.py
new file mode 100644
index 0000000000..680c29bfe3
--- /dev/null
+++ b/dts/framework/testbed_model/hw/port.py
@@ -0,0 +1,60 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+from dataclasses import dataclass
+
+from framework.config import PortConfig
+
+
+@dataclass(slots=True, frozen=True)
+class PortIdentifier:
+    node: str
+    pci: str
+
+
+@dataclass(slots=True)
+class Port:
+    """
+    identifier: The PCI address of the port on a node.
+
+    os_driver: The driver used by this port when the OS is controlling it.
+        Example: i40e
+    os_driver_for_dpdk: The driver the device must be bound to for DPDK to use it.
+        Example: vfio-pci
+
+    Note: os_driver and os_driver_for_dpdk may be the same thing.
+        Example: mlx5_core
+
+    peer: The identifier of a port this port is connected with.
+    """
+
+    identifier: PortIdentifier
+    os_driver: str
+    os_driver_for_dpdk: str
+    peer: PortIdentifier
+    mac_address: str = ""
+    logical_name: str = ""
+
+    def __init__(self, node_name: str, config: PortConfig):
+        self.identifier = PortIdentifier(
+            node=node_name,
+            pci=config.pci,
+        )
+        self.os_driver = config.os_driver
+        self.os_driver_for_dpdk = config.os_driver_for_dpdk
+        self.peer = PortIdentifier(node=config.peer_node, pci=config.peer_pci)
+
+    @property
+    def node(self) -> str:
+        return self.identifier.node
+
+    @property
+    def pci(self) -> str:
+        return self.identifier.pci
+
+
+@dataclass(slots=True, frozen=True)
+class PortLink:
+    sut_port: Port
+    tg_port: Port
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index d2d55d904e..e09931cedf 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -25,6 +25,7 @@
     LogicalCoreListFilter,
     lcore_filter,
 )
+from .hw.port import Port
 
 
 class Node(object):
@@ -38,6 +39,7 @@ class Node(object):
     config: NodeConfiguration
     name: str
     lcores: list[LogicalCore]
+    ports: list[Port]
     _logger: DTSLOG
     _other_sessions: list[OSSession]
     _execution_config: ExecutionConfiguration
@@ -57,6 +59,13 @@ def __init__(self, node_config: NodeConfiguration):
         ).filter()
 
         self._other_sessions = []
+        self._init_ports()
+
+    def _init_ports(self) -> None:
+        self.ports = [Port(self.name, port_config) for port_config in self.config.ports]
+        self.main_session.update_ports(self.ports)
+        for port in self.ports:
+            self.configure_port_state(port)
 
     def set_up_execution(self, execution_config: ExecutionConfiguration) -> None:
         """
@@ -168,6 +177,12 @@ def _setup_hugepages(self):
                 self.config.hugepages.amount, self.config.hugepages.force_first_numa
             )
 
+    def configure_port_state(self, port: Port, enable: bool = True) -> None:
+        """
+        Enable/disable port.
+        """
+        self.main_session.configure_port_state(port, enable)
+
     def close(self) -> None:
         """
         Close all connections and free other resources.
diff --git a/dts/framework/testbed_model/scapy.py b/dts/framework/testbed_model/scapy.py
new file mode 100644
index 0000000000..1a23dc9fa3
--- /dev/null
+++ b/dts/framework/testbed_model/scapy.py
@@ -0,0 +1,74 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Scapy traffic generator.
+
+Traffic generator used for functional testing, implemented using the Scapy library.
+The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
+
+The XML-RPC server runs in an interactive remote SSH session running a Python console,
+where we start the server. The communication with the server is facilitated with
+a local server proxy.
+"""
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.config import OS, ScapyTrafficGeneratorConfig
+from framework.logger import getLogger
+
+from .capturing_traffic_generator import (
+    CapturingTrafficGenerator,
+    _get_default_capture_name,
+)
+from .hw.port import Port
+from .tg_node import TGNode
+
+
+class ScapyTrafficGenerator(CapturingTrafficGenerator):
+    """Provides access to scapy functions via an RPC interface.
+
+    The traffic generator first starts an XML-RPC server on the remote TG node.
+    Then it populates the server with functions which use the Scapy library
+    to send/receive traffic.
+
+    Any packets sent to the remote server are first converted to bytes.
+    They are received as xmlrpc.client.Binary objects on the server side.
+    When the server sends the packets back, they are also received as
+    xmlrpc.client.Binary objects on the client side, converted back to Scapy
+    packets and only then returned from the methods.
+
+    Arguments:
+        tg_node: The node where the traffic generator resides.
+        config: The user configuration of the traffic generator.
+    """
+
+    _config: ScapyTrafficGeneratorConfig
+    _tg_node: TGNode
+
+    def __init__(self, tg_node: TGNode, config: ScapyTrafficGeneratorConfig):
+        self._config = config
+        self._tg_node = tg_node
+        self._logger = getLogger(
+            f"{self._tg_node.name} {self._config.traffic_generator_type}"
+        )
+
+        assert (
+            self._tg_node.config.os == OS.linux
+        ), "Linux is the only supported OS for scapy traffic generation"
+
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        raise NotImplementedError()
+
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        raise NotImplementedError()
+
+    def close(self) -> None:
+        pass
diff --git a/dts/framework/testbed_model/tg_node.py b/dts/framework/testbed_model/tg_node.py
new file mode 100644
index 0000000000..27025cfa31
--- /dev/null
+++ b/dts/framework/testbed_model/tg_node.py
@@ -0,0 +1,99 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator node.
+
+This is the node where the traffic generator resides.
+The distinction between a node and a traffic generator is as follows:
+A node is a host that DTS connects to. It could be a baremetal server,
+a VM or a container.
+A traffic generator is software running on the node.
+A traffic generator node is a node running a traffic generator.
+A node can be a traffic generator node as well as a system under test node.
+"""
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.config import (
+    ScapyTrafficGeneratorConfig,
+    TGNodeConfiguration,
+    TrafficGeneratorType,
+)
+from framework.exception import ConfigurationError
+
+from .capturing_traffic_generator import CapturingTrafficGenerator
+from .hw.port import Port
+from .node import Node
+
+
+class TGNode(Node):
+    """Manage connections to a node with a traffic generator.
+
+    Apart from basic node management capabilities, the Traffic Generator node has
+    specialized methods for handling the traffic generator running on it.
+
+    Arguments:
+        node_config: The user configuration of the traffic generator node.
+
+    Attributes:
+        traffic_generator: The traffic generator running on the node.
+    """
+
+    traffic_generator: CapturingTrafficGenerator
+
+    def __init__(self, node_config: TGNodeConfiguration):
+        super(TGNode, self).__init__(node_config)
+        self.traffic_generator = create_traffic_generator(
+            self, node_config.traffic_generator
+        )
+        self._logger.info(f"Created node: {self.name}")
+
+    def send_packet_and_capture(
+        self,
+        packet: Packet,
+        send_port: Port,
+        receive_port: Port,
+        duration: float = 1,
+    ) -> list[Packet]:
+        """Send a packet, return received traffic.
+
+        Send a packet on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packet: The packet to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port on the TG node.
+            duration: Capture traffic for this amount of time after sending the packet.
+
+        Returns:
+            A list of received packets. May be empty if no packets are captured.
+        """
+        return self.traffic_generator.send_packet_and_capture(
+            packet, send_port, receive_port, duration
+        )
+
+    def close(self) -> None:
+        """Free all resources used by the node"""
+        self.traffic_generator.close()
+        super(TGNode, self).close()
+
+
+def create_traffic_generator(
+    tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
+) -> CapturingTrafficGenerator:
+    """A factory function for creating traffic generator object from user config."""
+
+    from .scapy import ScapyTrafficGenerator
+
+    match traffic_generator_config.traffic_generator_type:
+        case TrafficGeneratorType.SCAPY:
+            return ScapyTrafficGenerator(tg_node, traffic_generator_config)
+        case _:
+            raise ConfigurationError(
+                "Unknown traffic generator: "
+                f"{traffic_generator_config.traffic_generator_type}"
+            )
diff --git a/dts/framework/testbed_model/traffic_generator.py b/dts/framework/testbed_model/traffic_generator.py
new file mode 100644
index 0000000000..28c35d3ce4
--- /dev/null
+++ b/dts/framework/testbed_model/traffic_generator.py
@@ -0,0 +1,72 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""The base traffic generator.
+
+These traffic generators can't capture received traffic,
+only count the number of received packets.
+"""
+
+from abc import ABC, abstractmethod
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.logger import DTSLOG
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+
+
+class TrafficGenerator(ABC):
+    """The base traffic generator.
+
+    Defines the few basic methods that each traffic generator must implement.
+    """
+
+    _logger: DTSLOG
+
+    def send_packet(self, packet: Packet, port: Port) -> None:
+        """Send a packet and block until it is fully sent.
+
+        What fully sent means is defined by the traffic generator.
+
+        Args:
+            packet: The packet to send.
+            port: The egress port on the TG node.
+        """
+        self.send_packets([packet], port)
+
+    def send_packets(self, packets: list[Packet], port: Port) -> None:
+        """Send packets and block until they are fully sent.
+
+        What fully sent means is defined by the traffic generator.
+
+        Args:
+            packets: The packets to send.
+            port: The egress port on the TG node.
+        """
+        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else ''}.")
+        self._logger.debug(get_packet_summaries(packets))
+        self._send_packets(packets, port)
+
+    @abstractmethod
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        """
+        The extended classes must implement this method which
+        sends packets on port. The method should block until all packets
+        are fully sent.
+        """
+
+    @property
+    def is_capturing(self) -> bool:
+        """Whether this traffic generator can capture traffic.
+
+        Returns:
+            True if the traffic generator can capture traffic, False otherwise.
+        """
+        return False
+
+    @abstractmethod
+    def close(self) -> None:
+        """Free all resources used by the traffic generator."""
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 60abe46edf..d27c2c5b5f 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -4,6 +4,7 @@
 # Copyright(c) 2022-2023 University of New Hampshire
 
 import atexit
+import json
 import os
 import subprocess
 import sys
@@ -11,6 +12,8 @@
 from pathlib import Path
 from subprocess import SubprocessError
 
+from scapy.packet import Packet  # type: ignore[import]
+
 from .exception import ConfigurationError
 
 
@@ -64,6 +67,16 @@ def expand_range(range_str: str) -> list[int]:
     return expanded_range
 
 
+def get_packet_summaries(packets: list[Packet]) -> str:
+    if len(packets) == 1:
+        packet_summaries = packets[0].summary()
+    else:
+        packet_summaries = json.dumps(
+            list(map(lambda pkt: pkt.summary(), packets)), indent=4
+        )
+    return f"Packet contents: \n{packet_summaries}"
+
+
 def RED(text: str) -> str:
     return f"\u001B[31;1m{str(text)}\u001B[0m"
 
-- 
2.34.1
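
For illustration only, here is a minimal sketch of how a concrete traffic
generator might plug into the abstraction above. It is not part of the patch.
FakePacket and RecordingTrafficGenerator are hypothetical names, a plain str
stands in for Port, and logging.Logger stands in for DTSLOG so that the
example runs without scapy or the DTS framework. The base class and
get_packet_summaries are trimmed copies of the code in the diff, with the
map/lambda written as a list comprehension.

```python
import json
import logging
from abc import ABC, abstractmethod


class FakePacket:
    """Hypothetical stand-in for scapy's Packet; only summary() is used."""

    def __init__(self, text: str) -> None:
        self._text = text

    def summary(self) -> str:
        return self._text


def get_packet_summaries(packets: list[FakePacket]) -> str:
    # Same logic as the helper in the patch: a bare summary for a single
    # packet, a JSON-formatted list of summaries otherwise.
    if len(packets) == 1:
        packet_summaries = packets[0].summary()
    else:
        packet_summaries = json.dumps(
            [pkt.summary() for pkt in packets], indent=4
        )
    return f"Packet contents: \n{packet_summaries}"


class TrafficGenerator(ABC):
    """Trimmed copy of the base class from the patch (Port replaced by str)."""

    _logger: logging.Logger

    def send_packet(self, packet: FakePacket, port: str) -> None:
        self.send_packets([packet], port)

    def send_packets(self, packets: list[FakePacket], port: str) -> None:
        # Logging lives in the public method; subclasses only do the sending.
        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else ''}.")
        self._logger.debug(get_packet_summaries(packets))
        self._send_packets(packets, port)

    @abstractmethod
    def _send_packets(self, packets: list[FakePacket], port: str) -> None:
        """Send packets on port and block until they are fully sent."""

    @property
    def is_capturing(self) -> bool:
        return False

    @abstractmethod
    def close(self) -> None:
        """Free all resources used by the traffic generator."""


class RecordingTrafficGenerator(TrafficGenerator):
    """Hypothetical subclass that records packets instead of sending them."""

    def __init__(self) -> None:
        self._logger = logging.getLogger("recording_tg")
        self.sent: list[tuple[str, str]] = []

    def _send_packets(self, packets: list[FakePacket], port: str) -> None:
        self.sent.extend((port, pkt.summary()) for pkt in packets)

    def close(self) -> None:
        self.sent.clear()


tg = RecordingTrafficGenerator()
tg.send_packet(FakePacket("Ether / IP / UDP"), "port0")
tg.send_packets([FakePacket("Ether / ARP"), FakePacket("Ether / IP / ICMP")], "port0")
```

A subclass overrides only _send_packets and close. Both send_packet and
send_packets funnel through send_packets, so every implementation gets the
same logging without repeating it.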

