DPDK patches and discussions
 help / color / mirror / Atom feed
From: Jeremy Spewock <jspewock@iol.unh.edu>
To: "Juraj Linkeš" <juraj.linkes@pantheon.tech>
Cc: thomas@monjalon.net, Honnappa.Nagarahalli@arm.com,
	lijuan.tu@intel.com,  probb@iol.unh.edu, dev@dpdk.org
Subject: Re: [PATCH v2 3/6] dts: traffic generator abstractions
Date: Tue, 18 Jul 2023 15:56:22 -0400	[thread overview]
Message-ID: <CAAA20UREMOFN=Q6UFLzD4+X5X2eWbLHKV5atmTKK5O6XdxdX=g@mail.gmail.com> (raw)
In-Reply-To: <20230717110709.39220-4-juraj.linkes@pantheon.tech>

[-- Attachment #1: Type: text/plain, Size: 36952 bytes --]

Hey Juraj,

Just a couple of comments below, but very minor stuff. Just a few docstring
that I commented on and one question about the factory for traffic
generators that I was wondering what you thought about. More below:

On Mon, Jul 17, 2023 at 7:07 AM Juraj Linkeš <juraj.linkes@pantheon.tech>
wrote:

> There are traffic abstractions for all traffic generators and for
> traffic generators that can capture (not just count) packets.
>
> There also related abstractions, such as TGNode where the traffic
> generators reside and some related code.
>
> Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
> ---
>  doc/guides/tools/dts.rst                      |  31 ++++
>  dts/framework/dts.py                          |  61 ++++----
>  dts/framework/remote_session/linux_session.py |  78 ++++++++++
>  dts/framework/remote_session/os_session.py    |  15 ++
>  dts/framework/test_suite.py                   |   4 +-
>  dts/framework/testbed_model/__init__.py       |   1 +
>  .../capturing_traffic_generator.py            | 135 ++++++++++++++++++
>  dts/framework/testbed_model/hw/port.py        |  60 ++++++++
>  dts/framework/testbed_model/node.py           |  15 ++
>  dts/framework/testbed_model/scapy.py          |  74 ++++++++++
>  dts/framework/testbed_model/tg_node.py        |  99 +++++++++++++
>  .../testbed_model/traffic_generator.py        |  72 ++++++++++
>  dts/framework/utils.py                        |  13 ++
>  13 files changed, 632 insertions(+), 26 deletions(-)
>  create mode 100644
> dts/framework/testbed_model/capturing_traffic_generator.py
>  create mode 100644 dts/framework/testbed_model/hw/port.py
>  create mode 100644 dts/framework/testbed_model/scapy.py
>  create mode 100644 dts/framework/testbed_model/tg_node.py
>  create mode 100644 dts/framework/testbed_model/traffic_generator.py
>
> diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
> index c7b31623e4..2f97d1df6e 100644
> --- a/doc/guides/tools/dts.rst
> +++ b/doc/guides/tools/dts.rst
> @@ -153,6 +153,37 @@ There are two areas that need to be set up on a
> System Under Test:
>
>        sudo usermod -aG sudo <sut_user>
>
> +
> +Setting up Traffic Generator Node
> +---------------------------------
> +
> +These need to be set up on a Traffic Generator Node:
> +
> +#. **Traffic generator dependencies**
> +
> +   The traffic generator running on the traffic generator node must be
> installed beforehand.
> +   For Scapy traffic generator, only a few Python libraries need to be
> installed:
> +
> +   .. code-block:: console
> +
> +      sudo apt install python3-pip
> +      sudo pip install --upgrade pip
> +      sudo pip install scapy==2.5.0
> +
> +#. **Hardware dependencies**
> +
> +   The traffic generators, like DPDK, need a proper driver and firmware.
> +   The Scapy traffic generator doesn't have strict requirements - the
> drivers that come
> +   with most OS distributions will be satisfactory.
> +
> +
> +#. **User with administrator privileges**
> +
> +   Similarly to the System Under Test, traffic generators need
> administrator privileges
> +   to be able to use the devices.
> +   Refer to the `System Under Test section <sut_admin_user>` for details.
> +
> +
>  Running DTS
>  -----------
>
> diff --git a/dts/framework/dts.py b/dts/framework/dts.py
> index 372bc72787..265ed7fd5b 100644
> --- a/dts/framework/dts.py
> +++ b/dts/framework/dts.py
> @@ -15,7 +15,7 @@
>  from .logger import DTSLOG, getLogger
>  from .test_result import BuildTargetResult, DTSResult, ExecutionResult,
> Result
>  from .test_suite import get_test_suites
> -from .testbed_model import SutNode
> +from .testbed_model import SutNode, TGNode
>  from .utils import check_dts_python_version
>
>  dts_logger: DTSLOG = getLogger("DTSRunner")
> @@ -33,29 +33,31 @@ def run_all() -> None:
>      # check the python version of the server that run dts
>      check_dts_python_version()
>
> -    nodes: dict[str, SutNode] = {}
> +    sut_nodes: dict[str, SutNode] = {}
> +    tg_nodes: dict[str, TGNode] = {}
>      try:
>          # for all Execution sections
>          for execution in CONFIGURATION.executions:
> -            sut_node = None
> -            if execution.system_under_test_node.name in nodes:
> -                # a Node with the same name already exists
> -                sut_node = nodes[execution.system_under_test_node.name]
> -            else:
> -                # the SUT has not been initialized yet
> -                try:
> +            sut_node = sut_nodes.get(
> execution.system_under_test_node.name)
> +            tg_node = tg_nodes.get(execution.traffic_generator_node.name)
> +
> +            try:
> +                if not sut_node:
>                      sut_node = SutNode(execution.system_under_test_node)
> -                    result.update_setup(Result.PASS)
> -                except Exception as e:
> -                    dts_logger.exception(
> -                        f"Connection to node
> {execution.system_under_test_node} failed."
> -                    )
> -                    result.update_setup(Result.FAIL, e)
> -                else:
> -                    nodes[sut_node.name] = sut_node
> -
> -            if sut_node:
> -                _run_execution(sut_node, execution, result)
> +                    sut_nodes[sut_node.name] = sut_node
> +                if not tg_node:
> +                    tg_node = TGNode(execution.traffic_generator_node)
> +                    tg_nodes[tg_node.name] = tg_node
> +                result.update_setup(Result.PASS)
> +            except Exception as e:
> +                failed_node = execution.system_under_test_node.name
> +                if sut_node:
> +                    failed_node = execution.traffic_generator_node.name
> +                dts_logger.exception(f"Creation of node {failed_node}
> failed.")
> +                result.update_setup(Result.FAIL, e)
> +
> +            else:
> +                _run_execution(sut_node, tg_node, execution, result)
>
>      except Exception as e:
>          dts_logger.exception("An unexpected error has occurred.")
> @@ -64,7 +66,7 @@ def run_all() -> None:
>
>      finally:
>          try:
> -            for node in nodes.values():
> +            for node in (sut_nodes | tg_nodes).values():
>                  node.close()
>              result.update_teardown(Result.PASS)
>          except Exception as e:
> @@ -81,7 +83,10 @@ def run_all() -> None:
>
>
>  def _run_execution(
> -    sut_node: SutNode, execution: ExecutionConfiguration, result:
> DTSResult
> +    sut_node: SutNode,
> +    tg_node: TGNode,
> +    execution: ExecutionConfiguration,
> +    result: DTSResult,
>  ) -> None:
>      """
>      Run the given execution. This involves running the execution setup as
> well as
> @@ -101,7 +106,9 @@ def _run_execution(
>
>      else:
>          for build_target in execution.build_targets:
> -            _run_build_target(sut_node, build_target, execution,
> execution_result)
> +            _run_build_target(
> +                sut_node, tg_node, build_target, execution,
> execution_result
> +            )
>
>      finally:
>          try:
> @@ -114,6 +121,7 @@ def _run_execution(
>
>  def _run_build_target(
>      sut_node: SutNode,
> +    tg_node: TGNode,
>      build_target: BuildTargetConfiguration,
>      execution: ExecutionConfiguration,
>      execution_result: ExecutionResult,
> @@ -134,7 +142,7 @@ def _run_build_target(
>          build_target_result.update_setup(Result.FAIL, e)
>
>      else:
> -        _run_all_suites(sut_node, execution, build_target_result)
> +        _run_all_suites(sut_node, tg_node, execution, build_target_result)
>
>      finally:
>          try:
> @@ -147,6 +155,7 @@ def _run_build_target(
>
>  def _run_all_suites(
>      sut_node: SutNode,
> +    tg_node: TGNode,
>      execution: ExecutionConfiguration,
>      build_target_result: BuildTargetResult,
>  ) -> None:
> @@ -161,7 +170,7 @@ def _run_all_suites(
>      for test_suite_config in execution.test_suites:
>          try:
>              _run_single_suite(
> -                sut_node, execution, build_target_result,
> test_suite_config
> +                sut_node, tg_node, execution, build_target_result,
> test_suite_config
>              )
>          except BlockingTestSuiteError as e:
>              dts_logger.exception(
> @@ -177,6 +186,7 @@ def _run_all_suites(
>
>  def _run_single_suite(
>      sut_node: SutNode,
> +    tg_node: TGNode,
>      execution: ExecutionConfiguration,
>      build_target_result: BuildTargetResult,
>      test_suite_config: TestSuiteConfig,
> @@ -205,6 +215,7 @@ def _run_single_suite(
>          for test_suite_class in test_suite_classes:
>              test_suite = test_suite_class(
>                  sut_node,
> +                tg_node,
>                  test_suite_config.test_cases,
>                  execution.func,
>                  build_target_result,
> diff --git a/dts/framework/remote_session/linux_session.py
> b/dts/framework/remote_session/linux_session.py
> index f13f399121..284c74795d 100644
> --- a/dts/framework/remote_session/linux_session.py
> +++ b/dts/framework/remote_session/linux_session.py
> @@ -2,13 +2,47 @@
>  # Copyright(c) 2023 PANTHEON.tech s.r.o.
>  # Copyright(c) 2023 University of New Hampshire
>
> +import json
> +from typing import TypedDict
> +
> +from typing_extensions import NotRequired
> +
>  from framework.exception import RemoteCommandExecutionError
>  from framework.testbed_model import LogicalCore
> +from framework.testbed_model.hw.port import Port
>  from framework.utils import expand_range
>
>  from .posix_session import PosixSession
>
>
> +class LshwConfigurationOutput(TypedDict):
> +    link: str
> +
> +
> +class LshwOutput(TypedDict):
> +    """
> +    A model of the relevant information from json lshw output, e.g.:
> +    {
> +    ...
> +    "businfo" : "pci@0000:08:00.0",
> +    "logicalname" : "enp8s0",
> +    "version" : "00",
> +    "serial" : "52:54:00:59:e1:ac",
> +    ...
> +    "configuration" : {
> +      ...
> +      "link" : "yes",
> +      ...
> +    },
> +    ...
> +    """
> +
> +    businfo: str
> +    logicalname: NotRequired[str]
> +    serial: NotRequired[str]
> +    configuration: LshwConfigurationOutput
> +
> +
>  class LinuxSession(PosixSession):
>      """
>      The implementation of non-Posix compliant parts of Linux remote
> sessions.
> @@ -102,3 +136,47 @@ def _configure_huge_pages(
>          self.send_command(
>              f"echo {amount} | tee {hugepage_config_path}", privileged=True
>          )
> +
> +    def update_ports(self, ports: list[Port]) -> None:
> +        self._logger.debug("Gathering port info.")
> +        for port in ports:
> +            assert (
> +                port.node == self.name
> +            ), "Attempted to gather port info on the wrong node"
> +
> +        port_info_list = self._get_lshw_info()
> +        for port in ports:
> +            for port_info in port_info_list:
> +                if f"pci@{port.pci}" == port_info.get("businfo"):
> +                    self._update_port_attr(
> +                        port, port_info.get("logicalname"), "logical_name"
> +                    )
> +                    self._update_port_attr(port, port_info.get("serial"),
> "mac_address")
> +                    port_info_list.remove(port_info)
> +                    break
> +            else:
> +                self._logger.warning(f"No port at pci address {port.pci}
> found.")
> +
> +    def _get_lshw_info(self) -> list[LshwOutput]:
> +        output = self.send_command("lshw -quiet -json -C network",
> verify=True)
> +        return json.loads(output.stdout)
> +
> +    def _update_port_attr(
> +        self, port: Port, attr_value: str | None, attr_name: str
> +    ) -> None:
> +        if attr_value:
> +            setattr(port, attr_name, attr_value)
> +            self._logger.debug(
> +                f"Found '{attr_name}' of port {port.pci}: '{attr_value}'."
> +            )
> +        else:
> +            self._logger.warning(
> +                f"Attempted to get '{attr_name}' of port {port.pci}, "
> +                f"but it doesn't exist."
> +            )
> +
> +    def configure_port_state(self, port: Port, enable: bool) -> None:
> +        state = "up" if enable else "down"
> +        self.send_command(
> +            f"ip link set dev {port.logical_name} {state}",
> privileged=True
> +        )
> diff --git a/dts/framework/remote_session/os_session.py
> b/dts/framework/remote_session/os_session.py
> index cc13b02f16..633d06eb5d 100644
> --- a/dts/framework/remote_session/os_session.py
> +++ b/dts/framework/remote_session/os_session.py
> @@ -12,6 +12,7 @@
>  from framework.remote_session.remote import InteractiveShell, TestPmdShell
>  from framework.settings import SETTINGS
>  from framework.testbed_model import LogicalCore
> +from framework.testbed_model.hw.port import Port
>  from framework.utils import MesonArgs
>
>  from .remote import (
> @@ -255,3 +256,17 @@ def get_node_info(self) -> NodeInfo:
>          """
>          Collect information about the node
>          """
> +
> +    @abstractmethod
> +    def update_ports(self, ports: list[Port]) -> None:
> +        """
> +        Get additional information about ports:
> +            Logical name (e.g. enp7s0) if applicable
> +            Mac address
> +        """
> +
> +    @abstractmethod
> +    def configure_port_state(self, port: Port, enable: bool) -> None:
> +        """
> +        Enable/disable port.
> +        """
> diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
> index de94c9332d..056460dd05 100644
> --- a/dts/framework/test_suite.py
> +++ b/dts/framework/test_suite.py
> @@ -20,7 +20,7 @@
>  from .logger import DTSLOG, getLogger
>  from .settings import SETTINGS
>  from .test_result import BuildTargetResult, Result, TestCaseResult,
> TestSuiteResult
> -from .testbed_model import SutNode
> +from .testbed_model import SutNode, TGNode
>
>
>  class TestSuite(object):
> @@ -51,11 +51,13 @@ class TestSuite(object):
>      def __init__(
>          self,
>          sut_node: SutNode,
> +        tg_node: TGNode,
>          test_cases: list[str],
>          func: bool,
>          build_target_result: BuildTargetResult,
>      ):
>          self.sut_node = sut_node
> +        self.tg_node = tg_node
>          self._logger = getLogger(self.__class__.__name__)
>          self._test_cases_to_run = test_cases
>          self._test_cases_to_run.extend(SETTINGS.test_cases)
> diff --git a/dts/framework/testbed_model/__init__.py
> b/dts/framework/testbed_model/__init__.py
> index f54a947051..5cbb859e47 100644
> --- a/dts/framework/testbed_model/__init__.py
> +++ b/dts/framework/testbed_model/__init__.py
> @@ -20,3 +20,4 @@
>  )
>  from .node import Node
>  from .sut_node import SutNode
> +from .tg_node import TGNode
> diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py
> b/dts/framework/testbed_model/capturing_traffic_generator.py
> new file mode 100644
> index 0000000000..1130d87f1e
> --- /dev/null
> +++ b/dts/framework/testbed_model/capturing_traffic_generator.py
> @@ -0,0 +1,135 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""Traffic generator that can capture packets.
> +
> +In functional testing, we need to interrogate received packets to check
> their validity.
> +Here we define the interface common to all
> +traffic generators capable of capturing traffic.
>

Is there a reason for the line break here? Just to keep things consistent I
think it might make sense to extend this line to be the same length as the
one above.


> +"""
> +
> +import uuid
> +from abc import abstractmethod
> +
> +import scapy.utils  # type: ignore[import]
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.settings import SETTINGS
> +from framework.utils import get_packet_summaries
> +
> +from .hw.port import Port
> +from .traffic_generator import TrafficGenerator
> +
> +
> +def _get_default_capture_name() -> str:
> +    """
> +    This is the function used for the default implementation of capture
> names.
> +    """
> +    return str(uuid.uuid4())
> +
> +
> +class CapturingTrafficGenerator(TrafficGenerator):
> +    """
> +    A mixin interface which enables a packet generator to declare that
> it can capture
> +    packets and return them to the user.
>

This is missing the one line summary at the top of the comment. Obviously
this is not a big issue, but we likely would want this to be uniform with
the rest of the module which does have the summary at the top.


> +
> +    The methods of capturing traffic generators obey the following
> workflow:
> +        1. send packets
> +        2. capture packets
> +        3. write the capture to a .pcap file
> +        4. return the received packets
> +    """
> +
> +    @property
> +    def is_capturing(self) -> bool:
> +        return True
> +
> +    def send_packet_and_capture(
> +        self,
> +        packet: Packet,
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +        capture_name: str = _get_default_capture_name(),
> +    ) -> list[Packet]:
> +        """Send a packet, return received traffic.
> +
> +        Send a packet on the send_port and then return all traffic
> captured
> +        on the receive_port for the given duration. Also record the
> captured traffic
> +        in a pcap file.
> +
>
+        Args:
> +            packet: The packet to send.
> +            send_port: The egress port on the TG node.
> +            receive_port: The ingress port in the TG node.
> +            duration: Capture traffic for this amount of time after
> sending the packet.
> +            capture_name: The name of the .pcap file where to store the
> capture.
> +
> +        Returns:
> +             A list of received packets. May be empty if no packets are
> captured.
> +        """
> +        return self.send_packets_and_capture(
> +            [packet], send_port, receive_port, duration, capture_name
> +        )
> +
> +    def send_packets_and_capture(
> +        self,
> +        packets: list[Packet],
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +        capture_name: str = _get_default_capture_name(),
> +    ) -> list[Packet]:
> +        """Send packets, return received traffic.
> +
> +        Send packets on the send_port and then return all traffic captured
> +        on the receive_port for the given duration. Also record the
> captured traffic
> +        in a pcap file.
> +
> +        Args:
> +            packets: The packets to send.
> +            send_port: The egress port on the TG node.
> +            receive_port: The ingress port in the TG node.
> +            duration: Capture traffic for this amount of time after
> sending the packets.
> +            capture_name: The name of the .pcap file where to store the
> capture.
> +
> +        Returns:
> +             A list of received packets. May be empty if no packets are
> captured.
> +        """
> +        self._logger.debug(get_packet_summaries(packets))
> +        self._logger.debug(
> +            f"Sending packet on {send_port.logical_name}, "
> +            f"receiving on {receive_port.logical_name}."
> +        )
> +        received_packets = self._send_packets_and_capture(
> +            packets,
> +            send_port,
> +            receive_port,
> +            duration,
> +        )
> +
> +        self._logger.debug(
> +            f"Received packets: {get_packet_summaries(received_packets)}"
> +        )
> +        self._write_capture_from_packets(capture_name, received_packets)
> +        return received_packets
> +
> +    @abstractmethod
> +    def _send_packets_and_capture(
> +        self,
> +        packets: list[Packet],
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +    ) -> list[Packet]:
> +        """
> +        The extended classes must implement this method which
> +        sends packets on send_port and receives packets on the
> receive_port
> +        for the specified duration. It must be able to handle no received
> packets.
> +        """
> +
> +    def _write_capture_from_packets(self, capture_name: str, packets:
> list[Packet]):
> +        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
> +        self._logger.debug(f"Writing packets to {file_name}.")
> +        scapy.utils.wrpcap(file_name, packets)
> diff --git a/dts/framework/testbed_model/hw/port.py
> b/dts/framework/testbed_model/hw/port.py
> new file mode 100644
> index 0000000000..680c29bfe3
> --- /dev/null
> +++ b/dts/framework/testbed_model/hw/port.py
> @@ -0,0 +1,60 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +from dataclasses import dataclass
> +
> +from framework.config import PortConfig
> +
> +
> +@dataclass(slots=True, frozen=True)
> +class PortIdentifier:
> +    node: str
> +    pci: str
> +
> +
> +@dataclass(slots=True)
> +class Port:
> +    """
> +    identifier: The PCI address of the port on a node.
> +
> +    os_driver: The driver used by this port when the OS is controlling it.
> +        Example: i40e
> +    os_driver_for_dpdk: The driver the device must be bound to for DPDK
> to use it,
> +        Example: vfio-pci.
> +
> +    Note: os_driver and os_driver_for_dpdk may be the same thing.
> +        Example: mlx5_core
> +
> +    peer: The identifier of a port this port is connected with.
> +    """
> +
> +    identifier: PortIdentifier
> +    os_driver: str
> +    os_driver_for_dpdk: str
> +    peer: PortIdentifier
> +    mac_address: str = ""
> +    logical_name: str = ""
> +
> +    def __init__(self, node_name: str, config: PortConfig):
> +        self.identifier = PortIdentifier(
> +            node=node_name,
> +            pci=config.pci,
> +        )
> +        self.os_driver = config.os_driver
> +        self.os_driver_for_dpdk = config.os_driver_for_dpdk
> +        self.peer = PortIdentifier(node=config.peer_node,
> pci=config.peer_pci)
> +
> +    @property
> +    def node(self) -> str:
> +        return self.identifier.node
> +
> +    @property
> +    def pci(self) -> str:
> +        return self.identifier.pci
> +
> +
> +@dataclass(slots=True, frozen=True)
> +class PortLink:
> +    sut_port: Port
> +    tg_port: Port
> diff --git a/dts/framework/testbed_model/node.py
> b/dts/framework/testbed_model/node.py
> index d2d55d904e..e09931cedf 100644
> --- a/dts/framework/testbed_model/node.py
> +++ b/dts/framework/testbed_model/node.py
> @@ -25,6 +25,7 @@
>      LogicalCoreListFilter,
>      lcore_filter,
>  )
> +from .hw.port import Port
>
>
>  class Node(object):
> @@ -38,6 +39,7 @@ class Node(object):
>      config: NodeConfiguration
>      name: str
>      lcores: list[LogicalCore]
> +    ports: list[Port]
>      _logger: DTSLOG
>      _other_sessions: list[OSSession]
>      _execution_config: ExecutionConfiguration
> @@ -57,6 +59,13 @@ def __init__(self, node_config: NodeConfiguration):
>          ).filter()
>
>          self._other_sessions = []
> +        self._init_ports()
> +
> +    def _init_ports(self) -> None:
> +        self.ports = [Port(self.name, port_config) for port_config in
> self.config.ports]
> +        self.main_session.update_ports(self.ports)
> +        for port in self.ports:
> +            self.configure_port_state(port)
>
>      def set_up_execution(self, execution_config: ExecutionConfiguration)
> -> None:
>          """
> @@ -168,6 +177,12 @@ def _setup_hugepages(self):
>                  self.config.hugepages.amount,
> self.config.hugepages.force_first_numa
>              )
>
> +    def configure_port_state(self, port: Port, enable: bool = True) ->
> None:
> +        """
> +        Enable/disable port.
> +        """
> +        self.main_session.configure_port_state(port, enable)
> +
>      def close(self) -> None:
>          """
>          Close all connections and free other resources.
> diff --git a/dts/framework/testbed_model/scapy.py
> b/dts/framework/testbed_model/scapy.py
> new file mode 100644
> index 0000000000..1a23dc9fa3
> --- /dev/null
> +++ b/dts/framework/testbed_model/scapy.py
> @@ -0,0 +1,74 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""Scapy traffic generator.
> +
> +Traffic generator used for functional testing, implemented using the
> Scapy library.
> +The traffic generator uses an XML-RPC server to run Scapy on the remote
> TG node.
> +
> +The XML-RPC server runs in an interactive remote SSH session running
> Python console,
> +where we start the server. The communication with the server is
> facilitated with
> +a local server proxy.
> +"""
> +
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.config import OS, ScapyTrafficGeneratorConfig
> +from framework.logger import getLogger
> +
> +from .capturing_traffic_generator import (
> +    CapturingTrafficGenerator,
> +    _get_default_capture_name,
> +)
> +from .hw.port import Port
> +from .tg_node import TGNode
> +
> +
> +class ScapyTrafficGenerator(CapturingTrafficGenerator):
> +    """Provides access to scapy functions via an RPC interface.
> +
> +    The traffic generator first starts an XML-RPC on the remote TG node.
> +    Then it populates the server with functions which use the Scapy
> library
> +    to send/receive traffic.
> +
> +    Any packets sent to the remote server are first converted to bytes.
> +    They are received as xmlrpc.client.Binary objects on the server side.
> +    When the server sends the packets back, they are also received as
> +    xmlrpc.client.Binary object on the client side, are converted back to
> Scapy
> +    packets and only then returned from the methods.
> +
> +    Arguments:
> +        tg_node: The node where the traffic generator resides.
> +        config: The user configuration of the traffic generator.
> +    """
> +
> +    _config: ScapyTrafficGeneratorConfig
> +    _tg_node: TGNode
> +
> +    def __init__(self, tg_node: TGNode, config:
> ScapyTrafficGeneratorConfig):
> +        self._config = config
> +        self._tg_node = tg_node
> +        self._logger = getLogger(
> +            f"{self._tg_node.name} {self._config.traffic_generator_type}"
> +        )
> +
> +        assert (
> +            self._tg_node.config.os == OS.linux
> +        ), "Linux is the only supported OS for scapy traffic generation"
> +
> +    def _send_packets(self, packets: list[Packet], port: Port) -> None:
> +        raise NotImplementedError()
> +
> +    def _send_packets_and_capture(
> +        self,
> +        packets: list[Packet],
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +        capture_name: str = _get_default_capture_name(),
> +    ) -> list[Packet]:
> +        raise NotImplementedError()
> +
> +    def close(self):
> +        pass
> diff --git a/dts/framework/testbed_model/tg_node.py
> b/dts/framework/testbed_model/tg_node.py
> new file mode 100644
> index 0000000000..27025cfa31
> --- /dev/null
> +++ b/dts/framework/testbed_model/tg_node.py
> @@ -0,0 +1,99 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2010-2014 Intel Corporation
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""Traffic generator node.
> +
> +This is the node where the traffic generator resides.
> +The distinction between a node and a traffic generator is as follows:
> +A node is a host that DTS connects to. It could be a baremetal server,
> +a VM or a container.
> +A traffic generator is software running on the node.
> +A traffic generator node is a node running a traffic generator.
> +A node can be a traffic generator node as well as system under test node.
> +"""
> +
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.config import (
> +    ScapyTrafficGeneratorConfig,
> +    TGNodeConfiguration,
> +    TrafficGeneratorType,
> +)
> +from framework.exception import ConfigurationError
> +
> +from .capturing_traffic_generator import CapturingTrafficGenerator
> +from .hw.port import Port
> +from .node import Node
> +
> +
> +class TGNode(Node):
> +    """Manage connections to a node with a traffic generator.
> +
> +    Apart from basic node management capabilities, the Traffic Generator
> node has
> +    specialized methods for handling the traffic generator running on it.
> +
> +    Arguments:
> +        node_config: The user configuration of the traffic generator node.
> +
> +    Attributes:
> +        traffic_generator: The traffic generator running on the node.
> +    """
> +
> +    traffic_generator: CapturingTrafficGenerator
> +
> +    def __init__(self, node_config: TGNodeConfiguration):
> +        super(TGNode, self).__init__(node_config)
> +        self.traffic_generator = create_traffic_generator(
> +            self, node_config.traffic_generator
> +        )
> +        self._logger.info(f"Created node: {self.name}")
> +
> +    def send_packet_and_capture(
> +        self,
> +        packet: Packet,
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float = 1,
> +    ) -> list[Packet]:
> +        """Send a packet, return received traffic.
> +
> +        Send a packet on the send_port and then return all traffic
> captured
> +        on the receive_port for the given duration. Also record the
> captured traffic
> +        in a pcap file.
> +
> +        Args:
> +            packet: The packet to send.
> +            send_port: The egress port on the TG node.
> +            receive_port: The ingress port in the TG node.
> +            duration: Capture traffic for this amount of time after
> sending the packet.
> +
> +        Returns:
> +             A list of received packets. May be empty if no packets are
> captured.
> +        """
> +        return self.traffic_generator.send_packet_and_capture(
> +            packet, send_port, receive_port, duration
> +        )
> +
> +    def close(self) -> None:
> +        """Free all resources used by the node"""
> +        self.traffic_generator.close()
> +        super(TGNode, self).close()
> +
> +
> +def create_traffic_generator(
> +    tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
> +) -> CapturingTrafficGenerator:
> +    """A factory function for creating traffic generator object from user
> config."""
> +
> +    from .scapy import ScapyTrafficGenerator
> +
> +    match traffic_generator_config.traffic_generator_type:
> +        case TrafficGeneratorType.SCAPY:
> +            return ScapyTrafficGenerator(tg_node,
> traffic_generator_config)
> +        case _:
> +            raise ConfigurationError(
> +                "Unknown traffic generator: "
> +                f"{traffic_generator_config.traffic_generator_type}"
> +            )


Would it be possible here to do something like what we did in
create_interactive_shell with a TypeVar where we can initialize it
directly? It would change from using the enum to setting the
traffic_generator_config.traffic_generator_type to a specific class in the
config (in this case, ScapyTrafficGenerator), but I think it would be
possible to change in the from_dict method where we could set this type to
the class directly instead of the enum (or maybe had the enum relate it's
values to the classes themselves).

I think this would make some things slightly more complicated (like how we
would map from conf.yaml to one of the classes and all of those needing to
be imported in config/__init__.py) but it would save developers in the
future from having to add to two different places  (the enum in
config/__init__.py and this match statement) and save this list from being
arbitrarily long. I think this is fine for this patch but maybe when we
expand the traffic generator or the scapy generator it could be worth
thinking about.

Do you think it would make sense to change it in this way or would that be
somewhat unnecessary in your eyes?


> diff --git a/dts/framework/testbed_model/traffic_generator.py
> b/dts/framework/testbed_model/traffic_generator.py
> new file mode 100644
> index 0000000000..28c35d3ce4
> --- /dev/null
> +++ b/dts/framework/testbed_model/traffic_generator.py
> @@ -0,0 +1,72 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""The base traffic generator.
> +
> +These traffic generators can't capture received traffic,
> +only count the number of received packets.
> +"""
> +
> +from abc import ABC, abstractmethod
> +
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.logger import DTSLOG
> +from framework.utils import get_packet_summaries
> +
> +from .hw.port import Port
> +
> +
> +class TrafficGenerator(ABC):
> +    """The base traffic generator.
> +
> +    Defines the few basic methods that each traffic generator must
> implement.
> +    """
> +
> +    _logger: DTSLOG
> +
> +    def send_packet(self, packet: Packet, port: Port) -> None:
> +        """Send a packet and block until it is fully sent.
> +
> +        What fully sent means is defined by the traffic generator.
> +
> +        Args:
> +            packet: The packet to send.
> +            port: The egress port on the TG node.
> +        """
> +        self.send_packets([packet], port)
> +
> +    def send_packets(self, packets: list[Packet], port: Port) -> None:
> +        """Send packets and block until they are fully sent.
> +
> +        What fully sent means is defined by the traffic generator.
> +
> +        Args:
> +            packets: The packets to send.
> +            port: The egress port on the TG node.
> +        """
> +        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else
> ''}.")
> +        self._logger.debug(get_packet_summaries(packets))
> +        self._send_packets(packets, port)
> +
> +    @abstractmethod
> +    def _send_packets(self, packets: list[Packet], port: Port) -> None:
> +        """
> +        The extended classes must implement this method which
> +        sends packets on send_port. The method should block until all
> packets
> +        are fully sent.
> +        """
> +
> +    @property
> +    def is_capturing(self) -> bool:
> +        """Whether this traffic generator can capture traffic.
> +
> +        Returns:
> +            True if the traffic generator can capture traffic, False
> otherwise.
> +        """
> +        return False
> +
> +    @abstractmethod
> +    def close(self) -> None:
> +        """Free all resources used by the traffic generator."""
> diff --git a/dts/framework/utils.py b/dts/framework/utils.py
> index 60abe46edf..d27c2c5b5f 100644
> --- a/dts/framework/utils.py
> +++ b/dts/framework/utils.py
> @@ -4,6 +4,7 @@
>  # Copyright(c) 2022-2023 University of New Hampshire
>
>  import atexit
> +import json
>  import os
>  import subprocess
>  import sys
> @@ -11,6 +12,8 @@
>  from pathlib import Path
>  from subprocess import SubprocessError
>
> +from scapy.packet import Packet  # type: ignore[import]
> +
>  from .exception import ConfigurationError
>
>
> @@ -64,6 +67,16 @@ def expand_range(range_str: str) -> list[int]:
>      return expanded_range
>
>
> +def get_packet_summaries(packets: list[Packet]):
> +    if len(packets) == 1:
> +        packet_summaries = packets[0].summary()
> +    else:
> +        packet_summaries = json.dumps(
> +            list(map(lambda pkt: pkt.summary(), packets)), indent=4
> +        )
> +    return f"Packet contents: \n{packet_summaries}"
> +
> +
>  def RED(text: str) -> str:
>      return f"\u001B[31;1m{str(text)}\u001B[0m"
>
> --
> 2.34.1
>
>

[-- Attachment #2: Type: text/html, Size: 45296 bytes --]

  reply	other threads:[~2023-07-18 19:56 UTC|newest]

Thread overview: 29+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-04-20  9:31 [RFC PATCH v1 0/5] dts: add tg abstractions and scapy Juraj Linkeš
2023-04-20  9:31 ` [RFC PATCH v1 1/5] dts: add scapy dependency Juraj Linkeš
2023-04-20  9:31 ` [RFC PATCH v1 2/5] dts: add traffic generator config Juraj Linkeš
2023-04-20  9:31 ` [RFC PATCH v1 3/5] dts: traffic generator abstractions Juraj Linkeš
2023-04-20  9:31 ` [RFC PATCH v1 4/5] dts: scapy traffic generator implementation Juraj Linkeš
2023-04-20  9:31 ` [RFC PATCH v1 5/5] dts: add traffic generator node to dts runner Juraj Linkeš
2023-05-03 18:02   ` Jeremy Spewock
2023-07-17 11:07 ` [PATCH v2 0/6] dts: tg abstractions and scapy tg Juraj Linkeš
2023-07-17 11:07   ` [PATCH v2 1/6] dts: add scapy dependency Juraj Linkeš
2023-07-17 11:07   ` [PATCH v2 2/6] dts: add traffic generator config Juraj Linkeš
2023-07-18 15:55     ` Jeremy Spewock
2023-07-19 12:57       ` Juraj Linkeš
2023-07-19 13:18         ` Jeremy Spewock
2023-07-17 11:07   ` [PATCH v2 3/6] dts: traffic generator abstractions Juraj Linkeš
2023-07-18 19:56     ` Jeremy Spewock [this message]
2023-07-19 13:23       ` Juraj Linkeš
2023-07-17 11:07   ` [PATCH v2 4/6] dts: add python remote interactive shell Juraj Linkeš
2023-07-17 11:07   ` [PATCH v2 5/6] dts: scapy traffic generator implementation Juraj Linkeš
2023-07-17 11:07   ` [PATCH v2 6/6] dts: add basic UDP test case Juraj Linkeš
2023-07-18 21:04   ` [PATCH v2 0/6] dts: tg abstractions and scapy tg Jeremy Spewock
2023-07-19 14:12   ` [PATCH v3 " Juraj Linkeš
2023-07-19 14:12     ` [PATCH v3 1/6] dts: add scapy dependency Juraj Linkeš
2023-07-19 14:12     ` [PATCH v3 2/6] dts: add traffic generator config Juraj Linkeš
2023-07-19 14:13     ` [PATCH v3 3/6] dts: traffic generator abstractions Juraj Linkeš
2023-07-19 14:13     ` [PATCH v3 4/6] dts: add python remote interactive shell Juraj Linkeš
2023-07-19 14:13     ` [PATCH v3 5/6] dts: scapy traffic generator implementation Juraj Linkeš
2023-07-19 14:13     ` [PATCH v3 6/6] dts: add basic UDP test case Juraj Linkeš
2023-07-20 15:21       ` Jeremy Spewock
2023-07-24 14:23     ` [PATCH v3 0/6] dts: tg abstractions and scapy tg Thomas Monjalon

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to='CAAA20UREMOFN=Q6UFLzD4+X5X2eWbLHKV5atmTKK5O6XdxdX=g@mail.gmail.com' \
    --to=jspewock@iol.unh.edu \
    --cc=Honnappa.Nagarahalli@arm.com \
    --cc=dev@dpdk.org \
    --cc=juraj.linkes@pantheon.tech \
    --cc=lijuan.tu@intel.com \
    --cc=probb@iol.unh.edu \
    --cc=thomas@monjalon.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).