* [PATCH v1] dts: refactor flow suite with generator pattern
@ 2026-01-06 21:38 Dean Marx
0 siblings, 0 replies; only message in thread
From: Dean Marx @ 2026-01-06 21:38 UTC (permalink / raw)
To: probb, luca.vizzarro, yoan.picchi, Honnappa.Nagarahalli, paul.szczepanek
Cc: dev, Dean Marx
Refactor current flow test suite to utilize a generator
method, which dynamically creates flow rules from a
dictionary of pattern and action fields during runtime.
This allows for much more extensive testing of the flow
API without exponential code growth.
Signed-off-by: Dean Marx <dmarx@iol.unh.edu>
---
dts/tests/TestSuite_rte_flow.py | 1435 ++++++++++++++++---------------
1 file changed, 764 insertions(+), 671 deletions(-)
diff --git a/dts/tests/TestSuite_rte_flow.py b/dts/tests/TestSuite_rte_flow.py
index 207cbce2d3..24d50e603c 100644
--- a/dts/tests/TestSuite_rte_flow.py
+++ b/dts/tests/TestSuite_rte_flow.py
@@ -10,223 +10,657 @@
"""
-from collections.abc import Callable
-from itertools import zip_longest
-from typing import Any, Iterator, cast
+from dataclasses import dataclass, field
+from itertools import product
+from typing import Any, Callable, cast
-from scapy.layers.inet import IP, TCP, UDP
+from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.inet6 import IPv6
-from scapy.layers.l2 import Dot1Q, Ether
+from scapy.layers.l2 import ARP, Dot1Q, Ether
+from scapy.layers.sctp import SCTP
from scapy.packet import Packet, Raw
-from api.capabilities import (
- NicCapability,
- requires_nic_capability,
-)
+from api.capabilities import NicCapability, requires_nic_capability
from api.packet import send_packet_and_capture
-from api.test import fail, log, verify, verify_else_skip
+from api.test import fail, log, verify
from api.testpmd import TestPmd
from api.testpmd.types import FlowRule
-from framework.exception import InteractiveCommandExecutionError
+from framework.exception import InteractiveCommandExecutionError, SkippedTestException
from framework.test_suite import TestSuite, func_test
-@requires_nic_capability(NicCapability.FLOW_CTRL)
-class TestRteFlow(TestSuite):
- """RTE Flow test suite.
+@dataclass
+class PatternField:
+ """Specification for a single matchable field within a protocol layer."""
+
+ scapy_field: str
+ pattern_field: str
+ test_values: list[Any]
+
+
+@dataclass
+class Layer:
+ """Complete specification for a protocol layer."""
+
+ name: str
+ scapy_class: type
+ pattern_name: str
+ fields: list[PatternField]
+ requires: list[str] = field(default_factory=list)
+
+ def build_scapy_layer(self, field_values: dict[str, Any]) -> Packet:
+ """Construct a Scapy layer with the given field values."""
+ return self.scapy_class(**field_values)
+
+
+@dataclass
+class Action:
+ """Specification for a flow action."""
+
+ name: str
+ action_format: str
+ verification_type: str
+ param_builder: Callable[[Any], dict[str, Any]]
+ expected_packet_builder: Callable[[Packet], Packet] | None = None
+
+ def build_action_string(self, value: Any = None) -> str:
+ """Generate the action string for a flow rule."""
+ if value is not None and "{value}" in self.action_format:
+ return self.action_format.format(value=value)
+ return self.action_format
+
+ def build_verification_params(self, value: Any = None) -> dict[str, Any]:
+ """Generate verification parameters for this action."""
+ return self.param_builder(value)
+
+ def build_expected_packet(self, original_packet: Packet) -> Packet | None:
+ """Build expected packet for modification actions."""
+ if self.expected_packet_builder:
+ return self.expected_packet_builder(original_packet)
+ return None
+
+
+@dataclass
+class FlowTestCase:
+ """A complete test case ready for execution."""
+
+ flow_rule: FlowRule
+ packet: Packet
+ verification_type: str
+ verification_params: dict[str, Any]
+ description: str = ""
+ expected_packet: Packet | None = None
+
+
+@dataclass
+class FlowTestResult:
+ """Result of a single test case execution."""
+
+ description: str
+ passed: bool
+ failure_reason: str = ""
+ flow_rule_pattern: str = ""
+ skipped: bool = False
+ sent_packet: Packet | None = None
+
+
+LAYERS: dict[str, Layer] = {
+ "eth": Layer(
+ name="eth",
+ scapy_class=Ether,
+ pattern_name="eth",
+ fields=[
+ PatternField("src", "src", ["02:00:00:00:00:00"]),
+ PatternField("dst", "dst", ["02:00:00:00:00:02"]),
+ ],
+ ),
+ "ipv4": Layer(
+ name="ipv4",
+ scapy_class=IP,
+ pattern_name="ipv4",
+ fields=[
+ PatternField("src", "src", ["192.168.1.1"]),
+ PatternField("dst", "dst", ["192.168.1.2"]),
+ PatternField("ttl", "ttl", [64, 128]),
+ PatternField("tos", "tos", [0, 4]),
+ ],
+ requires=["eth"],
+ ),
+ "ipv6": Layer(
+ name="ipv6",
+ scapy_class=IPv6,
+ pattern_name="ipv6",
+ fields=[
+ PatternField("src", "src", ["2001:db8::1"]),
+ PatternField("dst", "dst", ["2001:db8::2"]),
+ PatternField("tc", "tc", [0, 4]),
+ PatternField("hlim", "hop", [64, 128]),
+ ],
+ requires=["eth"],
+ ),
+ "tcp": Layer(
+ name="tcp",
+ scapy_class=TCP,
+ pattern_name="tcp",
+ fields=[
+ PatternField("sport", "src", [1234, 8080]),
+ PatternField("dport", "dst", [80, 443]),
+ PatternField("flags", "flags", [2, 16]),
+ ],
+ requires=["eth", "ipv4"],
+ ),
+ "udp": Layer(
+ name="udp",
+ scapy_class=UDP,
+ pattern_name="udp",
+ fields=[
+ PatternField("sport", "src", [5000]),
+ PatternField("dport", "dst", [53, 123]),
+ ],
+ requires=["eth", "ipv4"],
+ ),
+ "vlan": Layer(
+ name="vlan",
+ scapy_class=Dot1Q,
+ pattern_name="vlan",
+ fields=[
+ PatternField("vlan", "vid", [100, 200]),
+ PatternField("prio", "pcp", [0, 7]),
+ ],
+ requires=["eth"],
+ ),
+ "icmp": Layer(
+ name="icmp",
+ scapy_class=ICMP,
+ pattern_name="icmp",
+ fields=[
+ PatternField("type", "type", [8, 0]),
+ PatternField("code", "code", [0]),
+ PatternField("id", "ident", [0, 1234]),
+ PatternField("seq", "seq", [0, 1]),
+ ],
+ requires=["eth", "ipv4"],
+ ),
+ "sctp": Layer(
+ name="sctp",
+ scapy_class=SCTP,
+ pattern_name="sctp",
+ fields=[
+ PatternField("sport", "src", [2905, 3868]),
+ PatternField("dport", "dst", [2905, 3868]),
+ PatternField("tag", "tag", [1, 12346]),
+ ],
+ requires=["eth", "ipv4"],
+ ),
+ "arp": Layer(
+ name="arp",
+ scapy_class=ARP,
+ pattern_name="arp_eth_ipv4",
+ fields=[
+ PatternField("psrc", "spa", ["192.168.1.1"]),
+ PatternField("pdst", "tpa", ["192.168.1.2"]),
+ PatternField("op", "opcode", [1, 2]),
+ ],
+ requires=["eth"],
+ ),
+}
+
+
+def _build_ipv4_src_to_dst_expected(packet: Packet) -> Packet:
+ """Build expected packet for IPV4 src to dst copy."""
+ expected = cast(Packet, packet.copy())
+ if IP in expected:
+ expected[IP].dst = packet[IP].src
+ return expected
+
+
+def _build_mac_src_to_dst_expected(packet: Packet) -> Packet:
+ """Build expected packet for MAC src to dst copy."""
+ expected = cast(Packet, packet.copy())
+ if Ether in expected:
+ expected[Ether].dst = packet[Ether].src
+ return expected
+
+
+ACTIONS: dict[str, Action] = {
+ "queue": Action(
+ name="queue",
+ action_format="queue index {value}",
+ verification_type="queue",
+ param_builder=lambda queue_id: {"queue_id": queue_id},
+ ),
+ "drop": Action(
+ name="drop",
+ action_format="drop",
+ verification_type="drop",
+ param_builder=lambda _: {"should_receive": False},
+ ),
+ "modify_ipv4_src_to_dst": Action(
+ name="modify_ipv4_src_to_dst",
+ action_format="modify_field op set dst_type "
+ "ipv4_dst src_type ipv4_src width 32 / queue index 0",
+ verification_type="modify",
+ param_builder=lambda _: {},
+ expected_packet_builder=_build_ipv4_src_to_dst_expected,
+ ),
+ "modify_mac_src_to_dst": Action(
+ name="modify_mac_src_to_dst",
+ action_format="modify_field op set dst_type "
+ "mac_dst src_type mac_src width 48 / queue index 0",
+ verification_type="modify",
+ param_builder=lambda _: {},
+ expected_packet_builder=_build_mac_src_to_dst_expected,
+ ),
+}
+
+LAYER_STACKS = [
+ ["eth"],
+ ["eth", "ipv4"],
+ ["eth", "ipv4", "tcp"],
+ ["eth", "ipv4", "udp"],
+ ["eth", "ipv4", "icmp"],
+ ["eth", "ipv4", "sctp"],
+ ["eth", "ipv6"],
+ ["eth", "ipv6", "tcp"],
+ ["eth", "ipv6", "udp"],
+ ["eth", "ipv6", "sctp"],
+ ["eth", "vlan"],
+ ["eth", "vlan", "ipv4"],
+ ["eth", "vlan", "ipv4", "tcp"],
+ ["eth", "vlan", "ipv4", "udp"],
+ ["eth", "vlan", "ipv4", "sctp"],
+ ["eth", "vlan", "ipv6"],
+ ["eth", "vlan", "ipv6", "tcp"],
+ ["eth", "vlan", "ipv6", "udp"],
+ ["eth", "arp"],
+]
+
+
+class FlowTestGenerator:
+ """Generates test cases by combining patterns and actions."""
+
+ def __init__(self, layers: dict[str, Layer], actions: dict[str, Action]):
+ """Initialize the generator with layer and action specifications."""
+ self.layers = layers
+ self.actions = actions
+
+ def _build_multi_layer_packet(
+ self,
+ layer_stack: list[str],
+ all_field_values: dict[str, dict[str, Any]],
+ add_payload: bool = True,
+ ) -> Packet:
+ """Build a packet from multiple protocol layers."""
+ packet: Packet = Ether()
+ prev_layer_name = None
- This suite consists of 12 test cases:
- 1. Queue Action Ethernet: Verifies queue actions with ethernet patterns
- 2. Queue Action IP: Verifies queue actions with IPv4 and IPv6 patterns
- 3. Queue Action L4: Verifies queue actions with TCP and UDP patterns
- 4. Queue Action VLAN: Verifies queue actions with VLAN patterns
- 5. Drop Action Eth: Verifies drop action with ethernet patterns
- 6. Drop Action IP: Verifies drop actions with IPV4 and IPv6 patterns
- 7. Drop Action L4: Verifies drop actions with TCP and UDP patterns
- 8. Drop Action VLAN: Verifies drop actions with VLAN patterns
- 9. Modify Field Action: Verifies packet modification patterns
- 10. Egress Rules: Verifies previously covered rules are still valid as egress
- 11. Jump Action: Verifies packet behavior given grouped flows
- 12. Priority Attribute: Verifies packet behavior given flows with different priorities
+ for layer_name in layer_stack:
+ layer_spec = self.layers[layer_name]
+ values = all_field_values.get(layer_name, {})
+ layer = layer_spec.build_scapy_layer(values)
- """
+ if layer_name == "eth":
+ packet = layer
+ else:
+ if prev_layer_name == "ipv6" and layer_name in ["tcp", "udp", "sctp"]:
+ nh_map = {"tcp": 6, "udp": 17, "sctp": 132}
+ packet[IPv6].nh = nh_map[layer_name]
+
+ packet = packet / layer
- def _runner(
+ prev_layer_name = layer_name
+
+ if add_payload:
+ packet = packet / Raw(load="X" * 32)
+
+ return packet
+
+ def generate(
self,
- verification_method: Callable[..., Any],
- flows: list[FlowRule],
- packets: list[Packet],
- port_id: int,
- expected_packets: list[Packet] | None = None,
- *args: Any,
- **kwargs: Any,
- ) -> None:
- """Runner method that validates each flow using the corresponding verification method.
+ layer_names: list[str],
+ action_name: str,
+ action_value: Any = None,
+ group_id: int = 0,
+ ) -> list[FlowTestCase]:
+ """Generate test cases for patterns matching fields across multiple layers.
+
+ This method identifies every possible combination of one field per layer.
+ For each field combination, it iterates through the available test values.
+ If fields have an unequal number of test values, it cycles through the
+ shorter lists to ensure every specific value in every field is tested.
Args:
- verification_method: Callable that performs verification logic.
- flows: List of flow rules to create and test.
- packets: List of packets corresponding to each flow.
- port_id: Number representing the port to create flows on.
- expected_packets: List of packets to check sent packets against in modification cases.
- *args: Additional positional arguments to pass to the verification method.
- **kwargs: Additional keyword arguments to pass to the verification method.
+ layer_names: List of layer names to match.
+ action_name: Name of the action to apply.
+ action_value: Optional value for parameterized actions.
+ group_id: Flow group ID.
+
+ Returns:
+ List of FlowTestCase objects ready for execution.
"""
+ action_spec = self.actions[action_name]
+
+ # Organize layers into lists of matchable fields
+ layer_field_specs = []
+ for layer_name in layer_names:
+ layer_spec = self.layers[layer_name]
+ # Capture the layer spec and the field spec for each field in the layer
+ layer_field_specs.append([(layer_spec, f) for f in layer_spec.fields])
+
+ test_cases = []
+
+ # Iterate through every combination of fields across the requested layers
+ # For ['eth', 'ipv4'], this produces: (eth_src, ipv4_src), (eth_src, ipv4_dst), etc.
+ for field_combo in product(*layer_field_specs):
+ # Determine how many test cases are needed to cover all values in this combo
+ max_vals = max(len(f_spec.test_values) for _, f_spec in field_combo)
+
+ # Cycle through the test values for these fields
+ for i in range(max_vals):
+ pattern_parts = []
+ all_field_values: dict[str, dict[str, Any]] = {}
+ desc_parts = []
+
+ for layer_spec, field_spec in field_combo:
+ # Select value by index
+ val = field_spec.test_values[i % len(field_spec.test_values)]
+
+ pattern_parts.append(
+ f"{layer_spec.pattern_name} {field_spec.pattern_field} is {val}"
+ )
+ # Store value for Scapy packet building
+ if layer_spec.name not in all_field_values:
+ all_field_values[layer_spec.name] = {}
+ all_field_values[layer_spec.name][field_spec.scapy_field] = val
+
+ desc_parts.append(f"{layer_spec.name}[{field_spec.scapy_field}={val}]")
+
+ full_pattern = " / ".join(pattern_parts)
+ flow_rule = FlowRule(
+ direction="ingress",
+ pattern=[full_pattern],
+ actions=[action_spec.build_action_string(action_value)],
+ group_id=group_id,
+ )
+
+ add_payload = action_spec.verification_type in ["drop", "modify"]
+ packet = self._build_multi_layer_packet(layer_names, all_field_values, add_payload)
+
+ expected_packet = None
+ if action_spec.verification_type == "modify":
+ expected_packet = action_spec.build_expected_packet(packet)
+
+ test_cases.append(
+ FlowTestCase(
+ flow_rule=flow_rule,
+ packet=packet,
+ verification_type=action_spec.verification_type,
+ verification_params=action_spec.build_verification_params(action_value),
+ description=" / ".join(desc_parts) + f" -> {action_spec.name}",
+ expected_packet=expected_packet,
+ )
+ )
- def zip_lists(
- rules: list[FlowRule],
- packets1: list[Packet],
- packets2: list[Packet] | None,
- ) -> Iterator[tuple[FlowRule, Packet, Packet | None]]:
- """Method that creates an iterable zip containing lists used in runner.
-
- Args:
- rules: List of flow rules.
- packets1: List of packets.
- packets2: Optional list of packets, excluded from zip if not passed to runner.
- """
- return cast(
- Iterator[tuple[FlowRule, Packet, Packet | None]],
- zip_longest(rules, packets1, packets2 or [], fillvalue=None),
- )
+ return test_cases
- with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
- for flow, packet, expected_packet in zip_lists(flows, packets, expected_packets):
- is_valid = testpmd.flow_validate(flow_rule=flow, port_id=port_id)
- verify_else_skip(is_valid, "flow rule failed validation.")
- try:
- flow_id = testpmd.flow_create(flow_rule=flow, port_id=port_id)
- except InteractiveCommandExecutionError:
- log("Flow rule validation passed, but flow creation failed.")
- fail("Failed flow creation")
+@requires_nic_capability(NicCapability.FLOW_CTRL)
+class TestRteFlow(TestSuite):
+ """RTE Flow test suite.
- if verification_method == self._send_packet_and_verify:
- verification_method(packet=packet, *args, **kwargs)
+ This suite consists of 4 test cases:
+ 1. Queue Action: Verifies queue actions with multi-layer patterns
+ 2. Drop Action: Verifies drop actions with multi-layer patterns
+ 3. Modify Field Action: Verifies modify_field actions with multi-layer patterns
+ 4. Jump Action: Verifies jump action between flow groups
- elif verification_method == self._send_packet_and_verify_queue:
- verification_method(
- packet=packet, test_queue=kwargs["test_queue"], testpmd=testpmd
- )
+ """
- elif verification_method == self._send_packet_and_verify_modification:
- verification_method(packet=packet, expected_packet=expected_packet)
+ def set_up_suite(self) -> None:
+ """Initialize the test generator and result tracking."""
+ self.generator = FlowTestGenerator(LAYERS, ACTIONS)
+ self.test_suite_results: list[FlowTestResult] = []
+ self.test_case_results: list[FlowTestResult] = []
- testpmd.flow_delete(flow_id, port_id=port_id)
+ def _run_confidence_check(self, action_type: str) -> None:
+ """Verify that non-matching packets are unaffected by flow rules.
- def _send_packet_and_verify(self, packet: Packet, should_receive: bool = True) -> None:
- """Generate a packet, send to the DUT, and verify it is forwarded back.
+ Creates a flow rule for the specified action, then sends a packet that
+ should NOT match the rule to confirm:
+ - For 'drop': non-matching packets ARE received (not dropped)
+ - For 'queue': non-matching packets are NOT steered to the target queue
+ - For 'modify': non-matching packets arrive unmodified
+
+ This ensures flow rules only affect matching traffic before
+ running the actual action tests.
Args:
- packet: Scapy packet to send and verify.
- should_receive: Indicate whether the packet should be received.
+ action_type: The action being tested ('drop', 'queue', 'modify').
"""
- received = send_packet_and_capture(packet)
- contains_packet = any(
- packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
- )
- verify(
- should_receive == contains_packet,
- f"Packet was {'dropped' if should_receive else 'received'}",
+ non_matching_packet = (
+ Ether(src="02:00:00:00:00:00", dst="02:00:00:00:00:01")
+ / IP(src="192.168.100.1", dst="192.168.100.2")
+ / UDP(sport=9999, dport=9998)
+ / Raw(load="CONFIDENCE" + "X" * 22)
)
- def _send_packet_and_verify_queue(
- self, packet: Packet, test_queue: int, testpmd: TestPmd
- ) -> None:
- """Send packet and verify queue stats show packet was received.
+ with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
+ if action_type == "drop":
+ drop_rule = FlowRule(
+ direction="ingress",
+ pattern=["eth / ipv4 src is 192.168.1.1 / udp dst is 53"],
+ actions=["drop"],
+ )
+ flow_id = testpmd.flow_create(flow_rule=drop_rule, port_id=0)
- Args:
- packet: Scapy packet to send to the SUT.
- test_queue: Represents the queue the test packet is being sent to.
- testpmd: TestPmd instance being used to send test packet.
- """
- testpmd.set_verbose(level=8)
- testpmd.start()
- send_packet_and_capture(packet=packet)
+ testpmd.start()
+ received = send_packet_and_capture(non_matching_packet)
+ testpmd.stop()
+ contains_packet = any(
+ p.haslayer(Raw) and b"CONFIDENCE" in bytes(p[Raw].load) for p in received
+ )
+ testpmd.flow_delete(flow_id, port_id=0)
+ verify(
+ contains_packet,
+ "Confidence check failed: non-matching packet dropped by drop rule",
+ )
+
+ elif action_type == "queue":
+ queue_rule = FlowRule(
+ direction="ingress",
+ pattern=[
+ "eth src is aa:bb:cc:dd:ee:ff / ipv4 src is 10.255.255.254 "
+ "dst is 10.255.255.253 / udp src is 12345 dst is 54321"
+ ],
+ actions=["queue index 3"],
+ )
+ flow_id = testpmd.flow_create(flow_rule=queue_rule, port_id=0)
+
+ testpmd.set_verbose(level=8)
+ testpmd.start()
+ send_packet_and_capture(non_matching_packet)
+ verbose_output = testpmd.extract_verbose_output(testpmd.stop())
+ received_on_target = any(p.queue_id == 3 for p in verbose_output)
+ testpmd.flow_delete(flow_id, port_id=0)
+ verify(
+ not received_on_target,
+ "Confidence check failed: non-matching packet steered to queue 3",
+ )
+
+ log(f"Confidence check passed for '{action_type}' action")
+
+ def _verify_queue(self, packet: Packet, queue_id: int, testpmd: TestPmd, **kwargs: Any) -> None:
+ """Verify packet is received on the expected queue."""
+ send_packet_and_capture(packet)
verbose_output = testpmd.extract_verbose_output(testpmd.stop())
- received = False
- for testpmd_packet in verbose_output:
- if testpmd_packet.queue_id == test_queue:
- received = True
- verify(received, f"Expected packet was not received on queue {test_queue}")
+ received_on_queue = any(p.queue_id == queue_id for p in verbose_output)
+ verify(received_on_queue, f"Packet not received on queue {queue_id}")
- def _send_packet_and_verify_modification(self, packet: Packet, expected_packet: Packet) -> None:
- """Send packet and verify the expected modifications are present upon reception.
-
- Args:
- packet: Scapy packet to send to the SUT.
- expected_packet: Scapy packet that should match the received packet.
- """
+ def _verify_drop(self, packet: Packet, **kwargs: Any) -> None:
+ """Verify packet is dropped."""
received = send_packet_and_capture(packet)
+ contains_packet = any(p.haslayer(Raw) and b"XXXXX" in p.load for p in received)
+ verify(not contains_packet, "Packet was not dropped")
- # verify reception
- verify(received != [], "Packet was never received.")
-
- log(f"SENT PACKET: {packet.summary()}")
- log(f"EXPECTED PACKET: {expected_packet.summary()}")
- for packet in received:
- log(f"RECEIVED PACKET: {packet.summary()}")
+ def _verify_modify(
+ self, packet: Packet, expected_packet: Packet, testpmd: TestPmd, **kwargs: Any
+ ) -> None:
+ """Verify packet modifications."""
+ testpmd.start()
+ received = send_packet_and_capture(packet)
+ testpmd.stop()
- expected_ip_dst = expected_packet[IP].dst if IP in expected_packet else None
- received_ip_dst = received[IP].dst if IP in received else None
+ verify(
+ any(p.haslayer(Raw) and b"XXXXX" in p.load for p in received),
+ "Test packet with payload marker not found",
+ )
- expected_mac_dst = expected_packet[Ether].dst if Ether in expected_packet else None
- received_mac_dst = received[Ether].dst if Ether in received else None
+ test_packet = None
+ for pkt in received:
+ if pkt.haslayer(Raw) and b"XXXXX" in pkt.load:
+ test_packet = pkt
+ break
- # verify modification
- if expected_ip_dst is not None:
+ if IP in expected_packet and test_packet is not None:
verify(
- received_ip_dst == expected_ip_dst,
- f"IPv4 dst mismatch: expected {expected_ip_dst}, got {received_ip_dst}",
+ test_packet[IP].dst == expected_packet[IP].dst,
+ f"IPv4 dst mismatch: expected {expected_packet[IP].dst}, got {test_packet[IP].dst}",
)
- if expected_mac_dst is not None:
+ if Ether in expected_packet and test_packet is not None:
verify(
- received_mac_dst == expected_mac_dst,
- f"MAC dst mismatch: expected {expected_mac_dst}, got {received_mac_dst}",
+ test_packet[Ether].dst == expected_packet[Ether].dst,
+ f"MAC dst mismatch: expected {expected_packet[Ether].dst}, "
+ f"got {test_packet[Ether].dst}",
)
- def _send_packet_and_verify_jump(
+ def _run_tests(
self,
- packets: list[Packet],
- flow_rules: list[FlowRule],
- test_queues: list[int],
- testpmd: TestPmd,
+ test_cases: list[FlowTestCase],
+ port_id: int = 0,
) -> None:
- """Create a testpmd session with every rule in the given list, verify jump behavior.
-
- Args:
- packets: List of packets to send.
- flow_rules: List of flow rules to create in the same session.
- test_queues: List of Rx queue IDs each packet should be received on.
- testpmd: TestPmd instance to create flows on.
- """
- testpmd.set_verbose(level=8)
- for flow in flow_rules:
- is_valid = testpmd.flow_validate(flow_rule=flow, port_id=0)
- verify_else_skip(is_valid, "flow rule failed validation.")
+ """Execute a sequence of test cases."""
+ with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
+ for test_case in test_cases:
+ log(f"Testing: {test_case.description}")
- try:
- testpmd.flow_create(flow_rule=flow, port_id=0)
- except InteractiveCommandExecutionError:
- log("Flow validation passed, but flow creation failed.")
- fail("Failed flow creation")
+ result = FlowTestResult(
+ description=test_case.description,
+ passed=False,
+ flow_rule_pattern=" / ".join(test_case.flow_rule.pattern),
+ sent_packet=test_case.packet,
+ )
- for packet, test_queue in zip(packets, test_queues):
- testpmd.start()
- send_packet_and_capture(packet=packet)
- verbose_output = testpmd.extract_verbose_output(testpmd.stop())
- received = False
- for testpmd_packet in verbose_output:
- if testpmd_packet.queue_id == test_queue:
- received = True
- verify(received, f"Expected packet was not received on queue {test_queue}")
+ try:
+ is_valid = testpmd.flow_validate(flow_rule=test_case.flow_rule, port_id=port_id)
+ if not is_valid:
+ result.skipped = True
+ result.failure_reason = "Flow rule failed validation"
+ self.test_suite_results.append(result)
+ self.test_case_results.append(result)
+
+ try:
+ flow_id = testpmd.flow_create(
+ flow_rule=test_case.flow_rule, port_id=port_id
+ )
+ except InteractiveCommandExecutionError:
+ result.failure_reason = "Hardware validated but failed to create flow rule"
+ self.test_suite_results.append(result)
+ self.test_case_results.append(result)
+ continue
+
+ verification_method = getattr(self, f"_verify_{test_case.verification_type}")
+
+ if test_case.verification_type == "queue":
+ testpmd.set_verbose(level=8)
+ testpmd.start()
+ verification_method(
+ packet=test_case.packet,
+ testpmd=testpmd,
+ **test_case.verification_params,
+ )
+ elif test_case.verification_type == "modify":
+ verification_method(
+ packet=test_case.packet,
+ expected_packet=test_case.expected_packet,
+ testpmd=testpmd,
+ **test_case.verification_params,
+ )
+ else:
+ verification_method(
+ packet=test_case.packet,
+ testpmd=testpmd,
+ **test_case.verification_params,
+ )
+
+ testpmd.flow_delete(flow_id, port_id=port_id)
+ result.passed = True
+ self.test_suite_results.append(result)
+ self.test_case_results.append(result)
+
+ except SkippedTestException as e:
+ result.skipped = True
+ result.failure_reason = f"Skipped: {str(e)}"
+ self.test_suite_results.append(result)
+ self.test_case_results.append(result)
+
+ def _log_test_suite_summary(self) -> None:
+ """Log a summary of all test results."""
+ if not self.test_suite_results:
+ return
+
+ passed_tests = [r for r in self.test_suite_results if r.passed]
+ skipped_tests = [r for r in self.test_suite_results if r.skipped]
+ failed_tests = [r for r in self.test_suite_results if not r.passed and not r.skipped]
+
+ log(f"Total tests run: {len(self.test_suite_results)}")
+ log(f"Passed: {len(passed_tests)}")
+ log(f"Skipped: {len(skipped_tests)}")
+ log(f"Failed: {len(failed_tests)}")
+
+ if passed_tests:
+ log("\nPASSED TESTS:")
+ for result in passed_tests:
+ log(f" {result.description}")
+ log(f" Sent Packet: {result.sent_packet}")
+
+ if skipped_tests:
+ log("\nSKIPPED TESTS:")
+ for result in skipped_tests:
+ log(f" {result.description}")
+ log(f" Pattern: {result.flow_rule_pattern}")
+ log(f" Reason: {result.failure_reason}")
+ log(f" Sent Packet: {result.sent_packet}")
+
+ if failed_tests:
+ log("\nFAILED TESTS:")
+ for result in failed_tests:
+ log(f" {result.description}")
+ log(f" Pattern: {result.flow_rule_pattern}")
+ log(f" Reason: {result.failure_reason}")
+ log(f" Sent Packet: {result.sent_packet}")
+
+ def _log_test_case_failures(self) -> None:
+ """Log each pattern that failed for a given test case."""
+ failures = [r for r in self.test_case_results if not r.passed and not r.skipped]
+
+ if failures:
+ patterns = "\n".join(f"\t - {r.flow_rule_pattern}" for r in failures)
+
+ self.test_case_results = []
+
+ fail(
+ "Flow rule passed validation but failed creation.\n"
+ "\tFailing flow rule patterns:\n"
+ f"{patterns}"
+ )
@func_test
- def queue_action_ETH(self) -> None:
- """Validate flow rules with queue actions and ethernet patterns.
+ def queue_action(self) -> None:
+ """Validate flow rules with queue actions and multi-layer patterns.
Steps:
+ * Run confidence check to verify baseline packet reception.
* Create a list of packets to test, with a corresponding flow list.
* Launch testpmd.
* Create first flow rule in flow list.
@@ -236,39 +670,22 @@ def queue_action_ETH(self) -> None:
Verify:
* Each packet is received on the appropriate queue.
"""
- packet_list = [
- Ether(src="02:00:00:00:00:00"),
- Ether(dst="02:00:00:00:00:00"),
- Ether(type=0x0800) / IP(),
- ]
- flow_list = [
- FlowRule(
- direction="ingress",
- pattern=["eth src is 02:00:00:00:00:00"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth dst is 02:00:00:00:00:00"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress", pattern=["eth type is 0x0800"], actions=["queue index 2"]
- ),
- ]
- self._runner(
- verification_method=self._send_packet_and_verify_queue,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- test_queue=2,
- )
+ self._run_confidence_check("queue")
+ for stack in LAYER_STACKS:
+ test_cases = self.generator.generate(
+ layer_names=stack,
+ action_name="queue",
+ action_value=2,
+ )
+ self._run_tests(test_cases)
+ self._log_test_case_failures()
@func_test
- def queue_action_IP(self) -> None:
- """Validate flow rules with queue actions and IPv4/IPv6 patterns.
+ def drop_action(self) -> None:
+ """Validate flow rules with drop actions and multi-layer patterns.
Steps:
+ * Run confidence check to verify packets are received without drop rules.
* Create a list of packets to test, with a corresponding flow list.
* Launch testpmd.
* Create first flow rule in flow list.
@@ -276,522 +693,198 @@ def queue_action_IP(self) -> None:
* Delete flow rule, repeat for all flows/packets.
Verify:
- * Each packet is received on the appropriate queue.
+ * Packet is dropped.
"""
- packet_list = [
- Ether() / IP(src="192.168.1.1"),
- Ether() / IP(dst="192.168.1.1"),
- Ether() / IP(ttl=64),
- Ether() / IPv6(src="2001:db8::1"),
- Ether() / IPv6(dst="2001:db8::2"),
- Ether() / IPv6() / UDP(),
- ]
- flow_list = [
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 src is 192.168.1.1"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 dst is 192.168.1.1"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress", pattern=["eth / ipv4 ttl is 64"], actions=["queue index 2"]
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv6 src is 2001:db8::1"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv6 dst is 2001:db8::2"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress", pattern=["eth / ipv6 proto is 17"], actions=["queue index 2"]
- ),
- ]
- self._runner(
- verification_method=self._send_packet_and_verify_queue,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- test_queue=2,
- )
+ self._run_confidence_check("drop")
+ for stack in LAYER_STACKS:
+ test_cases = self.generator.generate(
+ layer_names=stack,
+ action_name="drop",
+ )
+ self._run_tests(test_cases)
+ self._log_test_case_failures()
- @requires_nic_capability(NicCapability.PHYSICAL_FUNCTION)
@func_test
- def queue_action_L4(self) -> None:
- """Validate flow rules with queue actions and TCP/UDP patterns.
+ def modify_field_action(self) -> None:
+ """Validate flow rules with modify_field actions and various patterns.
Steps:
+ * Run confidence check to verify packets arrive unmodified.
* Create a list of packets to test, with a corresponding flow list.
* Launch testpmd.
* Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
+ * Send first packet in packet list, capture received packet.
* Delete flow rule, repeat for all flows/packets.
Verify:
- * Each packet is received on the appropriate queue.
+ * Packet is modified correctly according to the action.
"""
- packet_list = [
- Ether() / IP() / TCP(sport=1234),
- Ether() / IP() / TCP(dport=80),
- Ether() / IP() / TCP(flags=0x02),
- Ether() / IP() / UDP(sport=5000),
- Ether() / IP() / UDP(dport=53),
- ]
- flow_list = [
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 / tcp src is 1234"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 / tcp dst is 80"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 / tcp flags is 0x02"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 / udp src is 5000"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- pattern=["eth / ipv4 / udp dst is 53"],
- actions=["queue index 2"],
- ),
- ]
- self._runner(
- verification_method=self._send_packet_and_verify_queue,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- test_queue=2,
- )
-
- @func_test
- def queue_action_VLAN(self) -> None:
- """Validate flow rules with queue actions and VLAN patterns.
-
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
+ self._run_confidence_check("modify")
+ for stack in [
+ ["eth", "ipv4"],
+ ["eth", "ipv4", "tcp"],
+ ["eth", "ipv4", "udp"],
+ ]:
+ test_cases = self.generator.generate(
+ layer_names=stack,
+ action_name="modify_ipv4_src_to_dst",
+ group_id=1,
+ )
+ self._run_tests(test_cases)
- Verify:
- * Each packet is received on the appropriate queue.
- """
- packet_list = [Ether() / Dot1Q(vlan=100), Ether() / Dot1Q(type=0x0800)]
- flow_list = [
- FlowRule(direction="ingress", pattern=["eth / vlan"], actions=["queue index 2"]),
- FlowRule(direction="ingress", pattern=["eth / vlan"], actions=["queue index 2"]),
- ]
- self._runner(
- verification_method=self._send_packet_and_verify_queue,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- test_queue=2,
- )
+ for stack in [["eth"], ["eth", "vlan"]]:
+ test_cases = self.generator.generate(
+ layer_names=stack,
+ action_name="modify_mac_src_to_dst",
+ group_id=1,
+ )
+ self._run_tests(test_cases)
+ self._log_test_case_failures()
@func_test
- def drop_action_ETH(self) -> None:
- """Validate flow rules with drop actions and ethernet patterns.
-
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
+ def jump_action(self) -> None:
+ """Validate flow rules with jump action between groups.
- Verify:
- * Packet is dropped.
+ The jump action redirects matched packets from one flow group to another.
+ Only flow rules in group 0 are guaranteed to be matched against initially;
+ subsequent groups can only be reached via jump actions.
- One packet will be sent as a confidence check, to ensure packets are being
- received under normal circumstances.
- """
- packet_list = [
- Ether(src="02:00:00:00:00:00") / Raw(load="xxxxx"),
- Ether(dst="02:00:00:00:00:00") / Raw(load="xxxxx"),
- Ether(type=0x0800) / Raw(load="xxxxx"),
- ]
- flow_list = [
- FlowRule(
- direction="ingress", pattern=["eth src is 02:00:00:00:00:00"], actions=["drop"]
- ),
- FlowRule(
- direction="ingress", pattern=["eth dst is 02:00:00:00:00:00"], actions=["drop"]
- ),
- FlowRule(direction="ingress", pattern=["eth type is 0x0800"], actions=["drop"]),
- ]
- # verify reception with test packet
- packet = Ether() / IP() / Raw(load="xxxxx")
- with TestPmd() as testpmd:
- testpmd.start()
- received = send_packet_and_capture(packet)
- verify(received != [], "Test packet was never received.")
- self._runner(
- verification_method=self._send_packet_and_verify,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- should_receive=False,
- )
-
- @func_test
- def drop_action_IP(self) -> None:
- """Validate flow rules with drop actions and ethernet patterns.
+ This test creates a two-stage pipeline:
+ - Group 0: Match on Ethernet src, jump to group 1
+ - Group 1: Match on IPv4 dst, forward to specific queue
Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
+ * Launch testpmd with multiple queues.
+ * Create flow rule in group 0 that matches eth src and jumps to group 1.
+ * Create flow rule in group 1 that matches ipv4 dst and queues to queue 2.
+ * Send matching packet and verify it arrives on queue 2.
+ * Send non-matching packet (wrong eth src) and verify it doesn't hit queue 2.
Verify:
- * Packet is dropped.
-
- One packet will be sent as a confidence check, to ensure packets are being
- received under normal circumstances.
+ * Packet matching both rules is received on the target queue.
+ * Packet not matching group 0 rule does not reach target queue.
"""
- packet_list = [
- Ether() / IP(src="192.168.1.1") / Raw(load="xxxxx"),
- Ether() / IP(dst="192.168.1.1") / Raw(load="xxxxx"),
- Ether() / IP(ttl=64) / Raw(load="xxxxx"),
- Ether() / IPv6(src="2001:db8::1") / Raw(load="xxxxx"),
- Ether() / IPv6(dst="2001:db8::1") / Raw(load="xxxxx"),
- Ether() / IPv6() / UDP() / Raw(load="xxxxx"),
- ]
- flow_list = [
- FlowRule(
- direction="ingress", pattern=["eth / ipv4 src is 192.168.1.1"], actions=["drop"]
- ),
- FlowRule(
- direction="ingress", pattern=["eth / ipv4 dst is 192.168.1.1"], actions=["drop"]
- ),
- FlowRule(direction="ingress", pattern=["eth / ipv4 ttl is 64"], actions=["drop"]),
- FlowRule(
- direction="ingress", pattern=["eth / ipv6 src is 2001:db8::1"], actions=["drop"]
- ),
- FlowRule(
- direction="ingress", pattern=["eth / ipv6 dst is 2001:db8::2"], actions=["drop"]
- ),
- FlowRule(direction="ingress", pattern=["eth / ipv6 proto is 17"], actions=["drop"]),
- ]
- # verify reception with test packet
- packet = Ether() / IP() / Raw(load="xxxxx")
- with TestPmd() as testpmd:
- testpmd.start()
- received = send_packet_and_capture(packet)
- verify(received != [], "Test packet was never received.")
- self._runner(
- verification_method=self._send_packet_and_verify,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- should_receive=False,
+ port_id = 0
+ target_queue = 2
+
+ test_eth_src = "02:00:00:00:00:00"
+ non_matching_eth_src = "02:00:00:00:00:01"
+ test_ipv4_dst = "192.168.1.2"
+ test_ipv4_src = "192.168.1.1"
+ dst_mac = "02:00:00:00:00:02"
+
+ jump_rule = FlowRule(
+ direction="ingress",
+ group_id=0,
+ pattern=[f"eth src is {test_eth_src}"],
+ actions=["jump group 1"],
)
- @func_test
- def drop_action_L4(self) -> None:
- """Validate flow rules with drop actions and ethernet patterns.
-
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
-
- Verify:
- * Packet is dropped.
-
- One packet will be sent as a confidence check, to ensure packets are being
- received under normal circumstances.
- """
- packet_list = [
- Ether() / IP() / TCP(sport=1234) / Raw(load="xxxxx"),
- Ether() / IP() / TCP(dport=80) / Raw(load="xxxxx"),
- Ether() / IP() / TCP(flags=0x02) / Raw(load="xxxxx"),
- Ether() / IP() / UDP(sport=5000) / Raw(load="xxxxx"),
- Ether() / IP() / UDP(dport=53) / Raw(load="xxxxx"),
- ]
- flow_list = [
- FlowRule(
- direction="ingress", pattern=["eth / ipv4 / tcp src is 1234"], actions=["drop"]
- ),
- FlowRule(direction="ingress", pattern=["eth / ipv4 / tcp dst is 80"], actions=["drop"]),
- FlowRule(
- direction="ingress", pattern=["eth / ipv4 / tcp flags is 0x02"], actions=["drop"]
- ),
- FlowRule(
- direction="ingress", pattern=["eth / ipv4 / udp src is 5000"], actions=["drop"]
- ),
- FlowRule(direction="ingress", pattern=["eth / ipv4 / udp dst is 53"], actions=["drop"]),
- ]
- # verify reception with test packet
- packet = Ether() / IP() / Raw(load="xxxxx")
- with TestPmd() as testpmd:
- testpmd.start()
- received = send_packet_and_capture(packet)
- verify(received != [], "Test packet was never received.")
- self._runner(
- verification_method=self._send_packet_and_verify,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- should_receive=False,
+ queue_rule = FlowRule(
+ direction="ingress",
+ group_id=1,
+ pattern=[f"ipv4 dst is {test_ipv4_dst}"],
+ actions=[f"queue index {target_queue}"],
)
- @func_test
- def drop_action_VLAN(self) -> None:
- """Validate flow rules with drop actions and ethernet patterns.
-
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
-
- Verify:
- * Packet is dropped.
-
- One packet will be sent as a confidence check, to ensure packets are being
- received under normal circumstances.
- """
- packet_list = [
- Ether() / Dot1Q(vlan=100) / Raw(load="xxxxx"),
- Ether() / Dot1Q(type=0x0800) / Raw(load="xxxxx"),
- ]
- flow_list = [
- FlowRule(direction="ingress", pattern=["eth / vlan"], actions=["drop"]),
- FlowRule(direction="ingress", pattern=["eth / vlan"], actions=["drop"]),
- ]
- # verify reception with test packet
- packet = Ether() / IP() / Raw(load="xxxxx")
- with TestPmd() as testpmd:
- testpmd.start()
- received = send_packet_and_capture(packet)
- verify(received != [], "Test packet was never received.")
- self._runner(
- verification_method=self._send_packet_and_verify,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- should_receive=False,
+ matching_packet = (
+ Ether(src=test_eth_src, dst=dst_mac)
+ / IP(src=test_ipv4_src, dst=test_ipv4_dst)
+ / UDP(sport=5000, dport=53)
+ / Raw(load="X" * 32)
)
- @func_test
- def modify_actions(self) -> None:
- """Validate flow rules with actions that modify that packet during transmission.
+ non_matching_packet = (
+ Ether(src=non_matching_eth_src, dst=dst_mac)
+ / IP(src=test_ipv4_src, dst=test_ipv4_dst)
+ / UDP(sport=5000, dport=53)
+ / Raw(load="X" * 32)
+ )
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
+ jump_result = FlowTestResult(
+ description="eth src -> jump group 1",
+ passed=False,
+ flow_rule_pattern=" / ".join(jump_rule.pattern),
+ )
- Verify:
- * Verify packet is received with the new attributes.
- """
- packet_list = [Ether() / IP(src="192.68.1.1"), Ether(src="02:00:00:00:00:00")]
- flow_list = [
- # rule to copy IPv4 src to IPv4 dst
- FlowRule(
- direction="ingress",
- group_id=1,
- pattern=["eth"],
- actions=[
- "modify_field op set dst_type ipv4_dst src_type ipv4_src width 32"
- " / queue index 0"
- ],
- ),
- # rule to copy src MAC to dst MAC
- FlowRule(
- direction="ingress",
- group_id=1,
- pattern=["eth"],
- actions=[
- "modify_field op set dst_type mac_dst src_type mac_src width 48 / queue index 0"
- ],
- ),
- ]
- expected_packet_list = [Ether() / IP(dst="192.68.1.1"), Ether(dst="02:00:00:00:00:00")]
- self._runner(
- verification_method=self._send_packet_and_verify_modification,
- flows=flow_list,
- packets=packet_list,
- port_id=0,
- expected_packets=expected_packet_list,
+ queue_result = FlowTestResult(
+ description="ipv4 dst -> queue (group 1)",
+ passed=False,
+ flow_rule_pattern=" / ".join(queue_rule.pattern),
)
- @func_test
- def egress_rules(self) -> None:
- """Validate flow rules with egress directions.
+ with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
+ is_valid = testpmd.flow_validate(flow_rule=jump_rule, port_id=port_id)
+ if not is_valid:
+ jump_result.skipped = True
+ jump_result.failure_reason = "Flow rule failed validation"
+ self.test_suite_results.append(jump_result)
+ self.test_case_results.append(jump_result)
+ self._log_test_case_failures()
+ return
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
+ try:
+ jump_flow_id = testpmd.flow_create(flow_rule=jump_rule, port_id=port_id)
+ except InteractiveCommandExecutionError:
+ jump_result.failure_reason = "Hardware validated but failed to create flow rule"
+ self.test_suite_results.append(jump_result)
+ self.test_case_results.append(jump_result)
+ self._log_test_case_failures()
+ return
+
+ jump_result.passed = True
+ self.test_suite_results.append(jump_result)
+ self.test_case_results.append(jump_result)
+
+ is_valid = testpmd.flow_validate(flow_rule=queue_rule, port_id=port_id)
+ if not is_valid:
+ queue_result.skipped = True
+ queue_result.failure_reason = "Flow rule failed validation"
+ self.test_suite_results.append(queue_result)
+ self.test_case_results.append(queue_result)
+ testpmd.flow_delete(jump_flow_id, port_id=port_id)
+ self._log_test_case_failures()
+ return
- Verify:
- * Packet is dropped.
+ try:
+ queue_flow_id = testpmd.flow_create(flow_rule=queue_rule, port_id=port_id)
+ except InteractiveCommandExecutionError:
+ queue_result.failure_reason = "Hardware validated but failed to create flow rule"
+ self.test_suite_results.append(queue_result)
+ self.test_case_results.append(queue_result)
+ testpmd.flow_delete(jump_flow_id, port_id=port_id)
+ self._log_test_case_failures()
+ return
- One packet will be sent as a confidence check, to ensure packets are being
- received under normal circumstances.
- """
- packet_list = [
- Ether(src="02:00:00:00:00:00"),
- Ether() / IP(src="192.168.1.1"),
- IP() / TCP(sport=1234),
- IP() / UDP(sport=5000),
- ]
- flow_list = [
- FlowRule(
- direction="egress", pattern=["eth src is 02:00:00:00:00:00"], actions=["drop"]
- ),
- FlowRule(direction="egress", pattern=["ipv4 src is 192.168.1.1"], actions=["drop"]),
- FlowRule(direction="egress", pattern=["tcp src is 1234"], actions=["drop"]),
- FlowRule(direction="egress", pattern=["udp src is 5000"], actions=["drop"]),
- ]
- # verify reception with test packet
- packet = Ether() / IP() / Raw(load="xxxxx")
- with TestPmd() as testpmd:
- testpmd.start()
- received = send_packet_and_capture(packet)
- verify(received != [], "Test packet was never received.")
- self._runner(
- verification_method=self._send_packet_and_verify,
- flows=flow_list,
- packets=packet_list,
- port_id=1,
- should_receive=False,
- )
+ queue_result.passed = True
+ self.test_suite_results.append(queue_result)
+ self.test_case_results.append(queue_result)
- @func_test
- def jump_action(self) -> None:
- """Validate flow rules with different group levels and jump actions.
+ testpmd.set_verbose(level=8)
+ testpmd.start()
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd with the necessary configuration.
- * Create each flow rule in testpmd.
- * Send each packet in the list, check Rx queue ID.
+ send_packet_and_capture(matching_packet)
+ verbose_output = testpmd.extract_verbose_output(testpmd.stop())
+ received = any(p.queue_id == target_queue for p in verbose_output)
+ verify(
+ received,
+ f"Matching packet not received on queue {target_queue} after jump",
+ )
- Verify:
- * Check that each packet is received on the appropriate Rx queue.
- """
- packet_list = [Ether() / IP(), Ether() / IP() / TCP(), Ether() / IP() / UDP()]
- flow_list = [
- FlowRule(
- direction="ingress",
- group_id=0,
- pattern=["eth / ipv4 / tcp"],
- actions=["jump group 1"],
- ),
- FlowRule(
- direction="ingress",
- group_id=0,
- pattern=["eth / ipv4 / udp"],
- actions=["jump group 2"],
- ),
- FlowRule(
- direction="ingress",
- group_id=1,
- pattern=["eth / ipv4 / tcp"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- group_id=2,
- pattern=["eth / ipv4 / udp"],
- actions=["queue index 3"],
- ),
- FlowRule(
- direction="ingress",
- group_id=0,
- pattern=["eth / ipv4"],
- actions=["queue index 1"],
- ),
- ]
- expected_queue_list = [1, 2, 3]
- with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
- self._send_packet_and_verify_jump(
- packets=packet_list,
- flow_rules=flow_list,
- test_queues=expected_queue_list,
- testpmd=testpmd,
+ testpmd.start()
+ send_packet_and_capture(non_matching_packet)
+ verbose_output = testpmd.extract_verbose_output(testpmd.stop())
+ received = any(p.queue_id == target_queue for p in verbose_output)
+ verify(
+ not received,
+ f"Non-matching packet incorrectly received on queue {target_queue}",
)
- @func_test
- def priority_attribute(self) -> None:
- """Validate flow rules with queue actions and ethernet patterns.
+ testpmd.flow_delete(queue_flow_id, port_id=port_id)
+ testpmd.flow_delete(jump_flow_id, port_id=port_id)
- Steps:
- * Create a list of packets to test, with a corresponding flow list.
- * Launch testpmd.
- * Create first flow rule in flow list.
- * Send first packet in packet list, capture verbose output.
- * Delete flow rule, repeat for all flows/packets.
+ self._log_test_case_failures()
- Verify:
- * Each packet is received on the appropriate queue.
- """
- test_packet = Ether() / IP() / Raw()
- flow_list = [
- FlowRule(
- direction="ingress",
- priority_level=3,
- pattern=["eth / ipv4"],
- actions=["queue index 1"],
- ),
- FlowRule(
- direction="ingress",
- priority_level=2,
- pattern=["eth / ipv4"],
- actions=["queue index 2"],
- ),
- FlowRule(
- direction="ingress",
- priority_level=1,
- pattern=["eth / ipv4"],
- actions=["queue index 3"],
- ),
- ]
- expected_queue_list = [1, 2, 3]
- with TestPmd(rx_queues=4, tx_queues=4) as testpmd:
- testpmd.set_verbose(level=8)
- for flow, expected_queue in zip(flow_list, expected_queue_list):
- is_valid = testpmd.flow_validate(flow_rule=flow, port_id=0)
- verify_else_skip(is_valid, "flow rule failed validation.")
- try:
- testpmd.flow_create(flow_rule=flow, port_id=0)
- except InteractiveCommandExecutionError:
- log("Flow rule validation passed, but flow creation failed.")
- fail("Failed flow creation")
- testpmd.start()
- send_packet_and_capture(test_packet)
- verbose_output = testpmd.extract_verbose_output(testpmd.stop())
- received = False
- for testpmd_packet in verbose_output:
- if testpmd_packet.queue_id == expected_queue:
- received = True
- verify(received, f"Packet was not received on queue {expected_queue}")
+ def tear_down_suite(self) -> None:
+ """Log test summary at the end of the suite."""
+ self._log_test_suite_summary()
--
2.52.0
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2026-01-06 21:38 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2026-01-06 21:38 [PATCH v1] dts: refactor flow suite with generator pattern Dean Marx
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).