From: Paul Szczepanek <paul.szczepanek@arm.com>
To: dev@dpdk.org
Cc: Paul Szczepanek <paul.szczepanek@arm.com>,
Luca Vizzarro <luca.vizzarro@arm.com>
Subject: [PATCH v1 1/2] dts: add packet handling and test utilities to API
Date: Tue, 23 Sep 2025 11:37:57 +0100
Message-ID: <20250923103758.3192015-2-paul.szczepanek@arm.com>
In-Reply-To: <20250923103758.3192015-1-paul.szczepanek@arm.com>
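Move the TestSuite methods used by test cases into the public API as
module-level functions: test run utilities (logging, verification,
skipping) go to api.test, packet-related functions go to api.packet.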
Depends-on: series-36111 ("Split DTS framework and public API")
Signed-off-by: Paul Szczepanek <paul.szczepanek@arm.com>
Reviewed-by: Luca Vizzarro <luca.vizzarro@arm.com>
---
doc/api/dts/api.packet.rst | 8 +
doc/api/dts/api.rst | 4 +-
doc/api/dts/api.test.rst | 8 +
doc/guides/tools/dts.rst | 2 +-
dts/api/packet.py | 311 ++++++++++++++++++++++++++++++++++++
dts/api/test.py | 126 +++++++++++++++
dts/framework/test_suite.py | 305 +----------------------------------
7 files changed, 460 insertions(+), 304 deletions(-)
create mode 100644 doc/api/dts/api.packet.rst
create mode 100644 doc/api/dts/api.test.rst
create mode 100644 dts/api/packet.py
create mode 100644 dts/api/test.py
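Illustration only, not part of the commit: match_all_packets in
dts/api/packet.py counts missing packets with multiset subtraction on
collections.Counter. The sketch below shows that step in isolation,
assuming packets are already reduced to raw bytes. The function name
count_missing is hypothetical, and sum(values()) is used here in place
of Counter.total() so the sketch also runs on Python older than 3.10.

```python
from collections import Counter
from typing import List


def count_missing(expected: List[bytes], received: List[bytes]) -> int:
    """Return how many expected packets have no match among the received ones."""
    expected_counts = Counter(expected)
    received_counts = Counter(received)
    # Counter subtraction keeps only positive counts, so unexpected packets
    # (noise) and surplus duplicates in `received` never produce negatives.
    missing = expected_counts - received_counts
    return sum(missing.values())


# Two copies of b"A" expected but only one received; b"C" is noise.
print(count_missing([b"A", b"A", b"B"], [b"A", b"C", b"C", b"B"]))  # prints 1
```

Because the keys are raw bytes, two packets match only when they are
byte-for-byte identical, which is why the docstring says no deep
packet comparison is performed.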
diff --git a/doc/api/dts/api.packet.rst b/doc/api/dts/api.packet.rst
new file mode 100644
index 0000000000..93d455a609
--- /dev/null
+++ b/doc/api/dts/api.packet.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+packet - Sending and capturing packets
+======================================
+
+.. automodule:: api.packet
+ :members:
+ :show-inheritance:
diff --git a/doc/api/dts/api.rst b/doc/api/dts/api.rst
index 9a13704f16..d7d927bfa9 100644
--- a/doc/api/dts/api.rst
+++ b/doc/api/dts/api.rst
@@ -17,5 +17,7 @@ api - DTS API
:hidden:
:maxdepth: 1
+ api.test
api.artifact
- api.capabilities
\ No newline at end of file
+ api.capabilities
+ api.packet
\ No newline at end of file
diff --git a/doc/api/dts/api.test.rst b/doc/api/dts/api.test.rst
new file mode 100644
index 0000000000..91605cc31b
--- /dev/null
+++ b/doc/api/dts/api.test.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+test - Reporting results and logging
+====================================
+
+.. automodule:: api.test
+ :members:
+ :show-inheritance:
diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index e7af60b7a6..2445efccfc 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -455,7 +455,7 @@ Test Case Verification
Use the verify method to assert conditions and record test results.
This should typically be called at the end of each test case.
- Example: self.verify(link_up, "Link should be up after configuration.")
+ Example: verify(link_up, "Link should be up after configuration.")
Other Methods
diff --git a/dts/api/packet.py b/dts/api/packet.py
new file mode 100644
index 0000000000..bf6f3e1be8
--- /dev/null
+++ b/dts/api/packet.py
@@ -0,0 +1,311 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Packet sending, capturing and verification utilities.
+
+The module provides functions that test cases call to exchange traffic with the SUT through
+the traffic generator (TG) of the current test run and to check the traffic received back.
+The functions cover:
+
+ * Sending packets, with or without capturing the received traffic,
+ * Injecting the L2/L3 addresses expected for the testbed,
+ * Verifying that the expected packets were received.
+"""
+
+from collections import Counter
+from typing import cast
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet, Padding, raw
+
+from api.test import fail, log_debug
+from framework.context import get_ctx
+from framework.exception import InternalError
+from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+ PacketFilteringConfig,
+)
+from framework.utils import get_packet_summaries
+
+
+def send_packet_and_capture(
+ packet: Packet,
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+ duration: float = 1,
+) -> list[Packet]:
+ """Send and receive `packet` using the associated TG.
+
+ Send `packet` through the appropriate interface and receive on the appropriate interface.
+ Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+ Args:
+ packet: The packet to send.
+ filter_config: The filter to use when capturing packets.
+ duration: Capture traffic for this amount of time after sending `packet`.
+
+ Returns:
+ A list of received packets.
+ """
+ return send_packets_and_capture(
+ [packet],
+ filter_config,
+ duration,
+ )
+
+
+def send_packets_and_capture(
+ packets: list[Packet],
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+ duration: float = 1,
+) -> list[Packet]:
+ """Send and receive `packets` using the associated TG.
+
+ Send `packets` through the appropriate interface and receive on the appropriate interface.
+ Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+ Args:
+ packets: The packets to send.
+ filter_config: The filter to use when capturing packets.
+ duration: Capture traffic for this amount of time after sending `packets`.
+
+ Returns:
+ A list of received packets.
+ """
+ from framework.context import get_ctx
+ from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+ CapturingTrafficGenerator,
+ )
+
+ assert isinstance(
+ get_ctx().tg, CapturingTrafficGenerator
+ ), "Cannot capture with a non-capturing traffic generator"
+ tg: CapturingTrafficGenerator = cast(CapturingTrafficGenerator, get_ctx().tg)
+ # TODO: implement @requires for types of traffic generator
+ packets = adjust_addresses(packets)
+ return tg.send_packets_and_capture(
+ packets,
+ get_ctx().topology.tg_port_egress,
+ get_ctx().topology.tg_port_ingress,
+ filter_config,
+ duration,
+ )
+
+
+def send_packets(
+ packets: list[Packet],
+) -> None:
+ """Send packets using the traffic generator and do not capture received traffic.
+
+ Args:
+ packets: Packets to send.
+ """
+ packets = adjust_addresses(packets)
+ get_ctx().tg.send_packets(packets, get_ctx().topology.tg_port_egress)
+
+
+def get_expected_packets(
+ packets: list[Packet],
+ sent_from_tg: bool = False,
+) -> list[Packet]:
+ """Inject the proper L2/L3 addresses into `packets`.
+
+ Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
+
+ Args:
+ packets: The packets to modify.
+ sent_from_tg: If :data:`True` packet was sent from the TG.
+
+ Returns:
+ `packets` with injected L2/L3 addresses.
+ """
+ return adjust_addresses(packets, not sent_from_tg)
+
+
+def get_expected_packet(
+ packet: Packet,
+ sent_from_tg: bool = False,
+) -> Packet:
+ """Inject the proper L2/L3 addresses into `packet`.
+
+ Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
+
+ Args:
+ packet: The packet to modify.
+ sent_from_tg: If :data:`True` packet was sent from the TG.
+
+ Returns:
+ `packet` with injected L2/L3 addresses.
+ """
+ return get_expected_packets([packet], sent_from_tg)[0]
+
+
+def adjust_addresses(packets: list[Packet], expected: bool = False) -> list[Packet]:
+ """L2 and L3 address additions in both directions.
+
+ Copies of `packets` will be made, modified and returned in this method.
+
+ Only missing addresses are added to packets, existing addresses will not be overridden. If
+ any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most
+ IP layer will have its addresses adjusted.
+
+ Assumptions:
+ Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.
+
+ Args:
+ packets: The packets to modify.
+ expected: If :data:`True`, the direction is SUT -> TG,
+ otherwise the direction is TG -> SUT.
+
+ Returns:
+ A list containing copies of all packets in `packets` after modification.
+
+ Raises:
+ InternalError: If no tests are running.
+ """
+ from framework.test_suite import TestSuite
+
+ if get_ctx().local.current_test_suite is None:
+ raise InternalError("No current test suite, tests aren't running?")
+ current_test_suite: TestSuite = cast(TestSuite, get_ctx().local.current_test_suite)
+ return current_test_suite._adjust_addresses(packets, expected)
+
+
+def match_all_packets(
+ expected_packets: list[Packet],
+ received_packets: list[Packet],
+ verify: bool = True,
+) -> bool:
+ """Matches all the expected packets against the received ones.
+
+ Matching is performed by counting down the occurrences in a dictionary whose keys are the
+ raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
+ are automatically ignored.
+
+ Args:
+ expected_packets: The packets we are expecting to receive.
+ received_packets: All the packets that were received.
+ verify: If :data:`True`, an exception is raised when packets are missing.
+
+ Raises:
+ TestCaseVerifyError: If `verify` is :data:`True` and not all the `expected_packets`
+ were found in `received_packets`.
+
+ Returns:
+ :data:`True` if there are no missing packets, :data:`False` otherwise.
+ """
+ expected_packets_counters = Counter(map(raw, expected_packets))
+ received_packets_counters = Counter(map(raw, received_packets))
+ # The received packet counts are subtracted from the expected packet counts, ignoring
+ # any unexpected packets and capping at zero.
+ missing_packets_counters = expected_packets_counters - received_packets_counters
+ missing_packets_count = missing_packets_counters.total()
+ log_debug(
+ f"match_all_packets: expected {len(expected_packets)}, "
+ f"received {len(received_packets)}, missing {missing_packets_count}"
+ )
+
+ if missing_packets_count != 0:
+ if verify:
+ fail(
+ f"Not all packets were received, expected {len(expected_packets)} "
+ f"but {missing_packets_count} were missing."
+ )
+ return False
+
+ return True
+
+
+def verify_packets(expected_packet: Packet, received_packets: list[Packet]) -> None:
+ """Verify that `expected_packet` has been received.
+
+ Go through `received_packets` and check that `expected_packet` is among them.
+ If not, raise an exception and log the last 10 commands
+ executed on both the SUT and TG.
+
+ Args:
+ expected_packet: The packet we're expecting to receive.
+ received_packets: The packets where we're looking for `expected_packet`.
+
+ Raises:
+ TestCaseVerifyError: `expected_packet` is not among `received_packets`.
+ """
+ for received_packet in received_packets:
+ if _compare_packets(expected_packet, received_packet):
+ break
+ else:
+ log_debug(
+ f"The expected packet {expected_packet.summary()} "
+ f"not found among received {get_packet_summaries(received_packets)}"
+ )
+ fail("An expected packet not found among received packets.")
+
+
+def _compare_packets(expected_packet: Packet, received_packet: Packet) -> bool:
+ log_debug(f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}")
+
+ l3 = IP in expected_packet.layers()
+ log_debug(f"Expected packet contains an L3 layer: {l3}")
+
+ received_payload = received_packet
+ expected_payload = expected_packet
+ while received_payload and expected_payload:
+ log_debug("Comparing payloads:")
+ log_debug(f"Received: {received_payload}")
+ log_debug(f"Expected: {expected_payload}")
+ if type(received_payload) is type(expected_payload):
+ log_debug("The layers are the same.")
+ if type(received_payload) is Ether:
+ if not _verify_l2_frame(received_payload, l3):
+ return False
+ elif type(received_payload) is IP:
+ assert type(expected_payload) is IP
+ if not _verify_l3_packet(received_payload, expected_payload):
+ return False
+ else:
+ # Different layers => different packets
+ return False
+ received_payload = received_payload.payload
+ expected_payload = expected_payload.payload
+
+ if expected_payload:
+ log_debug(f"The expected packet did not contain {expected_payload}.")
+ return False
+ if received_payload and received_payload.__class__ != Padding:
+ log_debug("The received payload had extra layers which were not padding.")
+ return False
+ return True
+
+
+def _verify_l2_frame(received_packet: Ether, contains_l3: bool) -> bool:
+ """Verify the L2 frame of `received_packet`.
+
+ Args:
+ received_packet: The received L2 frame to verify.
+ contains_l3: If :data:`True`, the packet contains an L3 layer.
+ """
+ log_debug("Looking at the Ether layer.")
+ log_debug(
+ f"Comparing received dst mac '{received_packet.dst}' "
+ f"with expected '{get_ctx().topology.tg_port_ingress.mac_address}'."
+ )
+ if received_packet.dst != get_ctx().topology.tg_port_ingress.mac_address:
+ return False
+
+ expected_src_mac = get_ctx().topology.tg_port_egress.mac_address
+ if contains_l3:
+ expected_src_mac = get_ctx().topology.sut_port_egress.mac_address
+ log_debug(
+ f"Comparing received src mac '{received_packet.src}' "
+ f"with expected '{expected_src_mac}'."
+ )
+ if received_packet.src != expected_src_mac:
+ return False
+
+ return True
+
+
+def _verify_l3_packet(received_packet: IP, expected_packet: IP) -> bool:
+ log_debug("Looking at the IP layer.")
+ if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
+ return False
+ return True
diff --git a/dts/api/test.py b/dts/api/test.py
new file mode 100644
index 0000000000..f58c82715d
--- /dev/null
+++ b/dts/api/test.py
@@ -0,0 +1,126 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Common test utilities.
+
+This module provides utility functions for test cases, including logging, verification and skipping.
+"""
+
+from framework.context import get_ctx
+from framework.exception import InternalError, SkippedTestException, TestCaseVerifyError
+from framework.logger import DTSLogger
+
+
+def get_current_test_case_name() -> str:
+ """The name of the current test case.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ current_test_case = get_ctx().local.current_test_case
+ if current_test_case is None:
+ raise InternalError("No current test case")
+ return current_test_case.name
+
+
+def log(message: str) -> None:
+ """Log the given message with the level 'INFO'.
+
+ Args:
+ message: String representing the message to log.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ get_logger().info(message)
+
+
+def log_debug(message: str) -> None:
+ """Log the given message with the level 'DEBUG'.
+
+ Args:
+ message: String representing the message to log.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ get_logger().debug(message)
+
+
+def verify(condition: bool, failure_description: str) -> None:
+ """Verify `condition` and handle failures.
+
+ When `condition` is :data:`False`, raise an exception and log the last 10 commands
+ executed on both the SUT and TG.
+
+ Args:
+ condition: The condition to check.
+ failure_description: A short description of the failure
+ that will be stored in the raised exception.
+
+ Raises:
+ TestCaseVerifyError: If `condition` is :data:`False`.
+ """
+ if not condition:
+ fail(failure_description)
+
+
+def verify_else_skip(condition: bool, skip_reason: str) -> None:
+ """Verify `condition` and handle skips.
+
+ When `condition` is :data:`False`, raise a skip exception.
+
+ Args:
+ condition: The condition to check.
+ skip_reason: Description of the reason for skipping.
+
+ Raises:
+ SkippedTestException: If `condition` is :data:`False`.
+ """
+ if not condition:
+ skip(skip_reason)
+
+
+def skip(skip_description: str) -> None:
+ """Skip the current test case or test suite with a given description.
+
+ Args:
+ skip_description: Description of the reason for skipping.
+
+ Raises:
+ SkippedTestException: Always raised to indicate the test was skipped.
+ """
+ get_logger().debug(f"Test skipped: {skip_description}")
+ raise SkippedTestException(skip_description)
+
+
+def fail(failure_description: str) -> None:
+ """Fail the current test case with a given description.
+
+ Logs the last 10 commands executed on both the SUT and TG before raising an exception.
+
+ Args:
+ failure_description: Description of the reason for failure.
+
+ Raises:
+ TestCaseVerifyError: Always raised to indicate the test case failed.
+ """
+ get_logger().debug("A test case failed, showing the last 10 commands executed on SUT:")
+ for command_res in get_ctx().sut_node.main_session.remote_session.history[-10:]:
+ get_logger().debug(command_res.command)
+ get_logger().debug("A test case failed, showing the last 10 commands executed on TG:")
+ for command_res in get_ctx().tg_node.main_session.remote_session.history[-10:]:
+ get_logger().debug(command_res.command)
+ raise TestCaseVerifyError(failure_description)
+
+
+def get_logger() -> DTSLogger:
+ """Get a logger instance for tests.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ current_test_suite = get_ctx().local.current_test_suite
+ if current_test_suite is None:
+ raise InternalError("No current test suite")
+ return current_test_suite._logger
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index 5ee5a039d7..9c57e343ac 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -15,7 +15,6 @@
"""
import inspect
-from collections import Counter
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from enum import Enum, auto
@@ -27,21 +26,16 @@
from typing import TYPE_CHECKING, ClassVar, Protocol, TypeVar, Union, cast
from scapy.layers.inet import IP
-from scapy.layers.l2 import Ether
-from scapy.packet import Packet, Padding, raw
+from scapy.packet import Packet
from typing_extensions import Self
from framework.config.common import FrozenModel
from framework.testbed_model.capability import TestProtocol
from framework.testbed_model.topology import Topology
-from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
- CapturingTrafficGenerator,
- PacketFilteringConfig,
-)
-from .exception import ConfigurationError, InternalError, SkippedTestException, TestCaseVerifyError
+from .exception import ConfigurationError, InternalError
from .logger import DTSLogger, get_dts_logger
-from .utils import get_packet_summaries, to_pascal_case
+from .utils import to_pascal_case
if TYPE_CHECKING:
from framework.context import Context
@@ -212,121 +206,6 @@ def tear_down_test_case(self) -> None:
This is done after *each* test case.
"""
- def send_packet_and_capture(
- self,
- packet: Packet,
- filter_config: PacketFilteringConfig = PacketFilteringConfig(),
- duration: float = 1,
- ) -> list[Packet]:
- """Send and receive `packet` using the associated TG.
-
- Send `packet` through the appropriate interface and receive on the appropriate interface.
- Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
-
- Args:
- packet: The packet to send.
- filter_config: The filter to use when capturing packets.
- duration: Capture traffic for this amount of time after sending `packet`.
-
- Returns:
- A list of received packets.
- """
- return self.send_packets_and_capture(
- [packet],
- filter_config,
- duration,
- )
-
- def send_packets_and_capture(
- self,
- packets: list[Packet],
- filter_config: PacketFilteringConfig = PacketFilteringConfig(),
- duration: float = 1,
- ) -> list[Packet]:
- """Send and receive `packets` using the associated TG.
-
- Send `packets` through the appropriate interface and receive on the appropriate interface.
- Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
-
- Args:
- packets: The packets to send.
- filter_config: The filter to use when capturing packets.
- duration: Capture traffic for this amount of time after sending `packet`.
-
- Returns:
- A list of received packets.
- """
- assert isinstance(
- self._ctx.tg, CapturingTrafficGenerator
- ), "Cannot capture with a non-capturing traffic generator"
- # TODO: implement @requires for types of traffic generator
- packets = self._adjust_addresses(packets)
- return self._ctx.tg.send_packets_and_capture(
- packets,
- self._ctx.topology.tg_port_egress,
- self._ctx.topology.tg_port_ingress,
- filter_config,
- duration,
- )
-
- def send_packets(
- self,
- packets: list[Packet],
- ) -> None:
- """Send packets using the traffic generator and do not capture received traffic.
-
- Args:
- packets: Packets to send.
- """
- packets = self._adjust_addresses(packets)
- self._ctx.tg.send_packets(packets, self._ctx.topology.tg_port_egress)
-
- def get_expected_packets(
- self,
- packets: list[Packet],
- sent_from_tg: bool = False,
- ) -> list[Packet]:
- """Inject the proper L2/L3 addresses into `packets`.
-
- Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
-
- Args:
- packets: The packets to modify.
- sent_from_tg: If :data:`True` packet was sent from the TG.
-
- Returns:
- `packets` with injected L2/L3 addresses.
- """
- return self._adjust_addresses(packets, not sent_from_tg)
-
- def get_expected_packet(
- self,
- packet: Packet,
- sent_from_tg: bool = False,
- ) -> Packet:
- """Inject the proper L2/L3 addresses into `packet`.
-
- Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
-
- Args:
- packet: The packet to modify.
- sent_from_tg: If :data:`True` packet was sent from the TG.
-
- Returns:
- `packet` with injected L2/L3 addresses.
- """
- return self.get_expected_packets([packet], sent_from_tg)[0]
-
- def log(self, message: str) -> None:
- """Call the private instance of logger within the TestSuite class.
-
- Log the given message with the level 'INFO'.
-
- Args:
- message: String representing the message to log.
- """
- self._logger.info(message)
-
def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> list[Packet]:
"""L2 and L3 address additions in both directions.
@@ -388,184 +267,6 @@ def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> li
return ret_packets
- def verify(self, condition: bool, failure_description: str) -> None:
- """Verify `condition` and handle failures.
-
- When `condition` is :data:`False`, raise an exception and log the last 10 commands
- executed on both the SUT and TG.
-
- Args:
- condition: The condition to check.
- failure_description: A short description of the failure
- that will be stored in the raised exception.
-
- Raises:
- TestCaseVerifyError: `condition` is :data:`False`.
- """
- if not condition:
- self._fail_test_case_verify(failure_description)
-
- def _fail_test_case_verify(self, failure_description: str) -> None:
- self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:")
- for command_res in self._ctx.sut_node.main_session.remote_session.history[-10:]:
- self._logger.debug(command_res.command)
- self._logger.debug("A test case failed, showing the last 10 commands executed on TG:")
- for command_res in self._ctx.tg_node.main_session.remote_session.history[-10:]:
- self._logger.debug(command_res.command)
- raise TestCaseVerifyError(failure_description)
-
- def verify_else_skip(self, condition: bool, skip_reason: str) -> None:
- """Verify `condition` and handle skips.
-
- When `condition` is :data:`False`, raise a skip exception.
-
- Args:
- condition: The condition to check.
- skip_reason: Description of the reason for skipping.
-
- Raises:
- SkippedTestException: `condition` is :data:`False`.
- """
- if not condition:
- self._skip_test_case_verify(skip_reason)
-
- def _skip_test_case_verify(self, skip_description: str) -> None:
- self._logger.debug(f"Test case skipped: {skip_description}")
- raise SkippedTestException(skip_description)
-
- def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None:
- """Verify that `expected_packet` has been received.
-
- Go through `received_packets` and check that `expected_packet` is among them.
- If not, raise an exception and log the last 10 commands
- executed on both the SUT and TG.
-
- Args:
- expected_packet: The packet we're expecting to receive.
- received_packets: The packets where we're looking for `expected_packet`.
-
- Raises:
- TestCaseVerifyError: `expected_packet` is not among `received_packets`.
- """
- for received_packet in received_packets:
- if self._compare_packets(expected_packet, received_packet):
- break
- else:
- self._logger.debug(
- f"The expected packet {expected_packet.summary()} "
- f"not found among received {get_packet_summaries(received_packets)}"
- )
- self._fail_test_case_verify("An expected packet not found among received packets.")
-
- def match_all_packets(
- self,
- expected_packets: list[Packet],
- received_packets: list[Packet],
- verify: bool = True,
- ) -> bool:
- """Matches all the expected packets against the received ones.
-
- Matching is performed by counting down the occurrences in a dictionary which keys are the
- raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
- are automatically ignored.
-
- Args:
- expected_packets: The packets we are expecting to receive.
- received_packets: All the packets that were received.
- verify: If :data:`True`, and there are missing packets an exception will be raised.
-
- Raises:
- TestCaseVerifyError: if and not all the `expected_packets` were found in
- `received_packets`.
-
- Returns:
- :data:`True` If there are no missing packets.
- """
- expected_packets_counters = Counter(map(raw, expected_packets))
- received_packets_counters = Counter(map(raw, received_packets))
- # The number of expected packets is subtracted by the number of received packets, ignoring
- # any unexpected packets and capping at zero.
- missing_packets_counters = expected_packets_counters - received_packets_counters
- missing_packets_count = missing_packets_counters.total()
- self._logger.debug(
- f"match_all_packets: expected {len(expected_packets)}, "
- f"received {len(received_packets)}, missing {missing_packets_count}"
- )
-
- if missing_packets_count != 0:
- if verify:
- self._fail_test_case_verify(
- f"Not all packets were received, expected {len(expected_packets)} "
- f"but {missing_packets_count} were missing."
- )
- return False
-
- return True
-
- def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
- self._logger.debug(
- f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
- )
-
- l3 = IP in expected_packet.layers()
- self._logger.debug("Found l3 layer")
-
- received_payload = received_packet
- expected_payload = expected_packet
- while received_payload and expected_payload:
- self._logger.debug("Comparing payloads:")
- self._logger.debug(f"Received: {received_payload}")
- self._logger.debug(f"Expected: {expected_payload}")
- if type(received_payload) is type(expected_payload):
- self._logger.debug("The layers are the same.")
- if type(received_payload) is Ether:
- if not self._verify_l2_frame(received_payload, l3):
- return False
- elif type(received_payload) is IP:
- assert type(expected_payload) is IP
- if not self._verify_l3_packet(received_payload, expected_payload):
- return False
- else:
- # Different layers => different packets
- return False
- received_payload = received_payload.payload
- expected_payload = expected_payload.payload
-
- if expected_payload:
- self._logger.debug(f"The expected packet did not contain {expected_payload}.")
- return False
- if received_payload and received_payload.__class__ != Padding:
- self._logger.debug("The received payload had extra layers which were not padding.")
- return False
- return True
-
- def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
- self._logger.debug("Looking at the Ether layer.")
- self._logger.debug(
- f"Comparing received dst mac '{received_packet.dst}' "
- f"with expected '{self.topology.tg_port_ingress.mac_address}'."
- )
- if received_packet.dst != self.topology.tg_port_ingress.mac_address:
- return False
-
- expected_src_mac = self.topology.tg_port_egress.mac_address
- if l3:
- expected_src_mac = self.topology.sut_port_egress.mac_address
- self._logger.debug(
- f"Comparing received src mac '{received_packet.src}' "
- f"with expected '{expected_src_mac}'."
- )
- if received_packet.src != expected_src_mac:
- return False
-
- return True
-
- def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
- self._logger.debug("Looking at the IP layer.")
- if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
- return False
- return True
-
#: The generic type for a method of an instance of TestSuite
TestSuiteMethodType = TypeVar("TestSuiteMethodType", bound=Callable[[TestSuite], None])
--
2.39.5
Thread overview (3+ messages):
2025-09-23 10:37 [PATCH v1 0/2] Move TestSuite methods to API modules Paul Szczepanek
2025-09-23 10:37 ` Paul Szczepanek [this message]
2025-09-23 10:37 ` [PATCH v1 2/2] dts: adjust all tests to use the new API calls Paul Szczepanek