DPDK patches and discussions
 help / color / mirror / Atom feed
* [PATCH v1 0/2] Move TestSuite methods to API modules
@ 2025-09-23 10:37 Paul Szczepanek
  2025-09-23 10:37 ` [PATCH v1 1/2] dts: add packet handling and test utilities to API Paul Szczepanek
  2025-09-23 10:37 ` [PATCH v1 2/2] dts: adjust all tests to use the new API calls Paul Szczepanek
  0 siblings, 2 replies; 3+ messages in thread
From: Paul Szczepanek @ 2025-09-23 10:37 UTC (permalink / raw)
  To: dev; +Cc: Paul Szczepanek

Split up TestSuite internals from methods that can be used in tests.
Packet and test related methods have been moved to new API modules:
dts/api/packet.py,
dts/api/test.py.
All tests have been adjusted to use the new API calls.

Paul Szczepanek (2):
  dts: add packet handling and test utilities to API
  dts: adjust all tests to use the new API calls

 doc/api/dts/api.packet.rst                    |   8 +
 doc/api/dts/api.rst                           |   4 +-
 doc/api/dts/api.test.rst                      |   8 +
 doc/guides/tools/dts.rst                      |   2 +-
 dts/api/packet.py                             | 311 ++++++++++++++++++
 dts/api/test.py                               | 126 +++++++
 dts/framework/test_suite.py                   | 305 +----------------
 dts/tests/TestSuite_blocklist.py              |   5 +-
 dts/tests/TestSuite_checksum_offload.py       |  14 +-
 dts/tests/TestSuite_dual_vlan.py              |  26 +-
 dts/tests/TestSuite_dynamic_config.py         |   8 +-
 dts/tests/TestSuite_dynamic_queue_conf.py     |  13 +-
 dts/tests/TestSuite_hello_world.py            |   3 +-
 dts/tests/TestSuite_l2fwd.py                  |  11 +-
 dts/tests/TestSuite_mac_filter.py             |  16 +-
 dts/tests/TestSuite_mtu.py                    |  10 +-
 dts/tests/TestSuite_packet_capture.py         |  22 +-
 dts/tests/TestSuite_pmd_buffer_scatter.py     |  10 +-
 dts/tests/TestSuite_port_control.py           |  14 +-
 ...stSuite_port_restart_config_persistency.py |   3 +-
 dts/tests/TestSuite_port_stats.py             |  12 +-
 dts/tests/TestSuite_promisc_support.py        |  20 +-
 dts/tests/TestSuite_queue_start_stop.py       |   6 +-
 dts/tests/TestSuite_rte_flow.py               |  70 ++--
 dts/tests/TestSuite_smoke_tests.py            |   7 +-
 dts/tests/TestSuite_softnic.py                |  11 +-
 dts/tests/TestSuite_uni_pkt.py                |   6 +-
 dts/tests/TestSuite_vlan.py                   |  20 +-
 28 files changed, 638 insertions(+), 433 deletions(-)
 create mode 100644 doc/api/dts/api.packet.rst
 create mode 100644 doc/api/dts/api.test.rst
 create mode 100644 dts/api/packet.py
 create mode 100644 dts/api/test.py

--
2.39.5


^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH v1 1/2] dts: add packet handling and test utilities to API
  2025-09-23 10:37 [PATCH v1 0/2] Move TestSuite methods to API modules Paul Szczepanek
@ 2025-09-23 10:37 ` Paul Szczepanek
  2025-09-23 10:37 ` [PATCH v1 2/2] dts: adjust all tests to use the new API calls Paul Szczepanek
  1 sibling, 0 replies; 3+ messages in thread
From: Paul Szczepanek @ 2025-09-23 10:37 UTC (permalink / raw)
  To: dev; +Cc: Paul Szczepanek, Luca Vizzarro

Split TestSuite methods between test run methods
and packet related methods.

Depends-on: series-36111 ("Split DTS framework and public API")

Signed-off-by: Paul Szczepanek <paul.szczepanek@arm.com>
Reviewed-by: Luca Vizzarro <luca.vizzarro@arm.com>
---
 doc/api/dts/api.packet.rst  |   8 +
 doc/api/dts/api.rst         |   4 +-
 doc/api/dts/api.test.rst    |   8 +
 doc/guides/tools/dts.rst    |   2 +-
 dts/api/packet.py           | 311 ++++++++++++++++++++++++++++++++++++
 dts/api/test.py             | 126 +++++++++++++++
 dts/framework/test_suite.py | 305 +----------------------------------
 7 files changed, 460 insertions(+), 304 deletions(-)
 create mode 100644 doc/api/dts/api.packet.rst
 create mode 100644 doc/api/dts/api.test.rst
 create mode 100644 dts/api/packet.py
 create mode 100644 dts/api/test.py

diff --git a/doc/api/dts/api.packet.rst b/doc/api/dts/api.packet.rst
new file mode 100644
index 0000000000..93d455a609
--- /dev/null
+++ b/doc/api/dts/api.packet.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+packet - Sending and capturing packets
+======================================
+
+.. automodule:: api.packet
+   :members:
+   :show-inheritance:
diff --git a/doc/api/dts/api.rst b/doc/api/dts/api.rst
index 9a13704f16..d7d927bfa9 100644
--- a/doc/api/dts/api.rst
+++ b/doc/api/dts/api.rst
@@ -17,5 +17,7 @@ api - DTS API
    :hidden:
    :maxdepth: 1

+   api.test
    api.artifact
-   api.capabilities
\ No newline at end of file
+   api.capabilities
+   api.packet
\ No newline at end of file
diff --git a/doc/api/dts/api.test.rst b/doc/api/dts/api.test.rst
new file mode 100644
index 0000000000..91605cc31b
--- /dev/null
+++ b/doc/api/dts/api.test.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+test - Reporting results and logging
+====================================
+
+.. automodule:: api.test
+   :members:
+   :show-inheritance:
diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index e7af60b7a6..2445efccfc 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -455,7 +455,7 @@ Test Case Verification

    Use the verify method to assert conditions and record test results.
    This should typically be called at the end of each test case.
-   Example: self.verify(link_up, "Link should be up after configuration.")
+   Example: verify(link_up, "Link should be up after configuration.")

 Other Methods

