* [PATCH v1 0/2] Move TestSuite methods to API modules
@ 2025-09-23 10:37 Paul Szczepanek
2025-09-23 10:37 ` [PATCH v1 1/2] dts: add packet handling and test utilities to API Paul Szczepanek
2025-09-23 10:37 ` [PATCH v1 2/2] dts: adjust all tests to use the new API calls Paul Szczepanek
0 siblings, 2 replies; 3+ messages in thread
From: Paul Szczepanek @ 2025-09-23 10:37 UTC
To: dev; +Cc: Paul Szczepanek
Split up TestSuite internals from methods that can be used in tests.
Packet and test related methods have been moved to new API modules:
dts/api/packet.py,
dts/api/test.py.
All tests have been adjusted to use the new API calls.
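
As a rough illustration of the split (a minimal, self-contained sketch, not
the DTS code itself): helpers such as verify() become module-level functions
that find the running suite through shared state instead of being reached
through `self`. FakeSuite, set_current_suite and _require_suite below are
hypothetical stand-ins for a real TestSuite and for the lookup DTS does via
get_ctx(); only the names verify, fail, InternalError and TestCaseVerifyError
mirror the patch.

```python
from dataclasses import dataclass, field
from typing import Optional


class InternalError(Exception):
    """Stand-in for framework.exception.InternalError."""


class TestCaseVerifyError(Exception):
    """Stand-in for framework.exception.TestCaseVerifyError."""


@dataclass
class FakeSuite:
    """Hypothetical stand-in for a running TestSuite; it only records messages."""

    messages: list[str] = field(default_factory=list)


# Hypothetical module-level state. DTS keeps the equivalent in its context object.
_current_suite: Optional[FakeSuite] = None


def set_current_suite(suite: Optional[FakeSuite]) -> None:
    """Register (or clear) the suite that is currently running."""
    global _current_suite
    _current_suite = suite


def _require_suite() -> FakeSuite:
    """Return the running suite or raise if no tests are running."""
    if _current_suite is None:
        raise InternalError("No current test suite")
    return _current_suite


def fail(failure_description: str) -> None:
    """Record the failure on the running suite, then raise."""
    _require_suite().messages.append(f"failed: {failure_description}")
    raise TestCaseVerifyError(failure_description)


def verify(condition: bool, failure_description: str) -> None:
    """Module-level replacement for a former `self.verify(...)` call."""
    if not condition:
        fail(failure_description)
```

With this shape a test calls verify(link_up, "...") after importing it,
rather than self.verify(link_up, "..."), and a call made while no suite is
registered raises InternalError instead of silently passing.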
Paul Szczepanek (2):
dts: add packet handling and test utilities to API
dts: adjust all tests to use the new API calls
doc/api/dts/api.packet.rst | 8 +
doc/api/dts/api.rst | 4 +-
doc/api/dts/api.test.rst | 8 +
doc/guides/tools/dts.rst | 2 +-
dts/api/packet.py | 311 ++++++++++++++++++
dts/api/test.py | 126 +++++++
dts/framework/test_suite.py | 305 +----------------
dts/tests/TestSuite_blocklist.py | 5 +-
dts/tests/TestSuite_checksum_offload.py | 14 +-
dts/tests/TestSuite_dual_vlan.py | 26 +-
dts/tests/TestSuite_dynamic_config.py | 8 +-
dts/tests/TestSuite_dynamic_queue_conf.py | 13 +-
dts/tests/TestSuite_hello_world.py | 3 +-
dts/tests/TestSuite_l2fwd.py | 11 +-
dts/tests/TestSuite_mac_filter.py | 16 +-
dts/tests/TestSuite_mtu.py | 10 +-
dts/tests/TestSuite_packet_capture.py | 22 +-
dts/tests/TestSuite_pmd_buffer_scatter.py | 10 +-
dts/tests/TestSuite_port_control.py | 14 +-
...stSuite_port_restart_config_persistency.py | 3 +-
dts/tests/TestSuite_port_stats.py | 12 +-
dts/tests/TestSuite_promisc_support.py | 20 +-
dts/tests/TestSuite_queue_start_stop.py | 6 +-
dts/tests/TestSuite_rte_flow.py | 70 ++--
dts/tests/TestSuite_smoke_tests.py | 7 +-
dts/tests/TestSuite_softnic.py | 11 +-
dts/tests/TestSuite_uni_pkt.py | 6 +-
dts/tests/TestSuite_vlan.py | 20 +-
28 files changed, 638 insertions(+), 433 deletions(-)
create mode 100644 doc/api/dts/api.packet.rst
create mode 100644 doc/api/dts/api.test.rst
create mode 100644 dts/api/packet.py
create mode 100644 dts/api/test.py
--
2.39.5
* [PATCH v1 1/2] dts: add packet handling and test utilities to API
From: Paul Szczepanek @ 2025-09-23 10:37 UTC
To: dev; +Cc: Paul Szczepanek, Luca Vizzarro
Split TestSuite methods between test run methods
and packet related methods.
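
One of the packet helpers moved by this patch, match_all_packets, relies on
multiset subtraction with collections.Counter. The sketch below shows that
idea in isolation, assuming plain bytes objects in place of scapy packets;
count_missing is a hypothetical name, and sum() over the counts is used here
where the patch calls Counter.total().

```python
from collections import Counter


def count_missing(expected: list[bytes], received: list[bytes]) -> int:
    """Return how many expected payloads are absent from `received`."""
    # Counter subtraction keeps only positive counts, so unexpected (noise)
    # payloads and surplus duplicates in `received` are ignored.
    missing = Counter(expected) - Counter(received)
    return sum(missing.values())


expected = [b"a", b"a", b"b"]
received = [b"a", b"noise", b"b", b"b"]
print(count_missing(expected, received))  # prints 1: one b"a" is missing
```

Because the keys are raw bytes, two packets only match when they are
byte-for-byte identical, which is why no deep packet comparison is involved.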
Depends-on: series-36111 ("Split DTS framework and public API")
Signed-off-by: Paul Szczepanek <paul.szczepanek@arm.com>
Reviewed-by: Luca Vizzarro <luca.vizzarro@arm.com>
---
doc/api/dts/api.packet.rst | 8 +
doc/api/dts/api.rst | 4 +-
doc/api/dts/api.test.rst | 8 +
doc/guides/tools/dts.rst | 2 +-
dts/api/packet.py | 311 ++++++++++++++++++++++++++++++++++++
dts/api/test.py | 126 +++++++++++++++
dts/framework/test_suite.py | 305 +----------------------------------
7 files changed, 460 insertions(+), 304 deletions(-)
create mode 100644 doc/api/dts/api.packet.rst
create mode 100644 doc/api/dts/api.test.rst
create mode 100644 dts/api/packet.py
create mode 100644 dts/api/test.py
diff --git a/doc/api/dts/api.packet.rst b/doc/api/dts/api.packet.rst
new file mode 100644
index 0000000000..93d455a609
--- /dev/null
+++ b/doc/api/dts/api.packet.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+packet - Sending and capturing packets
+======================================
+
+.. automodule:: api.packet
+ :members:
+ :show-inheritance:
diff --git a/doc/api/dts/api.rst b/doc/api/dts/api.rst
index 9a13704f16..d7d927bfa9 100644
--- a/doc/api/dts/api.rst
+++ b/doc/api/dts/api.rst
@@ -17,5 +17,7 @@ api - DTS API
:hidden:
:maxdepth: 1
+ api.test
api.artifact
- api.capabilities
\ No newline at end of file
+ api.capabilities
+ api.packet
\ No newline at end of file
diff --git a/doc/api/dts/api.test.rst b/doc/api/dts/api.test.rst
new file mode 100644
index 0000000000..91605cc31b
--- /dev/null
+++ b/doc/api/dts/api.test.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+test - Reporting results and logging
+====================================
+
+.. automodule:: api.test
+ :members:
+ :show-inheritance:
diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index e7af60b7a6..2445efccfc 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -455,7 +455,7 @@ Test Case Verification
Use the verify method to assert conditions and record test results.
This should typically be called at the end of each test case.
- Example: self.verify(link_up, "Link should be up after configuration.")
+ Example: verify(link_up, "Link should be up after configuration.")
Other Methods
diff --git a/dts/api/packet.py b/dts/api/packet.py
new file mode 100644
index 0000000000..bf6f3e1be8
--- /dev/null
+++ b/dts/api/packet.py
@@ -0,0 +1,311 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Packet handling utilities for test cases.
+
+The module provides functions for sending, capturing and verifying packets. The functions
+use the context of the currently running test suite, so they must be called while tests
+are running. The module covers:
+
+ * Sending packets and capturing traffic with the traffic generator,
+ * Injecting the L2/L3 addresses expected on the testbed,
+ * Matching and verifying received packets.
+"""
+
+from collections import Counter
+from typing import cast
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet, Padding, raw
+
+from api.test import fail, log_debug
+from framework.context import get_ctx
+from framework.exception import InternalError
+from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+ PacketFilteringConfig,
+)
+from framework.utils import get_packet_summaries
+
+
+def send_packet_and_capture(
+ packet: Packet,
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+ duration: float = 1,
+) -> list[Packet]:
+ """Send and receive `packet` using the associated TG.
+
+ Send `packet` through the appropriate interface and receive on the appropriate interface.
+ Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+ Args:
+ packet: The packet to send.
+ filter_config: The filter to use when capturing packets.
+ duration: Capture traffic for this amount of time after sending `packet`.
+
+ Returns:
+ A list of received packets.
+ """
+ return send_packets_and_capture(
+ [packet],
+ filter_config,
+ duration,
+ )
+
+
+def send_packets_and_capture(
+ packets: list[Packet],
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+ duration: float = 1,
+) -> list[Packet]:
+ """Send and receive `packets` using the associated TG.
+
+ Send `packets` through the appropriate interface and receive on the appropriate interface.
+ Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+ Args:
+ packets: The packets to send.
+ filter_config: The filter to use when capturing packets.
+ duration: Capture traffic for this amount of time after sending `packets`.
+
+ Returns:
+ A list of received packets.
+ """
+ from framework.context import get_ctx
+ from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+ CapturingTrafficGenerator,
+ )
+
+ assert isinstance(
+ get_ctx().tg, CapturingTrafficGenerator
+ ), "Cannot capture with a non-capturing traffic generator"
+ tg: CapturingTrafficGenerator = cast(CapturingTrafficGenerator, get_ctx().tg)
+ # TODO: implement @requires for types of traffic generator
+ packets = adjust_addresses(packets)
+ return tg.send_packets_and_capture(
+ packets,
+ get_ctx().topology.tg_port_egress,
+ get_ctx().topology.tg_port_ingress,
+ filter_config,
+ duration,
+ )
+
+
+def send_packets(
+ packets: list[Packet],
+) -> None:
+ """Send packets using the traffic generator and do not capture received traffic.
+
+ Args:
+ packets: Packets to send.
+ """
+ packets = adjust_addresses(packets)
+ get_ctx().tg.send_packets(packets, get_ctx().topology.tg_port_egress)
+
+
+def get_expected_packets(
+ packets: list[Packet],
+ sent_from_tg: bool = False,
+) -> list[Packet]:
+ """Inject the proper L2/L3 addresses into `packets`.
+
+ Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
+
+ Args:
+ packets: The packets to modify.
+ sent_from_tg: If :data:`True` packet was sent from the TG.
+
+ Returns:
+ `packets` with injected L2/L3 addresses.
+ """
+ return adjust_addresses(packets, not sent_from_tg)
+
+
+def get_expected_packet(
+ packet: Packet,
+ sent_from_tg: bool = False,
+) -> Packet:
+ """Inject the proper L2/L3 addresses into `packet`.
+
+ Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
+
+ Args:
+ packet: The packet to modify.
+ sent_from_tg: If :data:`True` packet was sent from the TG.
+
+ Returns:
+ `packet` with injected L2/L3 addresses.
+ """
+ return get_expected_packets([packet], sent_from_tg)[0]
+
+
+def adjust_addresses(packets: list[Packet], expected: bool = False) -> list[Packet]:
+ """L2 and L3 address additions in both directions.
+
+ Copies of `packets` will be made, modified and returned in this method.
+
+ Only missing addresses are added to packets, existing addresses will not be overridden. If
+ any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most
+ IP layer will have its addresses adjusted.
+
+ Assumptions:
+ Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.
+
+ Args:
+ packets: The packets to modify.
+ expected: If :data:`True`, the direction is SUT -> TG,
+ otherwise the direction is TG -> SUT.
+
+ Returns:
+ A list containing copies of all packets in `packets` after modification.
+
+ Raises:
+ InternalError: If no tests are running.
+ """
+ from framework.test_suite import TestSuite
+
+ if get_ctx().local.current_test_suite is None:
+ raise InternalError("No current test suite, tests aren't running?")
+ current_test_suite: TestSuite = cast(TestSuite, get_ctx().local.current_test_suite)
+ return current_test_suite._adjust_addresses(packets, expected)
+
+
+def match_all_packets(
+ expected_packets: list[Packet],
+ received_packets: list[Packet],
+ verify: bool = True,
+) -> bool:
+ """Matches all the expected packets against the received ones.
+
+ Matching is performed by counting down the occurrences in a dictionary whose keys are the
+ raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
+ are automatically ignored.
+
+ Args:
+ expected_packets: The packets we are expecting to receive.
+ received_packets: All the packets that were received.
+ verify: If :data:`True`, and there are missing packets an exception will be raised.
+
+ Raises:
+ TestCaseVerifyError: If `verify` is :data:`True` and not all the `expected_packets`
+ were found in `received_packets`.
+
+ Returns:
+ :data:`True` if there are no missing packets.
+ """
+ expected_packets_counters = Counter(map(raw, expected_packets))
+ received_packets_counters = Counter(map(raw, received_packets))
+ # The number of expected packets is subtracted by the number of received packets, ignoring
+ # any unexpected packets and capping at zero.
+ missing_packets_counters = expected_packets_counters - received_packets_counters
+ missing_packets_count = missing_packets_counters.total()
+ log_debug(
+ f"match_all_packets: expected {len(expected_packets)}, "
+ f"received {len(received_packets)}, missing {missing_packets_count}"
+ )
+
+ if missing_packets_count != 0:
+ if verify:
+ fail(
+ f"Not all packets were received, expected {len(expected_packets)} "
+ f"but {missing_packets_count} were missing."
+ )
+ return False
+
+ return True
+
+
+def verify_packets(expected_packet: Packet, received_packets: list[Packet]) -> None:
+ """Verify that `expected_packet` has been received.
+
+ Go through `received_packets` and check that `expected_packet` is among them.
+ If not, raise an exception and log the last 10 commands
+ executed on both the SUT and TG.
+
+ Args:
+ expected_packet: The packet we're expecting to receive.
+ received_packets: The packets where we're looking for `expected_packet`.
+
+ Raises:
+ TestCaseVerifyError: `expected_packet` is not among `received_packets`.
+ """
+ for received_packet in received_packets:
+ if _compare_packets(expected_packet, received_packet):
+ break
+ else:
+ log_debug(
+ f"The expected packet {expected_packet.summary()} "
+ f"not found among received {get_packet_summaries(received_packets)}"
+ )
+ fail("An expected packet not found among received packets.")
+
+
+def _compare_packets(expected_packet: Packet, received_packet: Packet) -> bool:
+ log_debug(f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}")
+
+ l3 = IP in expected_packet.layers()
+ log_debug(f"Expected packet contains an L3 layer: {l3}")
+
+ received_payload = received_packet
+ expected_payload = expected_packet
+ while received_payload and expected_payload:
+ log_debug("Comparing payloads:")
+ log_debug(f"Received: {received_payload}")
+ log_debug(f"Expected: {expected_payload}")
+ if type(received_payload) is type(expected_payload):
+ log_debug("The layers are the same.")
+ if type(received_payload) is Ether:
+ if not _verify_l2_frame(received_payload, l3):
+ return False
+ elif type(received_payload) is IP:
+ assert type(expected_payload) is IP
+ if not _verify_l3_packet(received_payload, expected_payload):
+ return False
+ else:
+ # Different layers => different packets
+ return False
+ received_payload = received_payload.payload
+ expected_payload = expected_payload.payload
+
+ if expected_payload:
+ log_debug(f"The expected packet did not contain {expected_payload}.")
+ return False
+ if received_payload and received_payload.__class__ != Padding:
+ log_debug("The received payload had extra layers which were not padding.")
+ return False
+ return True
+
+
+def _verify_l2_frame(received_packet: Ether, contains_l3: bool) -> bool:
+ """Verify the L2 frame of `received_packet`.
+
+ Args:
+ received_packet: The received L2 frame to verify.
+ contains_l3: If :data:`True`, the packet contains an L3 layer.
+ """
+ log_debug("Looking at the Ether layer.")
+ log_debug(
+ f"Comparing received dst mac '{received_packet.dst}' "
+ f"with expected '{get_ctx().topology.tg_port_ingress.mac_address}'."
+ )
+ if received_packet.dst != get_ctx().topology.tg_port_ingress.mac_address:
+ return False
+
+ expected_src_mac = get_ctx().topology.tg_port_egress.mac_address
+ if contains_l3:
+ expected_src_mac = get_ctx().topology.sut_port_egress.mac_address
+ log_debug(
+ f"Comparing received src mac '{received_packet.src}' "
+ f"with expected '{expected_src_mac}'."
+ )
+ if received_packet.src != expected_src_mac:
+ return False
+
+ return True
+
+
+def _verify_l3_packet(received_packet: IP, expected_packet: IP) -> bool:
+ log_debug("Looking at the IP layer.")
+ if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
+ return False
+ return True
diff --git a/dts/api/test.py b/dts/api/test.py
new file mode 100644
index 0000000000..f58c82715d
--- /dev/null
+++ b/dts/api/test.py
@@ -0,0 +1,126 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Common test utilities.
+
+This module provides utility functions for test cases: logging, verification and skipping.
+"""
+
+from framework.context import get_ctx
+from framework.exception import InternalError, SkippedTestException, TestCaseVerifyError
+from framework.logger import DTSLogger
+
+
+def get_current_test_case_name() -> str:
+ """The name of the current test case.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ current_test_case = get_ctx().local.current_test_case
+ if current_test_case is None:
+ raise InternalError("No current test case")
+ return current_test_case.name
+
+
+def log(message: str) -> None:
+ """Log the given message with the level 'INFO'.
+
+ Args:
+ message: String representing the message to log.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ get_logger().info(message)
+
+
+def log_debug(message: str) -> None:
+ """Log the given message with the level 'DEBUG'.
+
+ Args:
+ message: String representing the message to log.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ get_logger().debug(message)
+
+
+def verify(condition: bool, failure_description: str) -> None:
+ """Verify `condition` and handle failures.
+
+ When `condition` is :data:`False`, raise an exception and log the last 10 commands
+ executed on both the SUT and TG.
+
+ Args:
+ condition: The condition to check.
+ failure_description: A short description of the failure
+ that will be stored in the raised exception.
+
+ Raises:
+ TestCaseVerifyError: If `condition` is :data:`False`.
+ """
+ if not condition:
+ fail(failure_description)
+
+
+def verify_else_skip(condition: bool, skip_reason: str) -> None:
+ """Verify `condition` and handle skips.
+
+ When `condition` is :data:`False`, raise a skip exception.
+
+ Args:
+ condition: The condition to check.
+ skip_reason: Description of the reason for skipping.
+
+ Raises:
+ SkippedTestException: If `condition` is :data:`False`.
+ """
+ if not condition:
+ skip(skip_reason)
+
+
+def skip(skip_description: str) -> None:
+ """Skip the current test case or test suite with a given description.
+
+ Args:
+ skip_description: Description of the reason for skipping.
+
+ Raises:
+ SkippedTestException: Always raised to indicate the test was skipped.
+ """
+ get_logger().debug(f"Test skipped: {skip_description}")
+ raise SkippedTestException(skip_description)
+
+
+def fail(failure_description: str) -> None:
+ """Fail the current test case with a given description.
+
+ Logs the last 10 commands executed on both the SUT and TG before raising an exception.
+
+ Args:
+ failure_description: Description of the reason for failure.
+
+ Raises:
+ TestCaseVerifyError: Always raised to indicate the test case failed.
+ """
+ get_logger().debug("A test case failed, showing the last 10 commands executed on SUT:")
+ for command_res in get_ctx().sut_node.main_session.remote_session.history[-10:]:
+ get_logger().debug(command_res.command)
+ get_logger().debug("A test case failed, showing the last 10 commands executed on TG:")
+ for command_res in get_ctx().tg_node.main_session.remote_session.history[-10:]:
+ get_logger().debug(command_res.command)
+ raise TestCaseVerifyError(failure_description)
+
+
+def get_logger() -> DTSLogger:
+ """Get a logger instance for tests.
+
+ Raises:
+ InternalError: If no test is running.
+ """
+ current_test_suite = get_ctx().local.current_test_suite
+ if current_test_suite is None:
+ raise InternalError("No current test suite")
+ return current_test_suite._logger
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index 5ee5a039d7..9c57e343ac 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -15,7 +15,6 @@
"""
import inspect
-from collections import Counter
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from enum import Enum, auto
@@ -27,21 +26,16 @@
from typing import TYPE_CHECKING, ClassVar, Protocol, TypeVar, Union, cast
from scapy.layers.inet import IP
-from scapy.layers.l2 import Ether
-from scapy.packet import Packet, Padding, raw
+from scapy.packet import Packet
from typing_extensions import Self
from framework.config.common import FrozenModel
from framework.testbed_model.capability import TestProtocol
from framework.testbed_model.topology import Topology
-from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
- CapturingTrafficGenerator,
- PacketFilteringConfig,
-)
-from .exception import ConfigurationError, InternalError, SkippedTestException, TestCaseVerifyError
+from .exception import ConfigurationError, InternalError
from .logger import DTSLogger, get_dts_logger
-from .utils import get_packet_summaries, to_pascal_case
+from .utils import to_pascal_case
if TYPE_CHECKING:
from framework.context import Context
@@ -212,121 +206,6 @@ def tear_down_test_case(self) -> None:
This is done after *each* test case.
"""
- def send_packet_and_capture(
- self,
- packet: Packet,
- filter_config: PacketFilteringConfig = PacketFilteringConfig(),
- duration: float = 1,
- ) -> list[Packet]:
- """Send and receive `packet` using the associated TG.
-
- Send `packet` through the appropriate interface and receive on the appropriate interface.
- Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
-
- Args:
- packet: The packet to send.
- filter_config: The filter to use when capturing packets.
- duration: Capture traffic for this amount of time after sending `packet`.
-
- Returns:
- A list of received packets.
- """
- return self.send_packets_and_capture(
- [packet],
- filter_config,
- duration,
- )
-
- def send_packets_and_capture(
- self,
- packets: list[Packet],
- filter_config: PacketFilteringConfig = PacketFilteringConfig(),
- duration: float = 1,
- ) -> list[Packet]:
- """Send and receive `packets` using the associated TG.
-
- Send `packets` through the appropriate interface and receive on the appropriate interface.
- Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
-
- Args:
- packets: The packets to send.
- filter_config: The filter to use when capturing packets.
- duration: Capture traffic for this amount of time after sending `packet`.
-
- Returns:
- A list of received packets.
- """
- assert isinstance(
- self._ctx.tg, CapturingTrafficGenerator
- ), "Cannot capture with a non-capturing traffic generator"
- # TODO: implement @requires for types of traffic generator
- packets = self._adjust_addresses(packets)
- return self._ctx.tg.send_packets_and_capture(
- packets,
- self._ctx.topology.tg_port_egress,
- self._ctx.topology.tg_port_ingress,
- filter_config,
- duration,
- )
-
- def send_packets(
- self,
- packets: list[Packet],
- ) -> None:
- """Send packets using the traffic generator and do not capture received traffic.
-
- Args:
- packets: Packets to send.
- """
- packets = self._adjust_addresses(packets)
- self._ctx.tg.send_packets(packets, self._ctx.topology.tg_port_egress)
-
- def get_expected_packets(
- self,
- packets: list[Packet],
- sent_from_tg: bool = False,
- ) -> list[Packet]:
- """Inject the proper L2/L3 addresses into `packets`.
-
- Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
-
- Args:
- packets: The packets to modify.
- sent_from_tg: If :data:`True` packet was sent from the TG.
-
- Returns:
- `packets` with injected L2/L3 addresses.
- """
- return self._adjust_addresses(packets, not sent_from_tg)
-
- def get_expected_packet(
- self,
- packet: Packet,
- sent_from_tg: bool = False,
- ) -> Packet:
- """Inject the proper L2/L3 addresses into `packet`.
-
- Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
-
- Args:
- packet: The packet to modify.
- sent_from_tg: If :data:`True` packet was sent from the TG.
-
- Returns:
- `packet` with injected L2/L3 addresses.
- """
- return self.get_expected_packets([packet], sent_from_tg)[0]
-
- def log(self, message: str) -> None:
- """Call the private instance of logger within the TestSuite class.
-
- Log the given message with the level 'INFO'.
-
- Args:
- message: String representing the message to log.
- """
- self._logger.info(message)
-
def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> list[Packet]:
"""L2 and L3 address additions in both directions.
@@ -388,184 +267,6 @@ def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> li
return ret_packets
- def verify(self, condition: bool, failure_description: str) -> None:
- """Verify `condition` and handle failures.
-
- When `condition` is :data:`False`, raise an exception and log the last 10 commands
- executed on both the SUT and TG.
-
- Args:
- condition: The condition to check.
- failure_description: A short description of the failure
- that will be stored in the raised exception.
-
- Raises:
- TestCaseVerifyError: `condition` is :data:`False`.
- """
- if not condition:
- self._fail_test_case_verify(failure_description)
-
- def _fail_test_case_verify(self, failure_description: str) -> None:
- self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:")
- for command_res in self._ctx.sut_node.main_session.remote_session.history[-10:]:
- self._logger.debug(command_res.command)
- self._logger.debug("A test case failed, showing the last 10 commands executed on TG:")
- for command_res in self._ctx.tg_node.main_session.remote_session.history[-10:]:
- self._logger.debug(command_res.command)
- raise TestCaseVerifyError(failure_description)
-
- def verify_else_skip(self, condition: bool, skip_reason: str) -> None:
- """Verify `condition` and handle skips.
-
- When `condition` is :data:`False`, raise a skip exception.
-
- Args:
- condition: The condition to check.
- skip_reason: Description of the reason for skipping.
-
- Raises:
- SkippedTestException: `condition` is :data:`False`.
- """
- if not condition:
- self._skip_test_case_verify(skip_reason)
-
- def _skip_test_case_verify(self, skip_description: str) -> None:
- self._logger.debug(f"Test case skipped: {skip_description}")
- raise SkippedTestException(skip_description)
-
- def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None:
- """Verify that `expected_packet` has been received.
-
- Go through `received_packets` and check that `expected_packet` is among them.
- If not, raise an exception and log the last 10 commands
- executed on both the SUT and TG.
-
- Args:
- expected_packet: The packet we're expecting to receive.
- received_packets: The packets where we're looking for `expected_packet`.
-
- Raises:
- TestCaseVerifyError: `expected_packet` is not among `received_packets`.
- """
- for received_packet in received_packets:
- if self._compare_packets(expected_packet, received_packet):
- break
- else:
- self._logger.debug(
- f"The expected packet {expected_packet.summary()} "
- f"not found among received {get_packet_summaries(received_packets)}"
- )
- self._fail_test_case_verify("An expected packet not found among received packets.")
-
- def match_all_packets(
- self,
- expected_packets: list[Packet],
- received_packets: list[Packet],
- verify: bool = True,
- ) -> bool:
- """Matches all the expected packets against the received ones.
-
- Matching is performed by counting down the occurrences in a dictionary which keys are the
- raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
- are automatically ignored.
-
- Args:
- expected_packets: The packets we are expecting to receive.
- received_packets: All the packets that were received.
- verify: If :data:`True`, and there are missing packets an exception will be raised.
-
- Raises:
- TestCaseVerifyError: if and not all the `expected_packets` were found in
- `received_packets`.
-
- Returns:
- :data:`True` If there are no missing packets.
- """
- expected_packets_counters = Counter(map(raw, expected_packets))
- received_packets_counters = Counter(map(raw, received_packets))
- # The number of expected packets is subtracted by the number of received packets, ignoring
- # any unexpected packets and capping at zero.
- missing_packets_counters = expected_packets_counters - received_packets_counters
- missing_packets_count = missing_packets_counters.total()
- self._logger.debug(
- f"match_all_packets: expected {len(expected_packets)}, "
- f"received {len(received_packets)}, missing {missing_packets_count}"
- )
-
- if missing_packets_count != 0:
- if verify:
- self._fail_test_case_verify(
- f"Not all packets were received, expected {len(expected_packets)} "
- f"but {missing_packets_count} were missing."
- )
- return False
-
- return True
-
- def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
- self._logger.debug(
- f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
- )
-
- l3 = IP in expected_packet.layers()
- self._logger.debug("Found l3 layer")
-
- received_payload = received_packet
- expected_payload = expected_packet
- while received_payload and expected_payload:
- self._logger.debug("Comparing payloads:")
- self._logger.debug(f"Received: {received_payload}")
- self._logger.debug(f"Expected: {expected_payload}")
- if type(received_payload) is type(expected_payload):
- self._logger.debug("The layers are the same.")
- if type(received_payload) is Ether:
- if not self._verify_l2_frame(received_payload, l3):
- return False
- elif type(received_payload) is IP:
- assert type(expected_payload) is IP
- if not self._verify_l3_packet(received_payload, expected_payload):
- return False
- else:
- # Different layers => different packets
- return False
- received_payload = received_payload.payload
- expected_payload = expected_payload.payload
-
- if expected_payload:
- self._logger.debug(f"The expected packet did not contain {expected_payload}.")
- return False
- if received_payload and received_payload.__class__ != Padding:
- self._logger.debug("The received payload had extra layers which were not padding.")
- return False
- return True
-
- def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
- self._logger.debug("Looking at the Ether layer.")
- self._logger.debug(
- f"Comparing received dst mac '{received_packet.dst}' "
- f"with expected '{self.topology.tg_port_ingress.mac_address}'."
- )
- if received_packet.dst != self.topology.tg_port_ingress.mac_address:
- return False
-
- expected_src_mac = self.topology.tg_port_egress.mac_address
- if l3:
- expected_src_mac = self.topology.sut_port_egress.mac_address
- self._logger.debug(
- f"Comparing received src mac '{received_packet.src}' "
- f"with expected '{expected_src_mac}'."
- )
- if received_packet.src != expected_src_mac:
- return False
-
- return True
-
- def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
- self._logger.debug("Looking at the IP layer.")
- if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
- return False
- return True
-
#: The generic type for a method of an instance of TestSuite
TestSuiteMethodType = TypeVar("TestSuiteMethodType", bound=Callable[[TestSuite], None])
--
2.39.5
* [PATCH v1 2/2] dts: adjust all tests to use the new API calls
From: Paul Szczepanek @ 2025-09-23 10:37 UTC
To: dev; +Cc: Paul Szczepanek, Luca Vizzarro
Former calls to TestSuite methods now call API methods.
Depends-on: series-36111 ("Split DTS framework and public API")
Signed-off-by: Paul Szczepanek <paul.szczepanek@arm.com>
Reviewed-by: Luca Vizzarro <luca.vizzarro@arm.com>
---
dts/tests/TestSuite_blocklist.py | 5 +-
dts/tests/TestSuite_checksum_offload.py | 14 ++--
dts/tests/TestSuite_dual_vlan.py | 26 +++----
dts/tests/TestSuite_dynamic_config.py | 8 ++-
dts/tests/TestSuite_dynamic_queue_conf.py | 13 ++--
dts/tests/TestSuite_hello_world.py | 3 +-
dts/tests/TestSuite_l2fwd.py | 11 ++-
dts/tests/TestSuite_mac_filter.py | 16 +++--
dts/tests/TestSuite_mtu.py | 10 +--
dts/tests/TestSuite_packet_capture.py | 22 +++---
dts/tests/TestSuite_pmd_buffer_scatter.py | 10 +--
dts/tests/TestSuite_port_control.py | 14 ++--
...stSuite_port_restart_config_persistency.py | 3 +-
dts/tests/TestSuite_port_stats.py | 12 ++--
dts/tests/TestSuite_promisc_support.py | 20 ++++--
dts/tests/TestSuite_queue_start_stop.py | 6 +-
dts/tests/TestSuite_rte_flow.py | 70 ++++++++++---------
dts/tests/TestSuite_smoke_tests.py | 7 +-
dts/tests/TestSuite_softnic.py | 11 ++-
dts/tests/TestSuite_uni_pkt.py | 6 +-
dts/tests/TestSuite_vlan.py | 20 +++---
21 files changed, 178 insertions(+), 129 deletions(-)
diff --git a/dts/tests/TestSuite_blocklist.py b/dts/tests/TestSuite_blocklist.py
index 6d3dba6756..ba37f39ab3 100644
--- a/dts/tests/TestSuite_blocklist.py
+++ b/dts/tests/TestSuite_blocklist.py
@@ -10,6 +10,7 @@
LinkTopology,
requires_link_topology,
)
+from api.test import verify
from api.testpmd import TestPmd
from framework.test_suite import TestSuite, func_test
from framework.testbed_model.port import Port
@@ -27,10 +28,10 @@ def _verify_blocklisted_ports(self, ports_to_block: list[Port]) -> None:
# sanity check
allowed_len = len(allowlisted_ports - blocklisted_ports)
- self.verify(allowed_len > 0, "At least one port should have been allowed")
+ verify(allowed_len > 0, "At least one port should have been allowed")
blocked = not allowlisted_ports & blocklisted_ports
- self.verify(blocked, "At least one port was not blocklisted")
+ verify(blocked, "At least one port was not blocklisted")
@func_test
def no_blocklisted(self) -> None:
diff --git a/dts/tests/TestSuite_checksum_offload.py b/dts/tests/TestSuite_checksum_offload.py
index 70ae9c124c..8e1ec0f142 100644
--- a/dts/tests/TestSuite_checksum_offload.py
+++ b/dts/tests/TestSuite_checksum_offload.py
@@ -25,6 +25,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from api.testpmd.types import ChecksumOffloadOptions, PacketOffloadFlag
@@ -61,11 +63,11 @@ def _send_packets_and_verify(
by the traffic generator.
"""
for i in range(0, len(packet_list)):
- received_packets = self.send_packet_and_capture(packet=packet_list[i])
+ received_packets = send_packet_and_capture(packet=packet_list[i])
received = any(
packet.haslayer(Raw) and load in packet.load for packet in received_packets
)
- self.verify(
+ verify(
received == should_receive,
f"Packet was {'dropped' if should_receive else 'received'}",
)
@@ -85,19 +87,19 @@ def _send_packet_and_verify_checksum(
id: The destination port that matches the sent packet in verbose output.
"""
testpmd.start()
- self.send_packet_and_capture(packet=packet)
+ send_packet_and_capture(packet=packet)
verbose_output = testpmd.extract_verbose_output(testpmd.stop())
is_IP = is_L4 = None
for testpmd_packet in verbose_output:
if testpmd_packet.l4_dport == id:
is_IP = PacketOffloadFlag.RTE_MBUF_F_RX_IP_CKSUM_GOOD in testpmd_packet.ol_flags
is_L4 = PacketOffloadFlag.RTE_MBUF_F_RX_L4_CKSUM_GOOD in testpmd_packet.ol_flags
- self.verify(
+ verify(
is_IP is not None and is_L4 is not None,
"Test packet was dropped when it should have been received.",
)
- self.verify(is_L4 == good_L4, "Layer 4 checksum flag did not match expected checksum flag.")
- self.verify(is_IP == good_IP, "IP checksum flag did not match expected checksum flag.")
+ verify(is_L4 == good_L4, "Layer 4 checksum flag did not match expected checksum flag.")
+ verify(is_IP == good_IP, "IP checksum flag did not match expected checksum flag.")
def _setup_hw_offload(self, testpmd: TestPmd) -> None:
"""Sets IP, UDP, and TCP layers to hardware offload.
diff --git a/dts/tests/TestSuite_dual_vlan.py b/dts/tests/TestSuite_dual_vlan.py
index a40297ecaa..860eae7424 100644
--- a/dts/tests/TestSuite_dual_vlan.py
+++ b/dts/tests/TestSuite_dual_vlan.py
@@ -18,6 +18,8 @@
from scapy.layers.l2 import Dot1Q, Ether
from scapy.packet import Packet, Raw
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@@ -111,7 +113,7 @@ def _verify_vlan_functions(self, send_packet: Packet, options: TestCaseOptions)
send_packet: Packet to send for testing.
options: Flag which defines the currents configured settings in testpmd.
"""
- recv = self.send_packet_and_capture(send_packet)
+ recv = send_packet_and_capture(send_packet)
recv = list(filter(self._is_relevant_packet, recv))
expected_layers: list[Packet] = []
@@ -119,13 +121,13 @@ def _verify_vlan_functions(self, send_packet: Packet, options: TestCaseOptions)
expected_layers.append(Dot1Q(vlan=self.outer_vlan_tag))
expected_layers.append(Dot1Q(vlan=self.inner_vlan_tag))
- self.verify(
+ verify(
len(recv) > 0,
f"Expected to receive packet with the payload {expected_layers} but got nothing.",
)
for pkt in recv:
- self.verify(
+ verify(
self._pkt_payload_contains_layers(pkt, *expected_layers),
f"Received packet ({pkt.summary()}) did not match the expected sequence of layers "
f"{expected_layers} with options {options}.",
@@ -175,11 +177,11 @@ def insert_second_vlan(self) -> None:
with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
testpmd.tx_vlan_set(port=self.tx_port, enable=True, vlan=self.vlan_insert_tag)
testpmd.start()
- recv = self.send_packet_and_capture(
+ recv = send_packet_and_capture(
Ether() / Dot1Q(vlan=self.outer_vlan_tag) / Raw(b"X" * 20)
)
- self.verify(len(recv) > 0, "Did not receive any packets when testing VLAN insertion.")
- self.verify(
+ verify(len(recv) > 0, "Did not receive any packets when testing VLAN insertion.")
+ verify(
any(
self._is_relevant_packet(p)
and self._pkt_payload_contains_layers(
@@ -211,9 +213,9 @@ def all_vlan_functions(self) -> None:
)
with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
testpmd.start()
- recv = self.send_packet_and_capture(send_pkt)
- self.verify(len(recv) > 0, "Unmodified packet was not received.")
- self.verify(
+ recv = send_packet_and_capture(send_pkt)
+ verify(len(recv) > 0, "Unmodified packet was not received.")
+ verify(
any(
self._is_relevant_packet(p)
and self._pkt_payload_contains_layers(
@@ -252,9 +254,9 @@ def maintains_priority(self) -> None:
)
with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
testpmd.start()
- recv = self.send_packet_and_capture(pkt)
- self.verify(len(recv) > 0, "Did not receive any packets when testing VLAN priority.")
- self.verify(
+ recv = send_packet_and_capture(pkt)
+ verify(len(recv) > 0, "Did not receive any packets when testing VLAN priority.")
+ verify(
any(
self._is_relevant_packet(p)
and self._pkt_payload_contains_layers(
diff --git a/dts/tests/TestSuite_dynamic_config.py b/dts/tests/TestSuite_dynamic_config.py
index 5cc96f2633..7204ec4f73 100644
--- a/dts/tests/TestSuite_dynamic_config.py
+++ b/dts/tests/TestSuite_dynamic_config.py
@@ -25,6 +25,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@@ -62,11 +64,11 @@ def _send_packet_and_verify(self, should_receive: bool, mac_address: str) -> Non
mac_address: Destination MAC address to generate in packet.
"""
packet = Ether(dst=mac_address) / IP() / Raw(load="xxxxx")
- received = self.send_packet_and_capture(packet)
+ received = send_packet_and_capture(packet)
contains_packet = any(
packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
)
- self.verify(
+ verify(
should_receive == contains_packet,
f"Packet was {'dropped' if should_receive else 'received'}",
)
@@ -100,7 +102,7 @@ def default_mode(self) -> None:
"""
with TestPmd() as testpmd:
is_promisc = testpmd.show_port_info(0).is_promiscuous_mode_enabled
- self.verify(is_promisc, "Promiscuous mode was not enabled by default.")
+ verify(is_promisc, "Promiscuous mode was not enabled by default.")
testpmd.start()
mac = testpmd.show_port_info(0).mac_address
# send a packet with Rx port mac address
diff --git a/dts/tests/TestSuite_dynamic_queue_conf.py b/dts/tests/TestSuite_dynamic_queue_conf.py
index bc7f981424..3ddfa31152 100644
--- a/dts/tests/TestSuite_dynamic_queue_conf.py
+++ b/dts/tests/TestSuite_dynamic_queue_conf.py
@@ -35,6 +35,8 @@
NicCapability,
requires_nic_capability,
)
+from api.packet import send_packets
+from api.test import fail, verify
from api.testpmd import TestPmd
from api.testpmd.config import PortTopology, SimpleForwardingModes
from framework.exception import InteractiveCommandExecutionError
@@ -109,7 +111,7 @@ def _wrap(self: "TestDynamicQueueConf", is_rx_testing: bool) -> None:
self._send_packets_with_different_addresses(self.number_of_packets_to_send)
forwarding_stats = testpmd.stop()
for queue_id in queues_to_config:
- self.verify(
+ verify(
self._port_queue_in_stats(port_id, is_rx_testing, queue_id, forwarding_stats),
f"Modified queue {queue_id} on port {port_id} failed to receive traffic after"
"being started again.",
@@ -172,7 +174,7 @@ def _send_packets_with_different_addresses(self, number_of_packets: int) -> None
/ Raw()
for i in range(number_of_packets)
]
- self.send_packets(packets_to_send)
+ send_packets(packets_to_send)
def _port_queue_in_stats(
self, port_id: int, is_rx_queue: bool, queue_id: int, stats: str
@@ -226,8 +228,7 @@ def _modify_ring_size(
# The testpmd method verifies that the modification worked, so we catch that error
# and just re-raise it as a test case failure
except InteractiveCommandExecutionError:
- self.verify(
- False,
+ fail(
f"Failed to update the ring size of queue {queue_id} on port "
f"{port_id} at runtime",
)
@@ -264,12 +265,12 @@ def _stop_queues(
# it means there could be another reason for the packets not transmitting and,
# therefore, a false positive result.
for unchanged_q_id in unchanged_queues:
- self.verify(
+ verify(
self._port_queue_in_stats(port_id, is_rx_testing, unchanged_q_id, forwarding_stats),
f"Queue {unchanged_q_id} failed to receive traffic.",
)
for stopped_q_id in queues_to_modify:
- self.verify(
+ verify(
not self._port_queue_in_stats(
port_id, is_rx_testing, stopped_q_id, forwarding_stats
),
diff --git a/dts/tests/TestSuite_hello_world.py b/dts/tests/TestSuite_hello_world.py
index 3560e9ec0b..bf1a93c782 100644
--- a/dts/tests/TestSuite_hello_world.py
+++ b/dts/tests/TestSuite_hello_world.py
@@ -8,6 +8,7 @@
are properly configured.
"""
+from api.test import log
from api.testpmd import TestPmd
from framework.test_suite import BaseConfig, TestSuite, func_test
@@ -36,4 +37,4 @@ def hello_world(self) -> None:
"""
with TestPmd() as testpmd:
testpmd.start()
- self.log(self.config.msg)
+ log(self.config.msg)
diff --git a/dts/tests/TestSuite_l2fwd.py b/dts/tests/TestSuite_l2fwd.py
index bc1b5162c3..596b892730 100644
--- a/dts/tests/TestSuite_l2fwd.py
+++ b/dts/tests/TestSuite_l2fwd.py
@@ -13,6 +13,11 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import (
+ get_expected_packets,
+ match_all_packets,
+ send_packets_and_capture,
+)
from api.testpmd import TestPmd
from api.testpmd.config import EthPeer, SimpleForwardingModes
from framework.context import filter_cores
@@ -66,8 +71,8 @@ def l2fwd_integrity(self) -> None:
shell.set_ports_queues(queues_num)
shell.start()
- received_packets = self.send_packets_and_capture(self.packets)
- expected_packets = self.get_expected_packets(self.packets)
- self.match_all_packets(expected_packets, received_packets)
+ received_packets = send_packets_and_capture(self.packets)
+ expected_packets = get_expected_packets(self.packets)
+ match_all_packets(expected_packets, received_packets)
shell.stop()
diff --git a/dts/tests/TestSuite_mac_filter.py b/dts/tests/TestSuite_mac_filter.py
index 6a51f9df4e..a7e24b37d5 100644
--- a/dts/tests/TestSuite_mac_filter.py
+++ b/dts/tests/TestSuite_mac_filter.py
@@ -23,6 +23,8 @@
NicCapability,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import fail, verify
from api.testpmd import TestPmd
from framework.exception import InteractiveCommandExecutionError
from framework.test_suite import TestSuite, func_test
@@ -73,16 +75,16 @@ def _send_packet_and_verify(
packet.dst = mac_address
received_packets = [
packets
- for packets in self.send_packet_and_capture(packet)
+ for packets in send_packet_and_capture(packet)
if hasattr(packets, "load") and "X" * 22 in str(packets.load)
]
if should_receive:
- self.verify(
+ verify(
len(received_packets) == 1,
"Packet sent by test case should be forwarded and received.",
)
else:
- self.verify(
+ verify(
len(received_packets) == 0,
"Packet sent by test case should not be forwarded and received.",
)
@@ -160,12 +162,12 @@ def invalid_address(self) -> None:
mac_address = self.topology.sut_port_ingress.mac_address
try:
testpmd.set_mac_addr(0, "00:00:00:00:00:00", add=True)
- self.verify(False, "Invalid mac address added.")
+ fail("Invalid mac address added.")
except InteractiveCommandExecutionError:
pass
try:
testpmd.set_mac_addr(0, mac_address, add=False)
- self.verify(False, "Default mac address removed.")
+ fail("Default mac address removed.")
except InteractiveCommandExecutionError:
pass
# Should be no errors adding this twice
@@ -174,7 +176,7 @@ def invalid_address(self) -> None:
# Double check to see if default mac address can be removed
try:
testpmd.set_mac_addr(0, mac_address, add=False)
- self.verify(False, "Default mac address removed.")
+ fail("Default mac address removed.")
except InteractiveCommandExecutionError:
pass
@@ -190,7 +192,7 @@ def invalid_address(self) -> None:
testpmd.set_mac_addr(0, "E" + mac_address[1:], add=True)
# We add an extra address to compensate for mac address pool inconsistencies.
testpmd.set_mac_addr(0, "F" + mac_address[1:], add=True)
- self.verify(False, "Mac address limit exceeded.")
+ fail("Mac address limit exceeded.")
except InteractiveCommandExecutionError:
pass
diff --git a/dts/tests/TestSuite_mtu.py b/dts/tests/TestSuite_mtu.py
index 8a14c791f7..8355495d33 100644
--- a/dts/tests/TestSuite_mtu.py
+++ b/dts/tests/TestSuite_mtu.py
@@ -21,6 +21,8 @@
NicCapability,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from framework.test_suite import TestSuite, func_test
@@ -74,7 +76,7 @@ def _send_packet_and_verify(self, pkt_size: int, should_receive: bool) -> None:
padding = pkt_size - IP_HEADER_LEN
# Insert ' ' as placeholder 'CRC' error correction.
packet = Ether() / Raw(load=" ") / IP(len=pkt_size) / Raw(load="X" * padding)
- received_packets = self.send_packet_and_capture(packet)
+ received_packets = send_packet_and_capture(packet)
found = any(
("X" * padding) in str(packets.load)
for packets in received_packets
@@ -82,9 +84,9 @@ def _send_packet_and_verify(self, pkt_size: int, should_receive: bool) -> None:
)
if should_receive:
- self.verify(found, "Did not receive packet.")
+ verify(found, "Did not receive packet.")
else:
- self.verify(not found, "Received packet.")
+ verify(not found, "Received packet.")
def _assess_mtu_boundary(self, testpmd_shell: TestPmd, mtu: int) -> None:
"""Sets the new MTU and verifies packets at the set boundary.
@@ -118,7 +120,7 @@ def _assess_mtu_boundary(self, testpmd_shell: TestPmd, mtu: int) -> None:
self._send_packet_and_verify(pkt_size=equal_frame_size, should_receive=True)
current_mtu = testpmd_shell.show_port_info(0).mtu
- self.verify(current_mtu is not None, "Error grabbing testpmd MTU value.")
+ verify(current_mtu is not None, "Error grabbing testpmd MTU value.")
if current_mtu and (
current_mtu >= STANDARD_MTU + VENDOR_AGNOSTIC_PADDING and mtu == STANDARD_MTU
):
diff --git a/dts/tests/TestSuite_packet_capture.py b/dts/tests/TestSuite_packet_capture.py
index dcd947043a..4bd15e2401 100644
--- a/dts/tests/TestSuite_packet_capture.py
+++ b/dts/tests/TestSuite_packet_capture.py
@@ -30,6 +30,12 @@
LinkTopology,
requires_link_topology,
)
+from api.packet import (
+ get_expected_packets,
+ match_all_packets,
+ send_packets_and_capture,
+)
+from api.test import verify
from api.testpmd import TestPmd
from framework.params import Params
from framework.remote_session.blocking_app import BlockingApp
@@ -139,7 +145,7 @@ def _send_and_dump(
)
)
- received_packets = self.send_packets_and_capture(
+ received_packets = send_packets_and_capture(
self.packets, PacketFilteringConfig(no_lldp=False)
)
@@ -166,18 +172,18 @@ def dumpcap(self) -> None:
testpmd.start()
received_packets = self._send_and_dump()
- expected_packets = self.get_expected_packets(self.packets, sent_from_tg=True)
+ expected_packets = get_expected_packets(self.packets, sent_from_tg=True)
with self.rx_pcap.open() as fd:
rx_pcap_packets = list(rdpcap(fd))
- self.verify(
- self.match_all_packets(expected_packets, rx_pcap_packets, verify=False),
+ verify(
+ match_all_packets(expected_packets, rx_pcap_packets, verify=False),
"Rx packets from dumpcap weren't the same as the expected packets.",
)
with self.tx_pcap.open() as fd:
tx_pcap_packets = list(rdpcap(fd))
- self.verify(
- self.match_all_packets(tx_pcap_packets, received_packets, verify=False),
+ verify(
+ match_all_packets(tx_pcap_packets, received_packets, verify=False),
"Tx packets from dumpcap weren't the same as the packets received by Scapy.",
)
@@ -198,14 +204,14 @@ def dumpcap_filter(self) -> None:
self._send_and_dump("tcp", rx_only=True)
filtered_packets = [
raw(p)
- for p in self.get_expected_packets(self.packets, sent_from_tg=True)
+ for p in get_expected_packets(self.packets, sent_from_tg=True)
if not p.haslayer(TCP)
]
with self.rx_pcap.open() as fd:
rx_pcap_packets = [raw(p) for p in rdpcap(fd)]
for filtered_packet in filtered_packets:
- self.verify(
+ verify(
filtered_packet not in rx_pcap_packets,
"Found a packet in the pcap that was meant to be filtered out.",
)
diff --git a/dts/tests/TestSuite_pmd_buffer_scatter.py b/dts/tests/TestSuite_pmd_buffer_scatter.py
index 06d2e5f7e5..b49fba4cfc 100644
--- a/dts/tests/TestSuite_pmd_buffer_scatter.py
+++ b/dts/tests/TestSuite_pmd_buffer_scatter.py
@@ -26,6 +26,8 @@
NicCapability,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@@ -87,14 +89,14 @@ def _scatter_pktgen_send_packet(self, pkt_size: int) -> list[Packet]:
# pack the payload
for X_in_hex in payload:
packet.load += struct.pack("=B", int("%s%s" % (X_in_hex[0], X_in_hex[1]), 16))
- received_packets = self.send_packet_and_capture(packet)
+ received_packets = send_packet_and_capture(packet)
# filter down the list to packets that have the appropriate structure
received_packets = [p for p in received_packets if Ether in p and IP in p and Raw in p]
- self.verify(len(received_packets) > 0, "Did not receive any packets.")
+ verify(len(received_packets) > 0, "Did not receive any packets.")
layer2 = received_packets[0].getlayer(2)
- self.verify(layer2 is not None, "The received packet is invalid.")
+ verify(layer2 is not None, "The received packet is invalid.")
assert layer2 is not None
return received_packets
@@ -126,7 +128,7 @@ def _pmd_scatter(self, mb_size: int, enable_offload: bool = False) -> None:
for offset in [-1, 0, 1, 4, 5]:
recv_packets = self._scatter_pktgen_send_packet(mb_size + offset)
self._logger.debug(f"Relevant captured packets: \n{recv_packets}")
- self.verify(
+ verify(
any(" ".join(["58"] * 8) in hexstr(pkt, onlyhex=1) for pkt in recv_packets),
"Payload of scattered packet did not match expected payload with offset "
f"{offset}.",
diff --git a/dts/tests/TestSuite_port_control.py b/dts/tests/TestSuite_port_control.py
index df11df4d39..d9cc5ff4c9 100644
--- a/dts/tests/TestSuite_port_control.py
+++ b/dts/tests/TestSuite_port_control.py
@@ -18,6 +18,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packets_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@@ -40,8 +42,8 @@ def _send_packets_and_verify(self) -> None:
send_p = Ether() / Raw(payload.encode("utf-8"))
recv_pakts: list[Packet] = []
for _ in range(int(num_pakts / 25)):
- recv_pakts += self.send_packets_and_capture([send_p] * 25)
- recv_pakts += self.send_packets_and_capture([send_p] * (num_pakts % 25))
+ recv_pakts += send_packets_and_capture([send_p] * 25)
+ recv_pakts += send_packets_and_capture([send_p] * (num_pakts % 25))
recv_pakts = [
p
for p in recv_pakts
@@ -51,7 +53,7 @@ def _send_packets_and_verify(self) -> None:
hasattr(p, "load") and p.load.decode("utf-8").replace("\x00", "") == payload
)
]
- self.verify(
+ verify(
len(recv_pakts) == num_pakts,
f"Received {len(recv_pakts)} packets when {num_pakts} were expected.",
)
@@ -89,7 +91,7 @@ def stop_ports(self) -> None:
"""
with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
testpmd.stop_all_ports()
- self.verify(
+ verify(
all(not p.is_link_up for p in testpmd.show_port_info_all()),
"Failed to stop all ports.",
)
@@ -108,6 +110,4 @@ def close_ports(self) -> None:
"""
with TestPmd() as testpmd:
testpmd.close_all_ports()
- self.verify(
- len(testpmd.show_port_info_all()) == 0, "Failed to close all ports in testpmd."
- )
+ verify(len(testpmd.show_port_info_all()) == 0, "Failed to close all ports in testpmd.")
diff --git a/dts/tests/TestSuite_port_restart_config_persistency.py b/dts/tests/TestSuite_port_restart_config_persistency.py
index 7666c9ea7a..4ea22b6d70 100644
--- a/dts/tests/TestSuite_port_restart_config_persistency.py
+++ b/dts/tests/TestSuite_port_restart_config_persistency.py
@@ -13,6 +13,7 @@
NicCapability,
requires_nic_capability,
)
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.types import TestPmdPortFlowCtrl
from framework.test_suite import TestSuite, func_test
@@ -49,7 +50,7 @@ def _restart_port_and_verify(self, id: int, testpmd: TestPmd, changed_value: str
if flow_info_after:
all_info_after.update(asdict(flow_info_after))
- self.verify(
+ verify(
all_info_before == all_info_after,
f"Port configuration for {changed_value} was not retained through port restart.",
)
diff --git a/dts/tests/TestSuite_port_stats.py b/dts/tests/TestSuite_port_stats.py
index d0b3b33563..3dc045f847 100644
--- a/dts/tests/TestSuite_port_stats.py
+++ b/dts/tests/TestSuite_port_stats.py
@@ -23,6 +23,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from api.testpmd.types import RtePTypes, TestPmdVerbosePacket
@@ -142,7 +144,7 @@ def stats_updates(self) -> None:
testpmd.set_verbose(3)
testpmd.start()
testpmd.clear_port_stats_all()
- self.send_packet_and_capture(self._send_pkt)
+ send_packet_and_capture(self._send_pkt)
port_stats_all, forwarding_info = testpmd.show_port_stats_all()
verbose_information = TestPmd.extract_verbose_output(forwarding_info)
@@ -155,21 +157,21 @@ def stats_updates(self) -> None:
recv_relevant_bytes = port_stats_all[self.recv_port].rx_bytes - rx_irr_bytes
sent_relevant_bytes = port_stats_all[self.send_port].tx_bytes - tx_irr_bytes
- self.verify(
+ verify(
recv_relevant_packets == 1,
f"Port {self.recv_port} received {recv_relevant_packets} packets but expected to only "
"receive 1.",
)
- self.verify(
+ verify(
recv_relevant_bytes == self.total_packet_len,
f"Number of bytes received by port {self.recv_port} did not match the amount sent.",
)
- self.verify(
+ verify(
sent_relevant_packets == 1,
f"Number was packets sent by port {self.send_port} was not equal to the number "
f"received by port {self.recv_port}.",
)
- self.verify(
+ verify(
sent_relevant_bytes == self.total_packet_len,
f"Number of bytes sent by port {self.send_port} did not match the number of bytes "
f"received by port {self.recv_port}.",
diff --git a/dts/tests/TestSuite_promisc_support.py b/dts/tests/TestSuite_promisc_support.py
index 089db4a3ec..a0c65dc662 100644
--- a/dts/tests/TestSuite_promisc_support.py
+++ b/dts/tests/TestSuite_promisc_support.py
@@ -15,6 +15,12 @@
NicCapability,
requires_nic_capability,
)
+from api.packet import (
+ get_expected_packets,
+ match_all_packets,
+ send_packets_and_capture,
+)
+from api.test import verify
from api.testpmd import TestPmd
from framework.test_suite import TestSuite, func_test
@@ -48,9 +54,9 @@ def promisc_packets(self) -> None:
testpmd.set_promisc(port=port_id, enable=True, verify=True)
testpmd.start()
- received_packets = self.send_packets_and_capture(packet)
- expected_packets = self.get_expected_packets(packet, sent_from_tg=True)
- self.match_all_packets(expected_packets, received_packets)
+ received_packets = send_packets_and_capture(packet)
+ expected_packets = get_expected_packets(packet, sent_from_tg=True)
+ match_all_packets(expected_packets, received_packets)
testpmd.stop()
@@ -58,9 +64,9 @@ def promisc_packets(self) -> None:
testpmd.set_promisc(port=port_id, enable=False, verify=True)
testpmd.start()
- received_packets = self.send_packets_and_capture(packet)
- expected_packets = self.get_expected_packets(packet, sent_from_tg=True)
- self.verify(
- not self.match_all_packets(expected_packets, received_packets, verify=False),
+ received_packets = send_packets_and_capture(packet)
+ expected_packets = get_expected_packets(packet, sent_from_tg=True)
+ verify(
+ not match_all_packets(expected_packets, received_packets, verify=False),
"Invalid packet wasn't filtered out.",
)
diff --git a/dts/tests/TestSuite_queue_start_stop.py b/dts/tests/TestSuite_queue_start_stop.py
index e0dd39fd96..e9048d4245 100644
--- a/dts/tests/TestSuite_queue_start_stop.py
+++ b/dts/tests/TestSuite_queue_start_stop.py
@@ -22,6 +22,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@@ -49,11 +51,11 @@ def _send_packet_and_verify(self, should_receive: bool = True) -> None:
should_receive: Indicate whether the packet should be received.
"""
packet = Ether() / IP() / Raw(load="xxxxx")
- received = self.send_packet_and_capture(packet)
+ received = send_packet_and_capture(packet)
contains_packet = any(
packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
)
- self.verify(
+ verify(
should_receive == contains_packet,
f"Packet was {'dropped' if should_receive else 'received'}",
)
diff --git a/dts/tests/TestSuite_rte_flow.py b/dts/tests/TestSuite_rte_flow.py
index 79b16973eb..207cbce2d3 100644
--- a/dts/tests/TestSuite_rte_flow.py
+++ b/dts/tests/TestSuite_rte_flow.py
@@ -23,6 +23,8 @@
NicCapability,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import fail, log, verify, verify_else_skip
from api.testpmd import TestPmd
from api.testpmd.types import FlowRule
from framework.exception import InteractiveCommandExecutionError
@@ -91,13 +93,13 @@ def zip_lists(
with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
for flow, packet, expected_packet in zip_lists(flows, packets, expected_packets):
is_valid = testpmd.flow_validate(flow_rule=flow, port_id=port_id)
- self.verify_else_skip(is_valid, "flow rule failed validation.")
+ verify_else_skip(is_valid, "flow rule failed validation.")
try:
flow_id = testpmd.flow_create(flow_rule=flow, port_id=port_id)
except InteractiveCommandExecutionError:
- self.log("Flow rule validation passed, but flow creation failed.")
- self.verify(False, "Failed flow creation")
+ log("Flow rule validation passed, but flow creation failed.")
+ fail("Failed flow creation")
if verification_method == self._send_packet_and_verify:
verification_method(packet=packet, *args, **kwargs)
@@ -119,11 +121,11 @@ def _send_packet_and_verify(self, packet: Packet, should_receive: bool = True) -
packet: Scapy packet to send and verify.
should_receive: Indicate whether the packet should be received.
"""
- received = self.send_packet_and_capture(packet)
+ received = send_packet_and_capture(packet)
contains_packet = any(
packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
)
- self.verify(
+ verify(
should_receive == contains_packet,
f"Packet was {'dropped' if should_receive else 'received'}",
)
@@ -140,13 +142,13 @@ def _send_packet_and_verify_queue(
"""
testpmd.set_verbose(level=8)
testpmd.start()
- self.send_packet_and_capture(packet=packet)
+ send_packet_and_capture(packet=packet)
verbose_output = testpmd.extract_verbose_output(testpmd.stop())
received = False
for testpmd_packet in verbose_output:
if testpmd_packet.queue_id == test_queue:
received = True
- self.verify(received, f"Expected packet was not received on queue {test_queue}")
+ verify(received, f"Expected packet was not received on queue {test_queue}")
def _send_packet_and_verify_modification(self, packet: Packet, expected_packet: Packet) -> None:
"""Send packet and verify the expected modifications are present upon reception.
@@ -155,15 +157,15 @@ def _send_packet_and_verify_modification(self, packet: Packet, expected_packet:
packet: Scapy packet to send to the SUT.
expected_packet: Scapy packet that should match the received packet.
"""
- received = self.send_packet_and_capture(packet)
+ received = send_packet_and_capture(packet)
# verify reception
- self.verify(received != [], "Packet was never received.")
+ verify(received != [], "Packet was never received.")
- self.log(f"SENT PACKET: {packet.summary()}")
- self.log(f"EXPECTED PACKET: {expected_packet.summary()}")
+ log(f"SENT PACKET: {packet.summary()}")
+ log(f"EXPECTED PACKET: {expected_packet.summary()}")
for packet in received:
- self.log(f"RECEIVED PACKET: {packet.summary()}")
+ log(f"RECEIVED PACKET: {packet.summary()}")
expected_ip_dst = expected_packet[IP].dst if IP in expected_packet else None
received_ip_dst = received[IP].dst if IP in received else None
@@ -173,13 +175,13 @@ def _send_packet_and_verify_modification(self, packet: Packet, expected_packet:
# verify modification
if expected_ip_dst is not None:
- self.verify(
+ verify(
received_ip_dst == expected_ip_dst,
f"IPv4 dst mismatch: expected {expected_ip_dst}, got {received_ip_dst}",
)
if expected_mac_dst is not None:
- self.verify(
+ verify(
received_mac_dst == expected_mac_dst,
f"MAC dst mismatch: expected {expected_mac_dst}, got {received_mac_dst}",
)
@@ -202,23 +204,23 @@ def _send_packet_and_verify_jump(
testpmd.set_verbose(level=8)
for flow in flow_rules:
is_valid = testpmd.flow_validate(flow_rule=flow, port_id=0)
- self.verify_else_skip(is_valid, "flow rule failed validation.")
+ verify_else_skip(is_valid, "flow rule failed validation.")
try:
testpmd.flow_create(flow_rule=flow, port_id=0)
except InteractiveCommandExecutionError:
- self.log("Flow validation passed, but flow creation failed.")
- self.verify(False, "Failed flow creation")
+ log("Flow validation passed, but flow creation failed.")
+ fail("Failed flow creation")
for packet, test_queue in zip(packets, test_queues):
testpmd.start()
- self.send_packet_and_capture(packet=packet)
+ send_packet_and_capture(packet=packet)
verbose_output = testpmd.extract_verbose_output(testpmd.stop())
received = False
for testpmd_packet in verbose_output:
if testpmd_packet.queue_id == test_queue:
received = True
- self.verify(received, f"Expected packet was not received on queue {test_queue}")
+ verify(received, f"Expected packet was not received on queue {test_queue}")
@func_test
def queue_action_ETH(self) -> None:
@@ -439,8 +441,8 @@ def drop_action_ETH(self) -> None:
packet = Ether() / IP() / Raw(load="xxxxx")
with TestPmd() as testpmd:
testpmd.start()
- received = self.send_packet_and_capture(packet)
- self.verify(received != [], "Test packet was never received.")
+ received = send_packet_and_capture(packet)
+ verify(received != [], "Test packet was never received.")
self._runner(
verification_method=self._send_packet_and_verify,
flows=flow_list,
@@ -494,8 +496,8 @@ def drop_action_IP(self) -> None:
packet = Ether() / IP() / Raw(load="xxxxx")
with TestPmd() as testpmd:
testpmd.start()
- received = self.send_packet_and_capture(packet)
- self.verify(received != [], "Test packet was never received.")
+ received = send_packet_and_capture(packet)
+ verify(received != [], "Test packet was never received.")
self._runner(
verification_method=self._send_packet_and_verify,
flows=flow_list,
@@ -545,8 +547,8 @@ def drop_action_L4(self) -> None:
packet = Ether() / IP() / Raw(load="xxxxx")
with TestPmd() as testpmd:
testpmd.start()
- received = self.send_packet_and_capture(packet)
- self.verify(received != [], "Test packet was never received.")
+ received = send_packet_and_capture(packet)
+ verify(received != [], "Test packet was never received.")
self._runner(
verification_method=self._send_packet_and_verify,
flows=flow_list,
@@ -584,8 +586,8 @@ def drop_action_VLAN(self) -> None:
packet = Ether() / IP() / Raw(load="xxxxx")
with TestPmd() as testpmd:
testpmd.start()
- received = self.send_packet_and_capture(packet)
- self.verify(received != [], "Test packet was never received.")
+ received = send_packet_and_capture(packet)
+ verify(received != [], "Test packet was never received.")
self._runner(
verification_method=self._send_packet_and_verify,
flows=flow_list,
@@ -674,8 +676,8 @@ def egress_rules(self) -> None:
packet = Ether() / IP() / Raw(load="xxxxx")
with TestPmd() as testpmd:
testpmd.start()
- received = self.send_packet_and_capture(packet)
- self.verify(received != [], "Test packet was never received.")
+ received = send_packet_and_capture(packet)
+ verify(received != [], "Test packet was never received.")
self._runner(
verification_method=self._send_packet_and_verify,
flows=flow_list,
@@ -779,17 +781,17 @@ def priority_attribute(self) -> None:
testpmd.set_verbose(level=8)
for flow, expected_queue in zip(flow_list, expected_queue_list):
is_valid = testpmd.flow_validate(flow_rule=flow, port_id=0)
- self.verify_else_skip(is_valid, "flow rule failed validation.")
+ verify_else_skip(is_valid, "flow rule failed validation.")
try:
testpmd.flow_create(flow_rule=flow, port_id=0)
except InteractiveCommandExecutionError:
- self.log("Flow rule validation passed, but flow creation failed.")
- self.verify(False, "Failed flow creation")
+ log("Flow rule validation passed, but flow creation failed.")
+ fail("Failed flow creation")
testpmd.start()
- self.send_packet_and_capture(test_packet)
+ send_packet_and_capture(test_packet)
verbose_output = testpmd.extract_verbose_output(testpmd.stop())
received = False
for testpmd_packet in verbose_output:
if testpmd_packet.queue_id == expected_queue:
received = True
- self.verify(received, f"Packet was not received on queue {expected_queue}")
+ verify(received, f"Packet was not received on queue {expected_queue}")
diff --git a/dts/tests/TestSuite_smoke_tests.py b/dts/tests/TestSuite_smoke_tests.py
index 4211f5d97d..271ad4301c 100644
--- a/dts/tests/TestSuite_smoke_tests.py
+++ b/dts/tests/TestSuite_smoke_tests.py
@@ -18,6 +18,7 @@
LinkTopology,
requires_link_topology,
)
+from api.test import verify
from api.testpmd import TestPmd
from framework.config.node import PortConfig
from framework.settings import SETTINGS
@@ -119,7 +120,7 @@ def devices_listed_in_testpmd(self) -> None:
with TestPmd() as testpmd:
dev_list = [str(x) for x in testpmd.get_devices()]
for nic in self.nics_in_node:
- self.verify(
+ verify(
nic.pci in dev_list,
f"Device {nic.pci} was not listed in testpmd's available devices, "
"please check your configuration",
@@ -156,13 +157,13 @@ def device_bound_to_driver(self) -> None:
rf"{nic.pci}.*drv=(\S+) [^\\n]*",
all_nics_in_dpdk_devbind,
)
- self.verify(
+ verify(
devbind_info_for_nic is not None,
f"Failed to find configured device ({nic.pci}) using dpdk-devbind.py",
)
# We know this isn't None, but mypy doesn't
assert devbind_info_for_nic is not None
- self.verify(
+ verify(
devbind_info_for_nic.group(1) == nic.os_driver_for_dpdk,
f"Driver for device {nic.pci} does not match driver listed in "
f"configuration (bound to {devbind_info_for_nic.group(1)})",
diff --git a/dts/tests/TestSuite_softnic.py b/dts/tests/TestSuite_softnic.py
index 44cd066998..fa91f7ee2f 100644
--- a/dts/tests/TestSuite_softnic.py
+++ b/dts/tests/TestSuite_softnic.py
@@ -13,6 +13,11 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import (
+ get_expected_packets,
+ match_all_packets,
+ send_packets_and_capture,
+)
from api.testpmd import TestPmd
from api.testpmd.config import EthPeer
from framework.test_suite import TestSuite, func_test
@@ -96,9 +101,9 @@ def softnic(self) -> None:
port_topology=None,
) as shell:
shell.start()
- received_packets = self.send_packets_and_capture(self.packets)
+ received_packets = send_packets_and_capture(self.packets)
# packets are being forwarded without addresses being amended so
# we get the address as it would be expected to come from TG
- expected_packets = self.get_expected_packets(self.packets, sent_from_tg=True)
+ expected_packets = get_expected_packets(self.packets, sent_from_tg=True)
- self.match_all_packets(expected_packets, received_packets)
+ match_all_packets(expected_packets, received_packets)
diff --git a/dts/tests/TestSuite_uni_pkt.py b/dts/tests/TestSuite_uni_pkt.py
index d258f95d24..97d61cd03a 100644
--- a/dts/tests/TestSuite_uni_pkt.py
+++ b/dts/tests/TestSuite_uni_pkt.py
@@ -25,6 +25,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from api.testpmd.types import RtePTypes, TestPmdVerbosePacket
@@ -55,10 +57,10 @@ def _send_packet_and_verify_flags(
self, expected_flag: RtePTypes, packet: Packet, testpmd: TestPmd
) -> None:
"""Sends a packet to the DUT and verifies the verbose ptype flags."""
- self.send_packet_and_capture(packet=packet)
+ send_packet_and_capture(packet=packet)
verbose_output = testpmd.extract_verbose_output(testpmd.stop())
valid = self._check_for_matching_packet(output=verbose_output, flags=expected_flag)
- self.verify(valid, f"Packet type flag did not match the expected flag: {expected_flag}.")
+ verify(valid, f"Packet type flag did not match the expected flag: {expected_flag}.")
def _setup_session(
self, testpmd: TestPmd, expected_flags: list[RtePTypes], packet_list=list[Packet]
diff --git a/dts/tests/TestSuite_vlan.py b/dts/tests/TestSuite_vlan.py
index 6c1b181c74..0ef63562b6 100644
--- a/dts/tests/TestSuite_vlan.py
+++ b/dts/tests/TestSuite_vlan.py
@@ -21,6 +21,8 @@
requires_link_topology,
requires_nic_capability,
)
+from api.packet import send_packet_and_capture
+from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@@ -55,30 +57,30 @@ def _send_vlan_packet_and_verify(self, should_receive: bool, strip: bool, vlan_i
vlan_id: Expected VLAN ID.
"""
packet = Ether() / Dot1Q(vlan=vlan_id) / Raw(load="xxxxx")
- received_packets = self.send_packet_and_capture(packet)
+ received_packets = send_packet_and_capture(packet)
test_packet = None
for packet in received_packets:
if hasattr(packet, "load") and b"xxxxx" in packet.load:
test_packet = packet
break
if should_receive:
- self.verify(
+ verify(
test_packet is not None,
"Packet was dropped when it should have been received",
)
if test_packet is not None:
if strip:
- self.verify(
+ verify(
not test_packet.haslayer(Dot1Q),
"VLAN tag was not stripped successfully",
)
else:
- self.verify(
+ verify(
test_packet.vlan == vlan_id,
"The received tag did not match the expected tag",
)
else:
- self.verify(
+ verify(
test_packet is None,
"Packet was received when it should have been dropped",
)
@@ -90,22 +92,22 @@ def _send_packet_and_verify_insertion(self, expected_id: int) -> None:
expected_id: The VLAN id that is being inserted through tx_offload configuration.
"""
packet = Ether() / Raw(load="xxxxx")
- received_packets = self.send_packet_and_capture(packet)
+ received_packets = send_packet_and_capture(packet)
test_packet = None
for packet in received_packets:
if hasattr(packet, "load") and b"xxxxx" in packet.load:
test_packet = packet
break
- self.verify(
+ verify(
test_packet is not None,
"Packet was dropped when it should have been received",
)
if test_packet is not None:
- self.verify(
+ verify(
test_packet.haslayer(Dot1Q) == 1,
"The received packet did not have a VLAN tag",
)
- self.verify(
+ verify(
test_packet.vlan == expected_id,
"The received tag did not match the expected tag",
)
--
2.39.5
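For readers following the mechanical change in this patch (self.verify(...) becomes verify(...), and self.verify(False, ...) becomes fail(...)), the call pattern can be sketched as below. This is only a minimal illustration of module-level helpers with the same call shapes as the ones imported from api.test in the hunks above; it is not the contents of dts/api/test.py. The exception classes VerifyFailed and TestSkipped and the helper check_flow_created are hypothetical names introduced for the example.

```python
# Illustrative sketch only: module-level helpers shaped like the calls used in
# this patch. VerifyFailed and TestSkipped are hypothetical stand-ins, not the
# exception types DTS actually raises.


class VerifyFailed(Exception):
    """Hypothetical exception: a test case verification failed."""


class TestSkipped(Exception):
    """Hypothetical exception: a test case should be skipped."""


def fail(failure_description: str) -> None:
    """Fail the current test case unconditionally."""
    raise VerifyFailed(failure_description)


def verify(condition: bool, failure_description: str) -> None:
    """Fail the current test case if the condition does not hold."""
    if not condition:
        fail(failure_description)


def verify_else_skip(condition: bool, skip_reason: str) -> None:
    """Skip the current test case if the condition does not hold."""
    if not condition:
        raise TestSkipped(skip_reason)


def check_flow_created(is_valid: bool, created: bool) -> str:
    """Hypothetical helper mirroring the validate-then-create flow in the rte_flow hunks."""
    # Validation failure means the rule is unsupported, so skip instead of failing.
    verify_else_skip(is_valid, "flow rule failed validation.")
    if not created:
        # Previously written as self.verify(False, "Failed flow creation").
        fail("Failed flow creation")
    return "created"
```

Because the helpers are plain functions, a test body no longer needs a TestSuite instance to call them, which is why the hunks above only drop the self. prefix and add imports.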