diff --git a/dts/api/packet.py b/dts/api/packet.py
new file mode 100644
index 0000000000..bf6f3e1be8
--- /dev/null
+++ b/dts/api/packet.py
@@ -0,0 +1,311 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Features common to all test suites.
+
+The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such
+must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics
+needed by subclasses:
+
+    * Testbed (SUT, TG) configuration,
+    * Packet sending and verification,
+    * Test case verification.
+"""
+
+from collections import Counter
+from typing import cast
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet, Padding, raw
+
+from api.test import fail, log_debug
+from framework.context import get_ctx
+from framework.exception import InternalError
+from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+    PacketFilteringConfig,
+)
+from framework.utils import get_packet_summaries
+
+
+def send_packet_and_capture(
+    packet: Packet,
+    filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+    duration: float = 1,
+) -> list[Packet]:
+    """Send and receive `packet` using the associated TG.
+
+    Send `packet` through the appropriate interface and receive on the appropriate interface.
+    Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+    Args:
+        packet: The packet to send.
+        filter_config: The filter to use when capturing packets.
+        duration: Capture traffic for this amount of time after sending `packet`.
+
+    Returns:
+        A list of received packets.
+    """
+    return send_packets_and_capture(
+        [packet],
+        filter_config,
+        duration,
+    )
+
+
+def send_packets_and_capture(
+    packets: list[Packet],
+    filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+    duration: float = 1,
+) -> list[Packet]:
+    """Send and receive `packets` using the associated TG.
+
+    Send `packets` through the appropriate interface and receive on the appropriate interface.
+    Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+    Args:
+        packets: The packets to send.
+        filter_config: The filter to use when capturing packets.
+        duration: Capture traffic for this amount of time after sending `packet`.
+
+    Returns:
+        A list of received packets.
+    """
+    from framework.context import get_ctx
+    from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+        CapturingTrafficGenerator,
+    )
+
+    assert isinstance(
+        get_ctx().tg, CapturingTrafficGenerator
+    ), "Cannot capture with a non-capturing traffic generator"
+    tg: CapturingTrafficGenerator = cast(CapturingTrafficGenerator, get_ctx().tg)
+    # TODO: implement @requires for types of traffic generator
+    packets = adjust_addresses(packets)
+    return tg.send_packets_and_capture(
+        packets,
+        get_ctx().topology.tg_port_egress,
+        get_ctx().topology.tg_port_ingress,
+        filter_config,
+        duration,
+    )
+
+
+def send_packets(
+    packets: list[Packet],
+) -> None:
+    """Send packets using the traffic generator and do not capture received traffic.
+
+    Args:
+        packets: Packets to send.
+    """
+    packets = adjust_addresses(packets)
+    get_ctx().tg.send_packets(packets, get_ctx().topology.tg_port_egress)
+
+
+def get_expected_packets(
+    packets: list[Packet],
+    sent_from_tg: bool = False,
+) -> list[Packet]:
+    """Inject the proper L2/L3 addresses into `packets`.
+
+    Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
+
+    Args:
+        packets: The packets to modify.
+        sent_from_tg: If :data:`True` packet was sent from the TG.
+
+    Returns:
+        `packets` with injected L2/L3 addresses.
+    """
+    return adjust_addresses(packets, not sent_from_tg)
+
+
+def get_expected_packet(
+    packet: Packet,
+    sent_from_tg: bool = False,
+) -> Packet:
+    """Inject the proper L2/L3 addresses into `packet`.
+
+    Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
+
+    Args:
+        packet: The packet to modify.
+        sent_from_tg: If :data:`True` packet was sent from the TG.
+
+    Returns:
+        `packet` with injected L2/L3 addresses.
+    """
+    return get_expected_packets([packet], sent_from_tg)[0]
+
+
+def adjust_addresses(packets: list[Packet], expected: bool = False) -> list[Packet]:
+    """L2 and L3 address additions in both directions.
+
+    Copies of `packets` will be made, modified and returned in this method.
+
+    Only missing addresses are added to packets, existing addresses will not be overridden. If
+    any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most
+    IP layer will have its addresses adjusted.
+
+    Assumptions:
+        Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.
+
+    Args:
+        packets: The packets to modify.
+        expected: If :data:`True`, the direction is SUT -> TG,
+            otherwise the direction is TG -> SUT.
+
+    Returns:
+        A list containing copies of all packets in `packets` after modification.
+
+    Raises:
+        InternalError: If no tests are running.
+    """
+    from framework.test_suite import TestSuite
+
+    if get_ctx().local.current_test_suite is None:
+        raise InternalError("No current test suite, tests aren't running?")
+    current_test_suite: TestSuite = cast(TestSuite, get_ctx().local.current_test_suite)
+    return current_test_suite._adjust_addresses(packets, expected)
+
+
+def match_all_packets(
+    expected_packets: list[Packet],
+    received_packets: list[Packet],
+    verify: bool = True,
+) -> bool:
+    """Matches all the expected packets against the received ones.
+
+    Matching is performed by counting down the occurrences in a dictionary which keys are the
+    raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
+    are automatically ignored.
+
+    Args:
+        expected_packets: The packets we are expecting to receive.
+        received_packets: All the packets that were received.
+        verify: If :data:`True`, and there are missing packets an exception will be raised.
+
+    Raises:
+        TestCaseVerifyError: if and not all the `expected_packets` were found in
+            `received_packets`.
+
+    Returns:
+        :data:`True` If there are no missing packets.
+    """
+    expected_packets_counters = Counter(map(raw, expected_packets))
+    received_packets_counters = Counter(map(raw, received_packets))
+    # The number of expected packets is subtracted by the number of received packets, ignoring
+    # any unexpected packets and capping at zero.
+    missing_packets_counters = expected_packets_counters - received_packets_counters
+    missing_packets_count = missing_packets_counters.total()
+    log_debug(
+        f"match_all_packets: expected {len(expected_packets)}, "
+        f"received {len(received_packets)}, missing {missing_packets_count}"
+    )
+
+    if missing_packets_count != 0:
+        if verify:
+            fail(
+                f"Not all packets were received, expected {len(expected_packets)} "
+                f"but {missing_packets_count} were missing."
+            )
+        return False
+
+    return True
+
+
+def verify_packets(expected_packet: Packet, received_packets: list[Packet]) -> None:
+    """Verify that `expected_packet` has been received.
+
+    Go through `received_packets` and check that `expected_packet` is among them.
+    If not, raise an exception and log the last 10 commands
+    executed on both the SUT and TG.
+
+    Args:
+        expected_packet: The packet we're expecting to receive.
+        received_packets: The packets where we're looking for `expected_packet`.
+
+    Raises:
+        TestCaseVerifyError: `expected_packet` is not among `received_packets`.
+    """
+    for received_packet in received_packets:
+        if _compare_packets(expected_packet, received_packet):
+            break
+    else:
+        log_debug(
+            f"The expected packet {expected_packet.summary()} "
+            f"not found among received {get_packet_summaries(received_packets)}"
+        )
+        fail("An expected packet not found among received packets.")
+
+
+def _compare_packets(expected_packet: Packet, received_packet: Packet) -> bool:
+    log_debug(f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}")
+
+    l3 = IP in expected_packet.layers()
+    log_debug("Found l3 layer")
+
+    received_payload = received_packet
+    expected_payload = expected_packet
+    while received_payload and expected_payload:
+        log_debug("Comparing payloads:")
+        log_debug(f"Received: {received_payload}")
+        log_debug(f"Expected: {expected_payload}")
+        if type(received_payload) is type(expected_payload):
+            log_debug("The layers are the same.")
+            if type(received_payload) is Ether:
+                if not _verify_l2_frame(received_payload, l3):
+                    return False
+            elif type(received_payload) is IP:
+                assert type(expected_payload) is IP
+                if not _verify_l3_packet(received_payload, expected_payload):
+                    return False
+        else:
+            # Different layers => different packets
+            return False
+        received_payload = received_payload.payload
+        expected_payload = expected_payload.payload
+
+    if expected_payload:
+        log_debug(f"The expected packet did not contain {expected_payload}.")
+        return False
+    if received_payload and received_payload.__class__ != Padding:
+        log_debug("The received payload had extra layers which were not padding.")
+        return False
+    return True
+
+
+def _verify_l2_frame(received_packet: Ether, contains_l3: bool) -> bool:
+    """Verify the L2 frame of `received_packet`.
+
+    Args:
+        received_packet: The received L2 frame to verify.
+        contains_l3: If :data:`True`, the packet contains an L3 layer.
+    """
+    log_debug("Looking at the Ether layer.")
+    log_debug(
+        f"Comparing received dst mac '{received_packet.dst}' "
+        f"with expected '{get_ctx().topology.tg_port_ingress.mac_address}'."
+    )
+    if received_packet.dst != get_ctx().topology.tg_port_ingress.mac_address:
+        return False
+
+    expected_src_mac = get_ctx().topology.tg_port_egress.mac_address
+    if contains_l3:
+        expected_src_mac = get_ctx().topology.sut_port_egress.mac_address
+    log_debug(
+        f"Comparing received src mac '{received_packet.src}' "
+        f"with expected '{expected_src_mac}'."
+    )
+    if received_packet.src != expected_src_mac:
+        return False
+
+    return True
+
+
+def _verify_l3_packet(received_packet: IP, expected_packet: IP) -> bool:
+    log_debug("Looking at the IP layer.")
+    if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
+        return False
+    return True
diff --git a/dts/api/test.py b/dts/api/test.py
new file mode 100644
index 0000000000..f58c82715d
--- /dev/null
+++ b/dts/api/test.py
@@ -0,0 +1,126 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Common test utilities.
+
+This module provides utility functions for test cases, including logging, verification.
+"""
+
+from framework.context import get_ctx
+from framework.exception import InternalError, SkippedTestException, TestCaseVerifyError
+from framework.logger import DTSLogger
+
+
+def get_current_test_case_name() -> str:
+    """The name of the current test case.
+
+    Raises:
+        InternalError: If no test is running.
+    """
+    current_test_case = get_ctx().local.current_test_case
+    if current_test_case is None:
+        raise InternalError("No current test case")
+    return current_test_case.name
+
+
+def log(message: str) -> None:
+    """Log the given message with the level 'INFO'.
+
+    Args:
+        message: String representing the message to log.
+
+    Raises:
+        InternalError: If no test is running.
+    """
+    get_logger().info(message)
+
+
+def log_debug(message: str) -> None:
+    """Log the given message with the level 'DEBUG'.
+
+    Args:
+        message: String representing the message to log.
+
+    Raises:
+        InternalError: If no test is running.
+    """
+    get_logger().debug(message)
+
+
+def verify(condition: bool, failure_description: str) -> None:
+    """Verify `condition` and handle failures.
+
+    When `condition` is :data:`False`, raise an exception and log the last 10 commands
+    executed on both the SUT and TG.
+
+    Args:
+        condition: The condition to check.
+        failure_description: A short description of the failure
+            that will be stored in the raised exception.
+
+    Raises:
+        TestCaseVerifyError: If `condition` is :data:`False`.
+    """
+    if not condition:
+        fail(failure_description)
+
+
+def verify_else_skip(condition: bool, skip_reason: str) -> None:
+    """Verify `condition` and handle skips.
+
+    When `condition` is :data:`False`, raise a skip exception.
+
+    Args:
+        condition: The condition to check.
+        skip_reason: Description of the reason for skipping.
+
+    Raises:
+        SkippedTestException: If `condition` is :data:`False`.
+    """
+    if not condition:
+        skip(skip_reason)
+
+
+def skip(skip_description: str) -> None:
+    """Skip the current test case or test suite with a given description.
+
+    Args:
+        skip_description: Description of the reason for skipping.
+
+    Raises:
+        SkippedTestException: Always raised to indicate the test was skipped.
+    """
+    get_logger().debug(f"Test skipped: {skip_description}")
+    raise SkippedTestException(skip_description)
+
+
+def fail(failure_description: str) -> None:
+    """Fail the current test case with a given description.
+
+    Logs the last 10 commands executed on both the SUT and TG before raising an exception.
+
+    Args:
+        failure_description: Description of the reason for failure.
+
+    Raises:
+        TestCaseVerifyError: Always raised to indicate the test case failed.
+    """
+    get_logger().debug("A test case failed, showing the last 10 commands executed on SUT:")
+    for command_res in get_ctx().sut_node.main_session.remote_session.history[-10:]:
+        get_logger().debug(command_res.command)
+    get_logger().debug("A test case failed, showing the last 10 commands executed on TG:")
+    for command_res in get_ctx().tg_node.main_session.remote_session.history[-10:]:
+        get_logger().debug(command_res.command)
+    raise TestCaseVerifyError(failure_description)
+
+
+def get_logger() -> DTSLogger:
+    """Get a logger instance for tests.
+
+    Raises:
+        InternalError: If no test is running.
+    """
+    current_test_suite = get_ctx().local.current_test_suite
+    if current_test_suite is None:
+        raise InternalError("No current test suite")
+    return current_test_suite._logger
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index 5ee5a039d7..9c57e343ac 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -15,7 +15,6 @@
 """

 import inspect
-from collections import Counter
 from collections.abc import Callable, Sequence
 from dataclasses import dataclass
 from enum import Enum, auto
@@ -27,21 +26,16 @@
 from typing import TYPE_CHECKING, ClassVar, Protocol, TypeVar, Union, cast

 from scapy.layers.inet import IP
-from scapy.layers.l2 import Ether
-from scapy.packet import Packet, Padding, raw
+from scapy.packet import Packet
 from typing_extensions import Self

 from framework.config.common import FrozenModel
 from framework.testbed_model.capability import TestProtocol
 from framework.testbed_model.topology import Topology
-from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
-    CapturingTrafficGenerator,
-    PacketFilteringConfig,
-)

-from .exception import ConfigurationError, InternalError, SkippedTestException, TestCaseVerifyError
+from .exception import ConfigurationError, InternalError
 from .logger import DTSLogger, get_dts_logger
-from .utils import get_packet_summaries, to_pascal_case
+from .utils import to_pascal_case

 if TYPE_CHECKING:
     from framework.context import Context
@@ -212,121 +206,6 @@ def tear_down_test_case(self) -> None:
         This is done after *each* test case.
         """

-    def send_packet_and_capture(
-        self,
-        packet: Packet,
-        filter_config: PacketFilteringConfig = PacketFilteringConfig(),
-        duration: float = 1,
-    ) -> list[Packet]:
-        """Send and receive `packet` using the associated TG.
-
-        Send `packet` through the appropriate interface and receive on the appropriate interface.
-        Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
-
-        Args:
-            packet: The packet to send.
-            filter_config: The filter to use when capturing packets.
-            duration: Capture traffic for this amount of time after sending `packet`.
-
-        Returns:
-            A list of received packets.
-        """
-        return self.send_packets_and_capture(
-            [packet],
-            filter_config,
-            duration,
-        )
-
-    def send_packets_and_capture(
-        self,
-        packets: list[Packet],
-        filter_config: PacketFilteringConfig = PacketFilteringConfig(),
-        duration: float = 1,
-    ) -> list[Packet]:
-        """Send and receive `packets` using the associated TG.
-
-        Send `packets` through the appropriate interface and receive on the appropriate interface.
-        Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
-
-        Args:
-            packets: The packets to send.
-            filter_config: The filter to use when capturing packets.
-            duration: Capture traffic for this amount of time after sending `packet`.
-
-        Returns:
-            A list of received packets.
-        """
-        assert isinstance(
-            self._ctx.tg, CapturingTrafficGenerator
-        ), "Cannot capture with a non-capturing traffic generator"
-        # TODO: implement @requires for types of traffic generator
-        packets = self._adjust_addresses(packets)
-        return self._ctx.tg.send_packets_and_capture(
-            packets,
-            self._ctx.topology.tg_port_egress,
-            self._ctx.topology.tg_port_ingress,
-            filter_config,
-            duration,
-        )
-
-    def send_packets(
-        self,
-        packets: list[Packet],
-    ) -> None:
-        """Send packets using the traffic generator and do not capture received traffic.
-
-        Args:
-            packets: Packets to send.
-        """
-        packets = self._adjust_addresses(packets)
-        self._ctx.tg.send_packets(packets, self._ctx.topology.tg_port_egress)
-
-    def get_expected_packets(
-        self,
-        packets: list[Packet],
-        sent_from_tg: bool = False,
-    ) -> list[Packet]:
-        """Inject the proper L2/L3 addresses into `packets`.
-
-        Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
-
-        Args:
-            packets: The packets to modify.
-            sent_from_tg: If :data:`True` packet was sent from the TG.
-
-        Returns:
-            `packets` with injected L2/L3 addresses.
-        """
-        return self._adjust_addresses(packets, not sent_from_tg)
-
-    def get_expected_packet(
-        self,
-        packet: Packet,
-        sent_from_tg: bool = False,
-    ) -> Packet:
-        """Inject the proper L2/L3 addresses into `packet`.
-
-        Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
-
-        Args:
-            packet: The packet to modify.
-            sent_from_tg: If :data:`True` packet was sent from the TG.
-
-        Returns:
-            `packet` with injected L2/L3 addresses.
-        """
-        return self.get_expected_packets([packet], sent_from_tg)[0]
-
-    def log(self, message: str) -> None:
-        """Call the private instance of logger within the TestSuite class.
-
-        Log the given message with the level 'INFO'.
-
-        Args:
-            message: String representing the message to log.
-        """
-        self._logger.info(message)
-
     def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> list[Packet]:
         """L2 and L3 address additions in both directions.

@@ -388,184 +267,6 @@ def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> li

         return ret_packets

-    def verify(self, condition: bool, failure_description: str) -> None:
-        """Verify `condition` and handle failures.
-
-        When `condition` is :data:`False`, raise an exception and log the last 10 commands
-        executed on both the SUT and TG.
-
-        Args:
-            condition: The condition to check.
-            failure_description: A short description of the failure
-                that will be stored in the raised exception.
-
-        Raises:
-            TestCaseVerifyError: `condition` is :data:`False`.
-        """
-        if not condition:
-            self._fail_test_case_verify(failure_description)
-
-    def _fail_test_case_verify(self, failure_description: str) -> None:
-        self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:")
-        for command_res in self._ctx.sut_node.main_session.remote_session.history[-10:]:
-            self._logger.debug(command_res.command)
-        self._logger.debug("A test case failed, showing the last 10 commands executed on TG:")
-        for command_res in self._ctx.tg_node.main_session.remote_session.history[-10:]:
-            self._logger.debug(command_res.command)
-        raise TestCaseVerifyError(failure_description)
-
-    def verify_else_skip(self, condition: bool, skip_reason: str) -> None:
-        """Verify `condition` and handle skips.
-
-        When `condition` is :data:`False`, raise a skip exception.
-
-        Args:
-            condition: The condition to check.
-            skip_reason: Description of the reason for skipping.
-
-        Raises:
-            SkippedTestException: `condition` is :data:`False`.
-        """
-        if not condition:
-            self._skip_test_case_verify(skip_reason)
-
-    def _skip_test_case_verify(self, skip_description: str) -> None:
-        self._logger.debug(f"Test case skipped: {skip_description}")
-        raise SkippedTestException(skip_description)
-
-    def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None:
-        """Verify that `expected_packet` has been received.
-
-        Go through `received_packets` and check that `expected_packet` is among them.
-        If not, raise an exception and log the last 10 commands
-        executed on both the SUT and TG.
-
-        Args:
-            expected_packet: The packet we're expecting to receive.
-            received_packets: The packets where we're looking for `expected_packet`.
-
-        Raises:
-            TestCaseVerifyError: `expected_packet` is not among `received_packets`.
-        """
-        for received_packet in received_packets:
-            if self._compare_packets(expected_packet, received_packet):
-                break
-        else:
-            self._logger.debug(
-                f"The expected packet {expected_packet.summary()} "
-                f"not found among received {get_packet_summaries(received_packets)}"
-            )
-            self._fail_test_case_verify("An expected packet not found among received packets.")
-
-    def match_all_packets(
-        self,
-        expected_packets: list[Packet],
-        received_packets: list[Packet],
-        verify: bool = True,
-    ) -> bool:
-        """Matches all the expected packets against the received ones.
-
-        Matching is performed by counting down the occurrences in a dictionary which keys are the
-        raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
-        are automatically ignored.
-
-        Args:
-            expected_packets: The packets we are expecting to receive.
-            received_packets: All the packets that were received.
-            verify: If :data:`True`, and there are missing packets an exception will be raised.
-
-        Raises:
-            TestCaseVerifyError: if and not all the `expected_packets` were found in
-                `received_packets`.
-
-        Returns:
-            :data:`True` If there are no missing packets.
-        """
-        expected_packets_counters = Counter(map(raw, expected_packets))
-        received_packets_counters = Counter(map(raw, received_packets))
-        # The number of expected packets is subtracted by the number of received packets, ignoring
-        # any unexpected packets and capping at zero.
-        missing_packets_counters = expected_packets_counters - received_packets_counters
-        missing_packets_count = missing_packets_counters.total()
-        self._logger.debug(
-            f"match_all_packets: expected {len(expected_packets)}, "
-            f"received {len(received_packets)}, missing {missing_packets_count}"
-        )
-
-        if missing_packets_count != 0:
-            if verify:
-                self._fail_test_case_verify(
-                    f"Not all packets were received, expected {len(expected_packets)} "
-                    f"but {missing_packets_count} were missing."
-                )
-            return False
-
-        return True
-
-    def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
-        self._logger.debug(
-            f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
-        )
-
-        l3 = IP in expected_packet.layers()
-        self._logger.debug("Found l3 layer")
-
-        received_payload = received_packet
-        expected_payload = expected_packet
-        while received_payload and expected_payload:
-            self._logger.debug("Comparing payloads:")
-            self._logger.debug(f"Received: {received_payload}")
-            self._logger.debug(f"Expected: {expected_payload}")
-            if type(received_payload) is type(expected_payload):
-                self._logger.debug("The layers are the same.")
-                if type(received_payload) is Ether:
-                    if not self._verify_l2_frame(received_payload, l3):
-                        return False
-                elif type(received_payload) is IP:
-                    assert type(expected_payload) is IP
-                    if not self._verify_l3_packet(received_payload, expected_payload):
-                        return False
-            else:
-                # Different layers => different packets
-                return False
-            received_payload = received_payload.payload
-            expected_payload = expected_payload.payload
-
-        if expected_payload:
-            self._logger.debug(f"The expected packet did not contain {expected_payload}.")
-            return False
-        if received_payload and received_payload.__class__ != Padding:
-            self._logger.debug("The received payload had extra layers which were not padding.")
-            return False
-        return True
-
-    def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
-        self._logger.debug("Looking at the Ether layer.")
-        self._logger.debug(
-            f"Comparing received dst mac '{received_packet.dst}' "
-            f"with expected '{self.topology.tg_port_ingress.mac_address}'."
-        )
-        if received_packet.dst != self.topology.tg_port_ingress.mac_address:
-            return False
-
-        expected_src_mac = self.topology.tg_port_egress.mac_address
-        if l3:
-            expected_src_mac = self.topology.sut_port_egress.mac_address
-        self._logger.debug(
-            f"Comparing received src mac '{received_packet.src}' "
-            f"with expected '{expected_src_mac}'."
-        )
-        if received_packet.src != expected_src_mac:
-            return False
-
-        return True
-
-    def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
-        self._logger.debug("Looking at the IP layer.")
-        if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
-            return False
-        return True
-

 #: The generic type for a method of an instance of TestSuite
 TestSuiteMethodType = TypeVar("TestSuiteMethodType", bound=Callable[[TestSuite], None])
--
2.39.5


^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH v1 2/2] dts: adjust all tests to use the new API calls
  2025-09-23 10:37 [PATCH v1 0/2] Move TestSuite methods to API modules Paul Szczepanek
  2025-09-23 10:37 ` [PATCH v1 1/2] dts: add packet handling and test utilities to API Paul Szczepanek
@ 2025-09-23 10:37 ` Paul Szczepanek
  1 sibling, 0 replies; 3+ messages in thread
From: Paul Szczepanek @ 2025-09-23 10:37 UTC (permalink / raw)
  To: dev; +Cc: Paul Szczepanek, Luca Vizzarro

Former calls to TestSuite methods now call API methods.

Depends-on: series-36111 ("Split DTS framework and public API")

Signed-off-by: Paul Szczepanek <paul.szczepanek@arm.com>
Reviewed-by: Luca Vizzarro <luca.vizzarro@arm.com>
---
 dts/tests/TestSuite_blocklist.py              |  5 +-
 dts/tests/TestSuite_checksum_offload.py       | 14 ++--
 dts/tests/TestSuite_dual_vlan.py              | 26 +++----
 dts/tests/TestSuite_dynamic_config.py         |  8 ++-
 dts/tests/TestSuite_dynamic_queue_conf.py     | 13 ++--
 dts/tests/TestSuite_hello_world.py            |  3 +-
 dts/tests/TestSuite_l2fwd.py                  | 11 ++-
 dts/tests/TestSuite_mac_filter.py             | 16 +++--
 dts/tests/TestSuite_mtu.py                    | 10 +--
 dts/tests/TestSuite_packet_capture.py         | 22 +++---
 dts/tests/TestSuite_pmd_buffer_scatter.py     | 10 +--
 dts/tests/TestSuite_port_control.py           | 14 ++--
 ...stSuite_port_restart_config_persistency.py |  3 +-
 dts/tests/TestSuite_port_stats.py             | 12 ++--
 dts/tests/TestSuite_promisc_support.py        | 20 ++++--
 dts/tests/TestSuite_queue_start_stop.py       |  6 +-
 dts/tests/TestSuite_rte_flow.py               | 70 ++++++++++---------
 dts/tests/TestSuite_smoke_tests.py            |  7 +-
 dts/tests/TestSuite_softnic.py                | 11 ++-
 dts/tests/TestSuite_uni_pkt.py                |  6 +-
 dts/tests/TestSuite_vlan.py                   | 20 +++---
 21 files changed, 178 insertions(+), 129 deletions(-)

diff --git a/dts/tests/TestSuite_blocklist.py b/dts/tests/TestSuite_blocklist.py
index 6d3dba6756..ba37f39ab3 100644
--- a/dts/tests/TestSuite_blocklist.py
+++ b/dts/tests/TestSuite_blocklist.py
@@ -10,6 +10,7 @@
     LinkTopology,
     requires_link_topology,
 )
+from api.test import verify
 from api.testpmd import TestPmd
 from framework.test_suite import TestSuite, func_test
 from framework.testbed_model.port import Port
@@ -27,10 +28,10 @@ def _verify_blocklisted_ports(self, ports_to_block: list[Port]) -> None:

             # sanity check
             allowed_len = len(allowlisted_ports - blocklisted_ports)
-            self.verify(allowed_len > 0, "At least one port should have been allowed")
+            verify(allowed_len > 0, "At least one port should have been allowed")

             blocked = not allowlisted_ports & blocklisted_ports
-            self.verify(blocked, "At least one port was not blocklisted")
+            verify(blocked, "At least one port was not blocklisted")

     @func_test
     def no_blocklisted(self) -> None:
diff --git a/dts/tests/TestSuite_checksum_offload.py b/dts/tests/TestSuite_checksum_offload.py
index 70ae9c124c..8e1ec0f142 100644
--- a/dts/tests/TestSuite_checksum_offload.py
+++ b/dts/tests/TestSuite_checksum_offload.py
@@ -25,6 +25,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from api.testpmd.types import ChecksumOffloadOptions, PacketOffloadFlag
@@ -61,11 +63,11 @@ def _send_packets_and_verify(
                 by the traffic generator.
         """
         for i in range(0, len(packet_list)):
-            received_packets = self.send_packet_and_capture(packet=packet_list[i])
+            received_packets = send_packet_and_capture(packet=packet_list[i])
             received = any(
                 packet.haslayer(Raw) and load in packet.load for packet in received_packets
             )
-            self.verify(
+            verify(
                 received == should_receive,
                 f"Packet was {'dropped' if should_receive else 'received'}",
             )
@@ -85,19 +87,19 @@ def _send_packet_and_verify_checksum(
             id: The destination port that matches the sent packet in verbose output.
         """
         testpmd.start()
-        self.send_packet_and_capture(packet=packet)
+        send_packet_and_capture(packet=packet)
         verbose_output = testpmd.extract_verbose_output(testpmd.stop())
         is_IP = is_L4 = None
         for testpmd_packet in verbose_output:
             if testpmd_packet.l4_dport == id:
                 is_IP = PacketOffloadFlag.RTE_MBUF_F_RX_IP_CKSUM_GOOD in testpmd_packet.ol_flags
                 is_L4 = PacketOffloadFlag.RTE_MBUF_F_RX_L4_CKSUM_GOOD in testpmd_packet.ol_flags
-        self.verify(
+        verify(
             is_IP is not None and is_L4 is not None,
             "Test packet was dropped when it should have been received.",
         )
-        self.verify(is_L4 == good_L4, "Layer 4 checksum flag did not match expected checksum flag.")
-        self.verify(is_IP == good_IP, "IP checksum flag did not match expected checksum flag.")
+        verify(is_L4 == good_L4, "Layer 4 checksum flag did not match expected checksum flag.")
+        verify(is_IP == good_IP, "IP checksum flag did not match expected checksum flag.")

     def _setup_hw_offload(self, testpmd: TestPmd) -> None:
         """Sets IP, UDP, and TCP layers to hardware offload.
diff --git a/dts/tests/TestSuite_dual_vlan.py b/dts/tests/TestSuite_dual_vlan.py
index a40297ecaa..860eae7424 100644
--- a/dts/tests/TestSuite_dual_vlan.py
+++ b/dts/tests/TestSuite_dual_vlan.py
@@ -18,6 +18,8 @@
 from scapy.layers.l2 import Dot1Q, Ether
 from scapy.packet import Packet, Raw

+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from framework.test_suite import TestSuite, func_test
@@ -111,7 +113,7 @@ def _verify_vlan_functions(self, send_packet: Packet, options: TestCaseOptions)
             send_packet: Packet to send for testing.
             options: Flag which defines the currents configured settings in testpmd.
         """
-        recv = self.send_packet_and_capture(send_packet)
+        recv = send_packet_and_capture(send_packet)
         recv = list(filter(self._is_relevant_packet, recv))
         expected_layers: list[Packet] = []

@@ -119,13 +121,13 @@ def _verify_vlan_functions(self, send_packet: Packet, options: TestCaseOptions)
             expected_layers.append(Dot1Q(vlan=self.outer_vlan_tag))
         expected_layers.append(Dot1Q(vlan=self.inner_vlan_tag))

-        self.verify(
+        verify(
             len(recv) > 0,
             f"Expected to receive packet with the payload {expected_layers} but got nothing.",
         )

         for pkt in recv:
-            self.verify(
+            verify(
                 self._pkt_payload_contains_layers(pkt, *expected_layers),
                 f"Received packet ({pkt.summary()}) did not match the expected sequence of layers "
                 f"{expected_layers} with options {options}.",
@@ -175,11 +177,11 @@ def insert_second_vlan(self) -> None:
         with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
             testpmd.tx_vlan_set(port=self.tx_port, enable=True, vlan=self.vlan_insert_tag)
             testpmd.start()
-            recv = self.send_packet_and_capture(
+            recv = send_packet_and_capture(
                 Ether() / Dot1Q(vlan=self.outer_vlan_tag) / Raw(b"X" * 20)
             )
-            self.verify(len(recv) > 0, "Did not receive any packets when testing VLAN insertion.")
-            self.verify(
+            verify(len(recv) > 0, "Did not receive any packets when testing VLAN insertion.")
+            verify(
                 any(
                     self._is_relevant_packet(p)
                     and self._pkt_payload_contains_layers(
@@ -211,9 +213,9 @@ def all_vlan_functions(self) -> None:
         )
         with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
             testpmd.start()
-            recv = self.send_packet_and_capture(send_pkt)
-            self.verify(len(recv) > 0, "Unmodified packet was not received.")
-            self.verify(
+            recv = send_packet_and_capture(send_pkt)
+            verify(len(recv) > 0, "Unmodified packet was not received.")
+            verify(
                 any(
                     self._is_relevant_packet(p)
                     and self._pkt_payload_contains_layers(
@@ -252,9 +254,9 @@ def maintains_priority(self) -> None:
         )
         with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
             testpmd.start()
-            recv = self.send_packet_and_capture(pkt)
-            self.verify(len(recv) > 0, "Did not receive any packets when testing VLAN priority.")
-            self.verify(
+            recv = send_packet_and_capture(pkt)
+            verify(len(recv) > 0, "Did not receive any packets when testing VLAN priority.")
+            verify(
                 any(
                     self._is_relevant_packet(p)
                     and self._pkt_payload_contains_layers(
diff --git a/dts/tests/TestSuite_dynamic_config.py b/dts/tests/TestSuite_dynamic_config.py
index 5cc96f2633..7204ec4f73 100644
--- a/dts/tests/TestSuite_dynamic_config.py
+++ b/dts/tests/TestSuite_dynamic_config.py
@@ -25,6 +25,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from framework.test_suite import TestSuite, func_test
@@ -62,11 +64,11 @@ def _send_packet_and_verify(self, should_receive: bool, mac_address: str) -> Non
             mac_address: Destination MAC address to generate in packet.
         """
         packet = Ether(dst=mac_address) / IP() / Raw(load="xxxxx")
-        received = self.send_packet_and_capture(packet)
+        received = send_packet_and_capture(packet)
         contains_packet = any(
             packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
         )
-        self.verify(
+        verify(
             should_receive == contains_packet,
             f"Packet was {'dropped' if should_receive else 'received'}",
         )
@@ -100,7 +102,7 @@ def default_mode(self) -> None:
         """
         with TestPmd() as testpmd:
             is_promisc = testpmd.show_port_info(0).is_promiscuous_mode_enabled
-            self.verify(is_promisc, "Promiscuous mode was not enabled by default.")
+            verify(is_promisc, "Promiscuous mode was not enabled by default.")
             testpmd.start()
             mac = testpmd.show_port_info(0).mac_address
             # send a packet with Rx port mac address
diff --git a/dts/tests/TestSuite_dynamic_queue_conf.py b/dts/tests/TestSuite_dynamic_queue_conf.py
index bc7f981424..3ddfa31152 100644
--- a/dts/tests/TestSuite_dynamic_queue_conf.py
+++ b/dts/tests/TestSuite_dynamic_queue_conf.py
@@ -35,6 +35,8 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.packet import send_packets
+from api.test import fail, verify
 from api.testpmd import TestPmd
 from api.testpmd.config import PortTopology, SimpleForwardingModes
 from framework.exception import InteractiveCommandExecutionError
@@ -109,7 +111,7 @@ def _wrap(self: "TestDynamicQueueConf", is_rx_testing: bool) -> None:
             self._send_packets_with_different_addresses(self.number_of_packets_to_send)
             forwarding_stats = testpmd.stop()
             for queue_id in queues_to_config:
-                self.verify(
+                verify(
                     self._port_queue_in_stats(port_id, is_rx_testing, queue_id, forwarding_stats),
                     f"Modified queue {queue_id} on port {port_id} failed to receive traffic after"
                     "being started again.",
@@ -172,7 +174,7 @@ def _send_packets_with_different_addresses(self, number_of_packets: int) -> None
             / Raw()
             for i in range(number_of_packets)
         ]
-        self.send_packets(packets_to_send)
+        send_packets(packets_to_send)

     def _port_queue_in_stats(
         self, port_id: int, is_rx_queue: bool, queue_id: int, stats: str
@@ -226,8 +228,7 @@ def _modify_ring_size(
             # The testpmd method verifies that the modification worked, so we catch that error
             # and just re-raise it as a test case failure
             except InteractiveCommandExecutionError:
-                self.verify(
-                    False,
+                fail(
                     f"Failed to update the ring size of queue {queue_id} on port "
                     f"{port_id} at runtime",
                 )
@@ -264,12 +265,12 @@ def _stop_queues(
         # it means there could be another reason for the packets not transmitting and,
         # therefore, a false positive result.
         for unchanged_q_id in unchanged_queues:
-            self.verify(
+            verify(
                 self._port_queue_in_stats(port_id, is_rx_testing, unchanged_q_id, forwarding_stats),
                 f"Queue {unchanged_q_id} failed to receive traffic.",
             )
         for stopped_q_id in queues_to_modify:
-            self.verify(
+            verify(
                 not self._port_queue_in_stats(
                     port_id, is_rx_testing, stopped_q_id, forwarding_stats
                 ),
diff --git a/dts/tests/TestSuite_hello_world.py b/dts/tests/TestSuite_hello_world.py
index 3560e9ec0b..bf1a93c782 100644
--- a/dts/tests/TestSuite_hello_world.py
+++ b/dts/tests/TestSuite_hello_world.py
@@ -8,6 +8,7 @@
 are properly configured.
 """

+from api.test import log
 from api.testpmd import TestPmd
 from framework.test_suite import BaseConfig, TestSuite, func_test

@@ -36,4 +37,4 @@ def hello_world(self) -> None:
         """
         with TestPmd() as testpmd:
             testpmd.start()
-        self.log(self.config.msg)
+        log(self.config.msg)
diff --git a/dts/tests/TestSuite_l2fwd.py b/dts/tests/TestSuite_l2fwd.py
index bc1b5162c3..596b892730 100644
--- a/dts/tests/TestSuite_l2fwd.py
+++ b/dts/tests/TestSuite_l2fwd.py
@@ -13,6 +13,11 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import (
+    get_expected_packets,
+    match_all_packets,
+    send_packets_and_capture,
+)
 from api.testpmd import TestPmd
 from api.testpmd.config import EthPeer, SimpleForwardingModes
 from framework.context import filter_cores
@@ -66,8 +71,8 @@ def l2fwd_integrity(self) -> None:
                 shell.set_ports_queues(queues_num)
                 shell.start()

-                received_packets = self.send_packets_and_capture(self.packets)
-                expected_packets = self.get_expected_packets(self.packets)
-                self.match_all_packets(expected_packets, received_packets)
+                received_packets = send_packets_and_capture(self.packets)
+                expected_packets = get_expected_packets(self.packets)
+                match_all_packets(expected_packets, received_packets)

                 shell.stop()
diff --git a/dts/tests/TestSuite_mac_filter.py b/dts/tests/TestSuite_mac_filter.py
index 6a51f9df4e..a7e24b37d5 100644
--- a/dts/tests/TestSuite_mac_filter.py
+++ b/dts/tests/TestSuite_mac_filter.py
@@ -23,6 +23,8 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import fail, verify
 from api.testpmd import TestPmd
 from framework.exception import InteractiveCommandExecutionError
 from framework.test_suite import TestSuite, func_test
@@ -73,16 +75,16 @@ def _send_packet_and_verify(
         packet.dst = mac_address
         received_packets = [
             packets
-            for packets in self.send_packet_and_capture(packet)
+            for packets in send_packet_and_capture(packet)
             if hasattr(packets, "load") and "X" * 22 in str(packets.load)
         ]
         if should_receive:
-            self.verify(
+            verify(
                 len(received_packets) == 1,
                 "Packet sent by test case should be forwarded and received.",
             )
         else:
-            self.verify(
+            verify(
                 len(received_packets) == 0,
                 "Packet sent by test case should not be forwarded and received.",
             )
@@ -160,12 +162,12 @@ def invalid_address(self) -> None:
             mac_address = self.topology.sut_port_ingress.mac_address
             try:
                 testpmd.set_mac_addr(0, "00:00:00:00:00:00", add=True)
-                self.verify(False, "Invalid mac address added.")
+                fail("Invalid mac address added.")
             except InteractiveCommandExecutionError:
                 pass
             try:
                 testpmd.set_mac_addr(0, mac_address, add=False)
-                self.verify(False, "Default mac address removed.")
+                fail("Default mac address removed.")
             except InteractiveCommandExecutionError:
                 pass
             # Should be no errors adding this twice
@@ -174,7 +176,7 @@ def invalid_address(self) -> None:
             # Double check to see if default mac address can be removed
             try:
                 testpmd.set_mac_addr(0, mac_address, add=False)
-                self.verify(False, "Default mac address removed.")
+                fail("Default mac address removed.")
             except InteractiveCommandExecutionError:
                 pass

@@ -190,7 +192,7 @@ def invalid_address(self) -> None:
                 testpmd.set_mac_addr(0, "E" + mac_address[1:], add=True)
                 # We add an extra address to compensate for mac address pool inconsistencies.
                 testpmd.set_mac_addr(0, "F" + mac_address[1:], add=True)
-                self.verify(False, "Mac address limit exceeded.")
+                fail("Mac address limit exceeded.")
             except InteractiveCommandExecutionError:
                 pass

diff --git a/dts/tests/TestSuite_mtu.py b/dts/tests/TestSuite_mtu.py
index 8a14c791f7..8355495d33 100644
--- a/dts/tests/TestSuite_mtu.py
+++ b/dts/tests/TestSuite_mtu.py
@@ -21,6 +21,8 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from framework.test_suite import TestSuite, func_test

@@ -74,7 +76,7 @@ def _send_packet_and_verify(self, pkt_size: int, should_receive: bool) -> None:
         padding = pkt_size - IP_HEADER_LEN
         # Insert '    ' as placeholder 'CRC' error correction.
         packet = Ether() / Raw(load="    ") / IP(len=pkt_size) / Raw(load="X" * padding)
-        received_packets = self.send_packet_and_capture(packet)
+        received_packets = send_packet_and_capture(packet)
         found = any(
             ("X" * padding) in str(packets.load)
             for packets in received_packets
@@ -82,9 +84,9 @@ def _send_packet_and_verify(self, pkt_size: int, should_receive: bool) -> None:
         )

         if should_receive:
-            self.verify(found, "Did not receive packet.")
+            verify(found, "Did not receive packet.")
         else:
-            self.verify(not found, "Received packet.")
+            verify(not found, "Received packet.")

     def _assess_mtu_boundary(self, testpmd_shell: TestPmd, mtu: int) -> None:
         """Sets the new MTU and verifies packets at the set boundary.
@@ -118,7 +120,7 @@ def _assess_mtu_boundary(self, testpmd_shell: TestPmd, mtu: int) -> None:
         self._send_packet_and_verify(pkt_size=equal_frame_size, should_receive=True)

         current_mtu = testpmd_shell.show_port_info(0).mtu
-        self.verify(current_mtu is not None, "Error grabbing testpmd MTU value.")
+        verify(current_mtu is not None, "Error grabbing testpmd MTU value.")
         if current_mtu and (
             current_mtu >= STANDARD_MTU + VENDOR_AGNOSTIC_PADDING and mtu == STANDARD_MTU
         ):
diff --git a/dts/tests/TestSuite_packet_capture.py b/dts/tests/TestSuite_packet_capture.py
index dcd947043a..4bd15e2401 100644
--- a/dts/tests/TestSuite_packet_capture.py
+++ b/dts/tests/TestSuite_packet_capture.py
@@ -30,6 +30,12 @@
     LinkTopology,
     requires_link_topology,
 )
+from api.packet import (
+    get_expected_packets,
+    match_all_packets,
+    send_packets_and_capture,
+)
+from api.test import verify
 from api.testpmd import TestPmd
 from framework.params import Params
 from framework.remote_session.blocking_app import BlockingApp
@@ -139,7 +145,7 @@ def _send_and_dump(
                 )
             )

-        received_packets = self.send_packets_and_capture(
+        received_packets = send_packets_and_capture(
             self.packets, PacketFilteringConfig(no_lldp=False)
         )

@@ -166,18 +172,18 @@ def dumpcap(self) -> None:
             testpmd.start()
             received_packets = self._send_and_dump()

-            expected_packets = self.get_expected_packets(self.packets, sent_from_tg=True)
+            expected_packets = get_expected_packets(self.packets, sent_from_tg=True)
             with self.rx_pcap.open() as fd:
                 rx_pcap_packets = list(rdpcap(fd))
-                self.verify(
-                    self.match_all_packets(expected_packets, rx_pcap_packets, verify=False),
+                verify(
+                    match_all_packets(expected_packets, rx_pcap_packets, verify=False),
                     "Rx packets from dumpcap weren't the same as the expected packets.",
                 )

             with self.tx_pcap.open() as fd:
                 tx_pcap_packets = list(rdpcap(fd))
-                self.verify(
-                    self.match_all_packets(tx_pcap_packets, received_packets, verify=False),
+                verify(
+                    match_all_packets(tx_pcap_packets, received_packets, verify=False),
                     "Tx packets from dumpcap weren't the same as the packets received by Scapy.",
                 )

@@ -198,14 +204,14 @@ def dumpcap_filter(self) -> None:
             self._send_and_dump("tcp", rx_only=True)
             filtered_packets = [
                 raw(p)
-                for p in self.get_expected_packets(self.packets, sent_from_tg=True)
+                for p in get_expected_packets(self.packets, sent_from_tg=True)
                 if not p.haslayer(TCP)
             ]

             with self.rx_pcap.open() as fd:
                 rx_pcap_packets = [raw(p) for p in rdpcap(fd)]
                 for filtered_packet in filtered_packets:
-                    self.verify(
+                    verify(
                         filtered_packet not in rx_pcap_packets,
                         "Found a packet in the pcap that was meant to be filtered out.",
                     )
diff --git a/dts/tests/TestSuite_pmd_buffer_scatter.py b/dts/tests/TestSuite_pmd_buffer_scatter.py
index 06d2e5f7e5..b49fba4cfc 100644
--- a/dts/tests/TestSuite_pmd_buffer_scatter.py
+++ b/dts/tests/TestSuite_pmd_buffer_scatter.py
@@ -26,6 +26,8 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from framework.test_suite import TestSuite, func_test
@@ -87,14 +89,14 @@ def _scatter_pktgen_send_packet(self, pkt_size: int) -> list[Packet]:
         # pack the payload
         for X_in_hex in payload:
             packet.load += struct.pack("=B", int("%s%s" % (X_in_hex[0], X_in_hex[1]), 16))
-        received_packets = self.send_packet_and_capture(packet)
+        received_packets = send_packet_and_capture(packet)
         # filter down the list to packets that have the appropriate structure
         received_packets = [p for p in received_packets if Ether in p and IP in p and Raw in p]

-        self.verify(len(received_packets) > 0, "Did not receive any packets.")
+        verify(len(received_packets) > 0, "Did not receive any packets.")

         layer2 = received_packets[0].getlayer(2)
-        self.verify(layer2 is not None, "The received packet is invalid.")
+        verify(layer2 is not None, "The received packet is invalid.")
         assert layer2 is not None

         return received_packets
@@ -126,7 +128,7 @@ def _pmd_scatter(self, mb_size: int, enable_offload: bool = False) -> None:
             for offset in [-1, 0, 1, 4, 5]:
                 recv_packets = self._scatter_pktgen_send_packet(mb_size + offset)
                 self._logger.debug(f"Relevant captured packets: \n{recv_packets}")
-                self.verify(
+                verify(
                     any(" ".join(["58"] * 8) in hexstr(pkt, onlyhex=1) for pkt in recv_packets),
                     "Payload of scattered packet did not match expected payload with offset "
                     f"{offset}.",
diff --git a/dts/tests/TestSuite_port_control.py b/dts/tests/TestSuite_port_control.py
index df11df4d39..d9cc5ff4c9 100644
--- a/dts/tests/TestSuite_port_control.py
+++ b/dts/tests/TestSuite_port_control.py
@@ -18,6 +18,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packets_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from framework.test_suite import TestSuite, func_test
@@ -40,8 +42,8 @@ def _send_packets_and_verify(self) -> None:
         send_p = Ether() / Raw(payload.encode("utf-8"))
         recv_pakts: list[Packet] = []
         for _ in range(int(num_pakts / 25)):
-            recv_pakts += self.send_packets_and_capture([send_p] * 25)
-        recv_pakts += self.send_packets_and_capture([send_p] * (num_pakts % 25))
+            recv_pakts += send_packets_and_capture([send_p] * 25)
+        recv_pakts += send_packets_and_capture([send_p] * (num_pakts % 25))
         recv_pakts = [
             p
             for p in recv_pakts
@@ -51,7 +53,7 @@ def _send_packets_and_verify(self) -> None:
                 hasattr(p, "load") and p.load.decode("utf-8").replace("\x00", "") == payload
             )
         ]
-        self.verify(
+        verify(
             len(recv_pakts) == num_pakts,
             f"Received {len(recv_pakts)} packets when {num_pakts} were expected.",
         )
@@ -89,7 +91,7 @@ def stop_ports(self) -> None:
         """
         with TestPmd(forward_mode=SimpleForwardingModes.mac) as testpmd:
             testpmd.stop_all_ports()
-            self.verify(
+            verify(
                 all(not p.is_link_up for p in testpmd.show_port_info_all()),
                 "Failed to stop all ports.",
             )
@@ -108,6 +110,4 @@ def close_ports(self) -> None:
         """
         with TestPmd() as testpmd:
             testpmd.close_all_ports()
-            self.verify(
-                len(testpmd.show_port_info_all()) == 0, "Failed to close all ports in testpmd."
-            )
+            verify(len(testpmd.show_port_info_all()) == 0, "Failed to close all ports in testpmd.")
diff --git a/dts/tests/TestSuite_port_restart_config_persistency.py b/dts/tests/TestSuite_port_restart_config_persistency.py
index 7666c9ea7a..4ea22b6d70 100644
--- a/dts/tests/TestSuite_port_restart_config_persistency.py
+++ b/dts/tests/TestSuite_port_restart_config_persistency.py
@@ -13,6 +13,7 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.types import TestPmdPortFlowCtrl
 from framework.test_suite import TestSuite, func_test
@@ -49,7 +50,7 @@ def _restart_port_and_verify(self, id: int, testpmd: TestPmd, changed_value: str
         if flow_info_after:
             all_info_after.update(asdict(flow_info_after))

-        self.verify(
+        verify(
             all_info_before == all_info_after,
             f"Port configuration for {changed_value} was not retained through port restart.",
         )
diff --git a/dts/tests/TestSuite_port_stats.py b/dts/tests/TestSuite_port_stats.py
index d0b3b33563..3dc045f847 100644
--- a/dts/tests/TestSuite_port_stats.py
+++ b/dts/tests/TestSuite_port_stats.py
@@ -23,6 +23,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from api.testpmd.types import RtePTypes, TestPmdVerbosePacket
@@ -142,7 +144,7 @@ def stats_updates(self) -> None:
             testpmd.set_verbose(3)
             testpmd.start()
             testpmd.clear_port_stats_all()
-            self.send_packet_and_capture(self._send_pkt)
+            send_packet_and_capture(self._send_pkt)
             port_stats_all, forwarding_info = testpmd.show_port_stats_all()
             verbose_information = TestPmd.extract_verbose_output(forwarding_info)

@@ -155,21 +157,21 @@ def stats_updates(self) -> None:
         recv_relevant_bytes = port_stats_all[self.recv_port].rx_bytes - rx_irr_bytes
         sent_relevant_bytes = port_stats_all[self.send_port].tx_bytes - tx_irr_bytes

-        self.verify(
+        verify(
             recv_relevant_packets == 1,
             f"Port {self.recv_port} received {recv_relevant_packets} packets but expected to only "
             "receive 1.",
         )
-        self.verify(
+        verify(
             recv_relevant_bytes == self.total_packet_len,
             f"Number of bytes received by port {self.recv_port} did not match the amount sent.",
         )
-        self.verify(
+        verify(
             sent_relevant_packets == 1,
             f"Number was packets sent by port {self.send_port} was not equal to the number "
             f"received by port {self.recv_port}.",
         )
-        self.verify(
+        verify(
             sent_relevant_bytes == self.total_packet_len,
             f"Number of bytes sent by port {self.send_port} did not match the number of bytes "
             f"received by port {self.recv_port}.",
diff --git a/dts/tests/TestSuite_promisc_support.py b/dts/tests/TestSuite_promisc_support.py
index 089db4a3ec..a0c65dc662 100644
--- a/dts/tests/TestSuite_promisc_support.py
+++ b/dts/tests/TestSuite_promisc_support.py
@@ -15,6 +15,12 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.packet import (
+    get_expected_packets,
+    match_all_packets,
+    send_packets_and_capture,
+)
+from api.test import verify
 from api.testpmd import TestPmd
 from framework.test_suite import TestSuite, func_test

@@ -48,9 +54,9 @@ def promisc_packets(self) -> None:
                 testpmd.set_promisc(port=port_id, enable=True, verify=True)
             testpmd.start()

-            received_packets = self.send_packets_and_capture(packet)
-            expected_packets = self.get_expected_packets(packet, sent_from_tg=True)
-            self.match_all_packets(expected_packets, received_packets)
+            received_packets = send_packets_and_capture(packet)
+            expected_packets = get_expected_packets(packet, sent_from_tg=True)
+            match_all_packets(expected_packets, received_packets)

             testpmd.stop()

@@ -58,9 +64,9 @@ def promisc_packets(self) -> None:
                 testpmd.set_promisc(port=port_id, enable=False, verify=True)
             testpmd.start()

-            received_packets = self.send_packets_and_capture(packet)
-            expected_packets = self.get_expected_packets(packet, sent_from_tg=True)
-            self.verify(
-                not self.match_all_packets(expected_packets, received_packets, verify=False),
+            received_packets = send_packets_and_capture(packet)
+            expected_packets = get_expected_packets(packet, sent_from_tg=True)
+            verify(
+                not match_all_packets(expected_packets, received_packets, verify=False),
                 "Invalid packet wasn't filtered out.",
             )
diff --git a/dts/tests/TestSuite_queue_start_stop.py b/dts/tests/TestSuite_queue_start_stop.py
index e0dd39fd96..e9048d4245 100644
--- a/dts/tests/TestSuite_queue_start_stop.py
+++ b/dts/tests/TestSuite_queue_start_stop.py
@@ -22,6 +22,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from framework.test_suite import TestSuite, func_test
@@ -49,11 +51,11 @@ def _send_packet_and_verify(self, should_receive: bool = True) -> None:
             should_receive: Indicate whether the packet should be received.
         """
         packet = Ether() / IP() / Raw(load="xxxxx")
-        received = self.send_packet_and_capture(packet)
+        received = send_packet_and_capture(packet)
         contains_packet = any(
             packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
         )
-        self.verify(
+        verify(
             should_receive == contains_packet,
             f"Packet was {'dropped' if should_receive else 'received'}",
         )
diff --git a/dts/tests/TestSuite_rte_flow.py b/dts/tests/TestSuite_rte_flow.py
index 79b16973eb..207cbce2d3 100644
--- a/dts/tests/TestSuite_rte_flow.py
+++ b/dts/tests/TestSuite_rte_flow.py
@@ -23,6 +23,8 @@
     NicCapability,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import fail, log, verify, verify_else_skip
 from api.testpmd import TestPmd
 from api.testpmd.types import FlowRule
 from framework.exception import InteractiveCommandExecutionError
@@ -91,13 +93,13 @@ def zip_lists(
         with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
             for flow, packet, expected_packet in zip_lists(flows, packets, expected_packets):
                 is_valid = testpmd.flow_validate(flow_rule=flow, port_id=port_id)
-                self.verify_else_skip(is_valid, "flow rule failed validation.")
+                verify_else_skip(is_valid, "flow rule failed validation.")

                 try:
                     flow_id = testpmd.flow_create(flow_rule=flow, port_id=port_id)
                 except InteractiveCommandExecutionError:
-                    self.log("Flow rule validation passed, but flow creation failed.")
-                    self.verify(False, "Failed flow creation")
+                    log("Flow rule validation passed, but flow creation failed.")
+                    fail("Failed flow creation")

                 if verification_method == self._send_packet_and_verify:
                     verification_method(packet=packet, *args, **kwargs)
@@ -119,11 +121,11 @@ def _send_packet_and_verify(self, packet: Packet, should_receive: bool = True) -
             packet: Scapy packet to send and verify.
             should_receive: Indicate whether the packet should be received.
         """
-        received = self.send_packet_and_capture(packet)
+        received = send_packet_and_capture(packet)
         contains_packet = any(
             packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
         )
-        self.verify(
+        verify(
             should_receive == contains_packet,
             f"Packet was {'dropped' if should_receive else 'received'}",
         )
@@ -140,13 +142,13 @@ def _send_packet_and_verify_queue(
         """
         testpmd.set_verbose(level=8)
         testpmd.start()
-        self.send_packet_and_capture(packet=packet)
+        send_packet_and_capture(packet=packet)
         verbose_output = testpmd.extract_verbose_output(testpmd.stop())
         received = False
         for testpmd_packet in verbose_output:
             if testpmd_packet.queue_id == test_queue:
                 received = True
-        self.verify(received, f"Expected packet was not received on queue {test_queue}")
+        verify(received, f"Expected packet was not received on queue {test_queue}")

     def _send_packet_and_verify_modification(self, packet: Packet, expected_packet: Packet) -> None:
         """Send packet and verify the expected modifications are present upon reception.
@@ -155,15 +157,15 @@ def _send_packet_and_verify_modification(self, packet: Packet, expected_packet:
             packet: Scapy packet to send to the SUT.
             expected_packet: Scapy packet that should match the received packet.
         """
-        received = self.send_packet_and_capture(packet)
+        received = send_packet_and_capture(packet)

         # verify reception
-        self.verify(received != [], "Packet was never received.")
+        verify(received != [], "Packet was never received.")

-        self.log(f"SENT PACKET:     {packet.summary()}")
-        self.log(f"EXPECTED PACKET: {expected_packet.summary()}")
+        log(f"SENT PACKET:     {packet.summary()}")
+        log(f"EXPECTED PACKET: {expected_packet.summary()}")
         for packet in received:
-            self.log(f"RECEIVED PACKET: {packet.summary()}")
+            log(f"RECEIVED PACKET: {packet.summary()}")

         expected_ip_dst = expected_packet[IP].dst if IP in expected_packet else None
         received_ip_dst = received[IP].dst if IP in received else None
@@ -173,13 +175,13 @@ def _send_packet_and_verify_modification(self, packet: Packet, expected_packet:

         # verify modification
         if expected_ip_dst is not None:
-            self.verify(
+            verify(
                 received_ip_dst == expected_ip_dst,
                 f"IPv4 dst mismatch: expected {expected_ip_dst}, got {received_ip_dst}",
             )

         if expected_mac_dst is not None:
-            self.verify(
+            verify(
                 received_mac_dst == expected_mac_dst,
                 f"MAC dst mismatch: expected {expected_mac_dst}, got {received_mac_dst}",
             )
@@ -202,23 +204,23 @@ def _send_packet_and_verify_jump(
         testpmd.set_verbose(level=8)
         for flow in flow_rules:
             is_valid = testpmd.flow_validate(flow_rule=flow, port_id=0)
-            self.verify_else_skip(is_valid, "flow rule failed validation.")
+            verify_else_skip(is_valid, "flow rule failed validation.")

             try:
                 testpmd.flow_create(flow_rule=flow, port_id=0)
             except InteractiveCommandExecutionError:
-                self.log("Flow validation passed, but flow creation failed.")
-                self.verify(False, "Failed flow creation")
+                log("Flow validation passed, but flow creation failed.")
+                fail("Failed flow creation")

         for packet, test_queue in zip(packets, test_queues):
             testpmd.start()
-            self.send_packet_and_capture(packet=packet)
+            send_packet_and_capture(packet=packet)
             verbose_output = testpmd.extract_verbose_output(testpmd.stop())
             received = False
             for testpmd_packet in verbose_output:
                 if testpmd_packet.queue_id == test_queue:
                     received = True
-            self.verify(received, f"Expected packet was not received on queue {test_queue}")
+            verify(received, f"Expected packet was not received on queue {test_queue}")

     @func_test
     def queue_action_ETH(self) -> None:
@@ -439,8 +441,8 @@ def drop_action_ETH(self) -> None:
         packet = Ether() / IP() / Raw(load="xxxxx")
         with TestPmd() as testpmd:
             testpmd.start()
-            received = self.send_packet_and_capture(packet)
-            self.verify(received != [], "Test packet was never received.")
+            received = send_packet_and_capture(packet)
+            verify(received != [], "Test packet was never received.")
         self._runner(
             verification_method=self._send_packet_and_verify,
             flows=flow_list,
@@ -494,8 +496,8 @@ def drop_action_IP(self) -> None:
         packet = Ether() / IP() / Raw(load="xxxxx")
         with TestPmd() as testpmd:
             testpmd.start()
-            received = self.send_packet_and_capture(packet)
-            self.verify(received != [], "Test packet was never received.")
+            received = send_packet_and_capture(packet)
+            verify(received != [], "Test packet was never received.")
         self._runner(
             verification_method=self._send_packet_and_verify,
             flows=flow_list,
@@ -545,8 +547,8 @@ def drop_action_L4(self) -> None:
         packet = Ether() / IP() / Raw(load="xxxxx")
         with TestPmd() as testpmd:
             testpmd.start()
-            received = self.send_packet_and_capture(packet)
-            self.verify(received != [], "Test packet was never received.")
+            received = send_packet_and_capture(packet)
+            verify(received != [], "Test packet was never received.")
         self._runner(
             verification_method=self._send_packet_and_verify,
             flows=flow_list,
@@ -584,8 +586,8 @@ def drop_action_VLAN(self) -> None:
         packet = Ether() / IP() / Raw(load="xxxxx")
         with TestPmd() as testpmd:
             testpmd.start()
-            received = self.send_packet_and_capture(packet)
-            self.verify(received != [], "Test packet was never received.")
+            received = send_packet_and_capture(packet)
+            verify(received != [], "Test packet was never received.")
         self._runner(
             verification_method=self._send_packet_and_verify,
             flows=flow_list,
@@ -674,8 +676,8 @@ def egress_rules(self) -> None:
         packet = Ether() / IP() / Raw(load="xxxxx")
         with TestPmd() as testpmd:
             testpmd.start()
-            received = self.send_packet_and_capture(packet)
-            self.verify(received != [], "Test packet was never received.")
+            received = send_packet_and_capture(packet)
+            verify(received != [], "Test packet was never received.")
         self._runner(
             verification_method=self._send_packet_and_verify,
             flows=flow_list,
@@ -779,17 +781,17 @@ def priority_attribute(self) -> None:
             testpmd.set_verbose(level=8)
             for flow, expected_queue in zip(flow_list, expected_queue_list):
                 is_valid = testpmd.flow_validate(flow_rule=flow, port_id=0)
-                self.verify_else_skip(is_valid, "flow rule failed validation.")
+                verify_else_skip(is_valid, "flow rule failed validation.")
                 try:
                     testpmd.flow_create(flow_rule=flow, port_id=0)
                 except InteractiveCommandExecutionError:
-                    self.log("Flow rule validation passed, but flow creation failed.")
-                    self.verify(False, "Failed flow creation")
+                    log("Flow rule validation passed, but flow creation failed.")
+                    fail("Failed flow creation")
                 testpmd.start()
-                self.send_packet_and_capture(test_packet)
+                send_packet_and_capture(test_packet)
                 verbose_output = testpmd.extract_verbose_output(testpmd.stop())
                 received = False
                 for testpmd_packet in verbose_output:
                     if testpmd_packet.queue_id == expected_queue:
                         received = True
-                self.verify(received, f"Packet was not received on queue {expected_queue}")
+                verify(received, f"Packet was not received on queue {expected_queue}")
diff --git a/dts/tests/TestSuite_smoke_tests.py b/dts/tests/TestSuite_smoke_tests.py
index 4211f5d97d..271ad4301c 100644
--- a/dts/tests/TestSuite_smoke_tests.py
+++ b/dts/tests/TestSuite_smoke_tests.py
@@ -18,6 +18,7 @@
     LinkTopology,
     requires_link_topology,
 )
+from api.test import verify
 from api.testpmd import TestPmd
 from framework.config.node import PortConfig
 from framework.settings import SETTINGS
@@ -119,7 +120,7 @@ def devices_listed_in_testpmd(self) -> None:
         with TestPmd() as testpmd:
             dev_list = [str(x) for x in testpmd.get_devices()]
         for nic in self.nics_in_node:
-            self.verify(
+            verify(
                 nic.pci in dev_list,
                 f"Device {nic.pci} was not listed in testpmd's available devices, "
                 "please check your configuration",
@@ -156,13 +157,13 @@ def device_bound_to_driver(self) -> None:
                 rf"{nic.pci}.*drv=(\S+) [^\\n]*",
                 all_nics_in_dpdk_devbind,
             )
-            self.verify(
+            verify(
                 devbind_info_for_nic is not None,
                 f"Failed to find configured device ({nic.pci}) using dpdk-devbind.py",
             )
             # We know this isn't None, but mypy doesn't
             assert devbind_info_for_nic is not None
-            self.verify(
+            verify(
                 devbind_info_for_nic.group(1) == nic.os_driver_for_dpdk,
                 f"Driver for device {nic.pci} does not match driver listed in "
                 f"configuration (bound to {devbind_info_for_nic.group(1)})",
diff --git a/dts/tests/TestSuite_softnic.py b/dts/tests/TestSuite_softnic.py
index 44cd066998..fa91f7ee2f 100644
--- a/dts/tests/TestSuite_softnic.py
+++ b/dts/tests/TestSuite_softnic.py
@@ -13,6 +13,11 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import (
+    get_expected_packets,
+    match_all_packets,
+    send_packets_and_capture,
+)
 from api.testpmd import TestPmd
 from api.testpmd.config import EthPeer
 from framework.test_suite import TestSuite, func_test
@@ -96,9 +101,9 @@ def softnic(self) -> None:
             port_topology=None,
         ) as shell:
             shell.start()
-            received_packets = self.send_packets_and_capture(self.packets)
+            received_packets = send_packets_and_capture(self.packets)
             # packets are being forwarded without addresses being amended so
             # we get the address as it would be expected to come from TG
-            expected_packets = self.get_expected_packets(self.packets, sent_from_tg=True)
+            expected_packets = get_expected_packets(self.packets, sent_from_tg=True)

-            self.match_all_packets(expected_packets, received_packets)
+            match_all_packets(expected_packets, received_packets)
diff --git a/dts/tests/TestSuite_uni_pkt.py b/dts/tests/TestSuite_uni_pkt.py
index d258f95d24..97d61cd03a 100644
--- a/dts/tests/TestSuite_uni_pkt.py
+++ b/dts/tests/TestSuite_uni_pkt.py
@@ -25,6 +25,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from api.testpmd.types import RtePTypes, TestPmdVerbosePacket
@@ -55,10 +57,10 @@ def _send_packet_and_verify_flags(
         self, expected_flag: RtePTypes, packet: Packet, testpmd: TestPmd
     ) -> None:
         """Sends a packet to the DUT and verifies the verbose ptype flags."""
-        self.send_packet_and_capture(packet=packet)
+        send_packet_and_capture(packet=packet)
         verbose_output = testpmd.extract_verbose_output(testpmd.stop())
         valid = self._check_for_matching_packet(output=verbose_output, flags=expected_flag)
-        self.verify(valid, f"Packet type flag did not match the expected flag: {expected_flag}.")
+        verify(valid, f"Packet type flag did not match the expected flag: {expected_flag}.")

     def _setup_session(
         self, testpmd: TestPmd, expected_flags: list[RtePTypes], packet_list=list[Packet]
diff --git a/dts/tests/TestSuite_vlan.py b/dts/tests/TestSuite_vlan.py
index 6c1b181c74..0ef63562b6 100644
--- a/dts/tests/TestSuite_vlan.py
+++ b/dts/tests/TestSuite_vlan.py
@@ -21,6 +21,8 @@
     requires_link_topology,
     requires_nic_capability,
 )
+from api.packet import send_packet_and_capture
+from api.test import verify
 from api.testpmd import TestPmd
 from api.testpmd.config import SimpleForwardingModes
 from framework.test_suite import TestSuite, func_test
@@ -55,30 +57,30 @@ def _send_vlan_packet_and_verify(self, should_receive: bool, strip: bool, vlan_i
             vlan_id: Expected VLAN ID.
         """
         packet = Ether() / Dot1Q(vlan=vlan_id) / Raw(load="xxxxx")
-        received_packets = self.send_packet_and_capture(packet)
+        received_packets = send_packet_and_capture(packet)
         test_packet = None
         for packet in received_packets:
             if hasattr(packet, "load") and b"xxxxx" in packet.load:
                 test_packet = packet
                 break
         if should_receive:
-            self.verify(
+            verify(
                 test_packet is not None,
                 "Packet was dropped when it should have been received",
             )
             if test_packet is not None:
                 if strip:
-                    self.verify(
+                    verify(
                         not test_packet.haslayer(Dot1Q),
                         "VLAN tag was not stripped successfully",
                     )
                 else:
-                    self.verify(
+                    verify(
                         test_packet.vlan == vlan_id,
                         "The received tag did not match the expected tag",
                     )
         else:
-            self.verify(
+            verify(
                 test_packet is None,
                 "Packet was received when it should have been dropped",
             )
@@ -90,22 +92,22 @@ def _send_packet_and_verify_insertion(self, expected_id: int) -> None:
             expected_id: The VLAN id that is being inserted through tx_offload configuration.
         """
         packet = Ether() / Raw(load="xxxxx")
-        received_packets = self.send_packet_and_capture(packet)
+        received_packets = send_packet_and_capture(packet)
         test_packet = None
         for packet in received_packets:
             if hasattr(packet, "load") and b"xxxxx" in packet.load:
                 test_packet = packet
                 break
-        self.verify(
+        verify(
             test_packet is not None,
             "Packet was dropped when it should have been received",
         )
         if test_packet is not None:
-            self.verify(
+            verify(
                 test_packet.haslayer(Dot1Q) == 1,
                 "The received packet did not have a VLAN tag",
             )
-            self.verify(
+            verify(
                 test_packet.vlan == expected_id,
                 "The received tag did not match the expected tag",
             )
--
2.39.5


^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2025-09-23 10:38 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-09-23 10:37 [PATCH v1 0/2] Move TestSuite methods to API modules Paul Szczepanek
2025-09-23 10:37 ` [PATCH v1 1/2] dts: add packet handling and test utilities to API Paul Szczepanek
2025-09-23 10:37 ` [PATCH v1 2/2] dts: adjust all tests to use the new API calls Paul Szczepanek

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).