test suite reviews and discussions
 help / color / mirror / Atom feed
From: Hongbo Li <hongbox.li@intel.com>
To: dts@dpdk.org
Cc: Hongbo Li <hongbox.li@intel.com>
Subject: [dts][PATCH V1 6/7] tests/multiprocess: Separated performance cases
Date: Tue, 10 Jan 2023 02:46:22 +0800	[thread overview]
Message-ID: <20230109184623.12986-6-hongbox.li@intel.com> (raw)
In-Reply-To: <20230109184623.12986-1-hongbox.li@intel.com>

Separated performance test cases

Signed-off-by: Hongbo Li <hongbox.li@intel.com>
---
 test_plans/multiprocess_test_plan.rst      |  48 -
 test_plans/perf_multiprocess_test_plan.rst | 141 +++
 tests/TestSuite_multiprocess.py            | 210 -----
 tests/TestSuite_perf_multiprocess.py       | 994 +++++++++++++++++++++
 4 files changed, 1135 insertions(+), 258 deletions(-)
 create mode 100644 test_plans/perf_multiprocess_test_plan.rst
 create mode 100644 tests/TestSuite_perf_multiprocess.py

diff --git a/test_plans/multiprocess_test_plan.rst b/test_plans/multiprocess_test_plan.rst
index c7aae44b..9f5ef8fa 100644
--- a/test_plans/multiprocess_test_plan.rst
+++ b/test_plans/multiprocess_test_plan.rst
@@ -196,27 +196,6 @@ run should remain the same, except for the ``num-procs`` value, which should be
 adjusted appropriately.
 
 
-Test Case: Performance Tests
-----------------------------
-
-Run the multiprocess application using standard IP traffic - varying source
-and destination address information to allow RSS to evenly distribute packets
-among RX queues. Record traffic throughput results as below.
-
-+-------------------+-----+-----+-----+-----+-----+-----+
-| Num-procs         |  1  |  2  |  2  |  4  |  4  |  8  |
-+-------------------+-----+-----+-----+-----+-----+-----+
-| Cores/Threads     | 1/1 | 1/2 | 2/1 | 2/2 | 4/1 | 4/2 |
-+-------------------+-----+-----+-----+-----+-----+-----+
-| Num Ports         |  2  |  2  |  2  |  2  |  2  |  2  |
-+-------------------+-----+-----+-----+-----+-----+-----+
-| Packet Size       |  64 |  64 |  64 |  64 |  64 |  64 |
-+-------------------+-----+-----+-----+-----+-----+-----+
-| %-age Line Rate   |  X  |  X  |  X  |  X  |  X  |  X  |
-+-------------------+-----+-----+-----+-----+-----+-----+
-| Packet Rate(mpps) |  X  |  X  |  X  |  X  |  X  |  X  |
-+-------------------+-----+-----+-----+-----+-----+-----+
-
 Test Case: Function Tests
 -------------------------
 start 2 symmetric_mp process, send some packets, the number of packets is a random value between 20 and 256.
@@ -294,33 +273,6 @@ An example commands to run 8 client processes is as follows::
    root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 40000 --proc-type=secondary -- -n 6 &
    root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 100000 --proc-type=secondary -- -n 7 &
 
-Test Case: Performance Measurement
-----------------------------------
-
-- On the traffic generator set up a traffic flow in both directions specifying
-  IP traffic.
-- Run the server and client applications as above.
-- Start the traffic and record the throughput for transmitted and received packets.
-
-An example set of results is shown below.
-
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Server threads       |  1  |  1  |  1  |  1  |  1  |  1  |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Server Cores/Threads | 1/1 | 1/1 | 1/1 | 1/1 | 1/1 | 1/1 |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Num-clients          |  1  |  2  |  2  |  4  |  4  |  8  |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Client Cores/Threads | 1/1 | 1/2 | 2/1 | 2/2 | 4/1 | 4/2 |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Num Ports            |  2  |  2  |  2  |  2  |  2  |  2  |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Packet Size          |  64 |  64 |  64 |  64 |  64 |  64 |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| %-age Line Rate      |  X  |  X  |  X  |  X  |  X  |  X  |
-+----------------------+-----+-----+-----+-----+-----+-----+
-| Packet Rate(mpps)    |  X  |  X  |  X  |  X  |  X  |  X  |
-+----------------------+-----+-----+-----+-----+-----+-----+
 
 Test Case: Function Tests
 -------------------------
diff --git a/test_plans/perf_multiprocess_test_plan.rst b/test_plans/perf_multiprocess_test_plan.rst
new file mode 100644
index 00000000..c1e7ff87
--- /dev/null
+++ b/test_plans/perf_multiprocess_test_plan.rst
@@ -0,0 +1,141 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+   Copyright(c) 2010-2017 Intel Corporation
+
+=======================================
+Sample Application Tests: Multi-Process
+=======================================
+
+Simple MP Application Test
+==========================
+
+Description
+-----------
+
+This test is a basic multi-process test which demonstrates the basics of sharing
+information between DPDK processes. The same application binary is run
+twice - once as a primary instance, and once as a secondary instance. Messages
+are sent from primary to secondary and vice versa, demonstrating the processes
+are sharing memory and can communicate using rte_ring structures.
+
+Prerequisites
+-------------
+
+If using vfio the kernel must be >= 3.6+ and VT-d must be enabled in bios.When
+using vfio, use the following commands to load the vfio driver and bind it
+to the device under test::
+
+   modprobe vfio
+   modprobe vfio-pci
+   usertools/dpdk-devbind.py --bind=vfio-pci device_bus_id
+
+Assuming that a DPDK build has been set up and the multi-process sample
+applications have been built.
+
+
+Test Case: Performance Tests
+----------------------------
+
+Run the multiprocess application using standard IP traffic - varying source
+and destination address information to allow RSS to evenly distribute packets
+among RX queues. Record traffic throughput results as below.
+
++-------------------+-----+-----+-----+-----+-----+-----+
+| Num-procs         |  1  |  2  |  2  |  4  |  4  |  8  |
++-------------------+-----+-----+-----+-----+-----+-----+
+| Cores/Threads     | 1/1 | 1/2 | 2/1 | 2/2 | 4/1 | 4/2 |
++-------------------+-----+-----+-----+-----+-----+-----+
+| Num Ports         |  2  |  2  |  2  |  2  |  2  |  2  |
++-------------------+-----+-----+-----+-----+-----+-----+
+| Packet Size       |  64 |  64 |  64 |  64 |  64 |  64 |
++-------------------+-----+-----+-----+-----+-----+-----+
+| %-age Line Rate   |  X  |  X  |  X  |  X  |  X  |  X  |
++-------------------+-----+-----+-----+-----+-----+-----+
+| Packet Rate(mpps) |  X  |  X  |  X  |  X  |  X  |  X  |
++-------------------+-----+-----+-----+-----+-----+-----+
+
+
+Client Server Multiprocess Tests
+================================
+
+Description
+-----------
+
+The client-server sample application demonstrates the ability of Intel� DPDK
+to use multiple processes in which a server process performs packet I/O and one
+or multiple client processes perform packet processing. The server process
+controls load balancing on the traffic received from a number of input ports to
+a user-specified number of clients. The client processes forward the received
+traffic, outputting the packets directly by writing them to the TX rings of the
+outgoing ports.
+
+Prerequisites
+-------------
+
+Assuming that an Intel� DPDK build has been set up and the multi-process
+sample application has been built.
+Also assuming a traffic generator is connected to the ports "0" and "1".
+
+It is important to run the server application before the client application,
+as the server application manages both the NIC ports with packet transmission
+and reception, as well as shared memory areas and client queues.
+
+Run the Server Application:
+
+- Provide the core mask on which the server process is to run using -c, e.g. -c 3 (bitmask number).
+- Set the number of ports to be engaged using -p, e.g. -p 3 refers to ports 0 & 1.
+- Define the maximum number of clients using -n, e.g. -n 8.
+
+The command line below is an example on how to start the server process on
+logical core 2 to handle a maximum of 8 client processes configured to
+run on socket 0 to handle traffic from NIC ports 0 and 1::
+
+    root@host:mp_server# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_server -c 2 -- -p 3 -n 8
+
+NOTE: If an additional second core is given in the coremask to the server process
+that second core will be used to print statistics. When benchmarking, only a
+single lcore is needed for the server process
+
+Run the Client application:
+
+- In another terminal run the client application.
+- Give each client a distinct core mask with -c.
+- Give each client a unique client-id with -n.
+
+An example commands to run 8 client processes is as follows::
+
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 40 --proc-type=secondary -- -n 0 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 100 --proc-type=secondary -- -n 1 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 400 --proc-type=secondary -- -n 2 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 1000 --proc-type=secondary -- -n 3 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 4000 --proc-type=secondary -- -n 4 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 10000 --proc-type=secondary -- -n 5 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 40000 --proc-type=secondary -- -n 6 &
+   root@host:mp_client# ./x86_64-native-linuxapp-gcc/examples/dpdk-mp_client -c 100000 --proc-type=secondary -- -n 7 &
+
+Test Case: Performance Measurement
+----------------------------------
+
+- On the traffic generator set up a traffic flow in both directions specifying
+  IP traffic.
+- Run the server and client applications as above.
+- Start the traffic and record the throughput for transmitted and received packets.
+
+An example set of results is shown below.
+
++----------------------+-----+-----+-----+-----+-----+-----+
+| Server threads       |  1  |  1  |  1  |  1  |  1  |  1  |
++----------------------+-----+-----+-----+-----+-----+-----+
+| Server Cores/Threads | 1/1 | 1/1 | 1/1 | 1/1 | 1/1 | 1/1 |
++----------------------+-----+-----+-----+-----+-----+-----+
+| Num-clients          |  1  |  2  |  2  |  4  |  4  |  8  |
++----------------------+-----+-----+-----+-----+-----+-----+
+| Client Cores/Threads | 1/1 | 1/2 | 2/1 | 2/2 | 4/1 | 4/2 |
++----------------------+-----+-----+-----+-----+-----+-----+
+| Num Ports            |  2  |  2  |  2  |  2  |  2  |  2  |
++----------------------+-----+-----+-----+-----+-----+-----+
+| Packet Size          |  64 |  64 |  64 |  64 |  64 |  64 |
++----------------------+-----+-----+-----+-----+-----+-----+
+| %-age Line Rate      |  X  |  X  |  X  |  X  |  X  |  X  |
++----------------------+-----+-----+-----+-----+-----+-----+
+| Packet Rate(mpps)    |  X  |  X  |  X  |  X  |  X  |  X  |
++----------------------+-----+-----+-----+-----+-----+-----+
\ No newline at end of file
diff --git a/tests/TestSuite_multiprocess.py b/tests/TestSuite_multiprocess.py
index 099ce6e7..a52622c9 100644
--- a/tests/TestSuite_multiprocess.py
+++ b/tests/TestSuite_multiprocess.py
@@ -1714,216 +1714,6 @@ class TestMultiprocess(TestCase):
                 "core dump" not in out, "Core dump occurred in the secondary process!!!"
             )
 
-    def test_perf_multiprocess_performance(self):
-        """
-        Benchmark Multiprocess performance.
-        #"""
-        packet_count = 16
-        self.dut.send_expect("fg", "# ")
-        txPort = self.tester.get_local_port(self.dut_ports[0])
-        rxPort = self.tester.get_local_port(self.dut_ports[1])
-        mac = self.tester.get_mac(txPort)
-        dmac = self.dut.get_mac_address(self.dut_ports[0])
-        tgenInput = []
-
-        # create mutative src_ip+dst_ip package
-        for i in range(packet_count):
-            package = (
-                r'flows = [Ether(src="%s", dst="%s")/IP(src="192.168.1.%d", dst="192.168.1.%d")/("X"*26)]'
-                % (mac, dmac, i + 1, i + 2)
-            )
-            self.tester.scapy_append(package)
-            pcap = os.sep.join([self.output_path, "test_%d.pcap" % i])
-            self.tester.scapy_append('wrpcap("%s", flows)' % pcap)
-            tgenInput.append([txPort, rxPort, pcap])
-        self.tester.scapy_execute()
-
-        # run multiple symmetric_mp process
-        validExecutions = []
-        for execution in executions:
-            if len(self.dut.get_core_list(execution["cores"])) == execution["nprocs"]:
-                validExecutions.append(execution)
-
-        portMask = utils.create_mask(self.dut_ports)
-
-        for n in range(len(validExecutions)):
-            execution = validExecutions[n]
-            # get coreList form execution['cores']
-            coreList = self.dut.get_core_list(execution["cores"], socket=self.socket)
-            # to run a set of symmetric_mp instances, like test plan
-            dutSessionList = []
-            for index in range(len(coreList)):
-                dut_new_session = self.dut.new_session()
-                dutSessionList.append(dut_new_session)
-                # add -a option when tester and dut in same server
-                dut_new_session.send_expect(
-                    self.app_symmetric_mp
-                    + " -c %s --proc-type=auto %s -- -p %s --num-procs=%d --proc-id=%d"
-                    % (
-                        utils.create_mask([coreList[index]]),
-                        self.eal_param,
-                        portMask,
-                        execution["nprocs"],
-                        index,
-                    ),
-                    "Finished Process Init",
-                )
-
-            # clear streams before add new streams
-            self.tester.pktgen.clear_streams()
-            # run packet generator
-            streams = self.pktgen_helper.prepare_stream_from_tginput(
-                tgenInput, 100, None, self.tester.pktgen
-            )
-            _, pps = self.tester.pktgen.measure_throughput(stream_ids=streams)
-
-            execution["pps"] = pps
-
-            # close all symmetric_mp process
-            self.dut.send_expect("killall symmetric_mp", "# ")
-            # close all dut sessions
-            for dut_session in dutSessionList:
-                self.dut.close_session(dut_session)
-
-        # get rate and mpps data
-        for n in range(len(executions)):
-            self.verify(executions[n]["pps"] is not 0, "No traffic detected")
-        self.result_table_create(
-            [
-                "Num-procs",
-                "Sockets/Cores/Threads",
-                "Num Ports",
-                "Frame Size",
-                "%-age Line Rate",
-                "Packet Rate(mpps)",
-            ]
-        )
-
-        for execution in validExecutions:
-            self.result_table_add(
-                [
-                    execution["nprocs"],
-                    execution["cores"],
-                    2,
-                    64,
-                    execution["pps"] / float(100000000 / (8 * 84)),
-                    execution["pps"] / float(1000000),
-                ]
-            )
-
-        self.result_table_print()
-
-    def test_perf_multiprocess_client_serverperformance(self):
-        """
-        Benchmark Multiprocess client-server performance.
-        """
-        self.dut.kill_all()
-        self.dut.send_expect("fg", "# ")
-        txPort = self.tester.get_local_port(self.dut_ports[0])
-        rxPort = self.tester.get_local_port(self.dut_ports[1])
-        mac = self.tester.get_mac(txPort)
-
-        self.tester.scapy_append(
-            'dmac="%s"' % self.dut.get_mac_address(self.dut_ports[0])
-        )
-        self.tester.scapy_append('smac="%s"' % mac)
-        self.tester.scapy_append(
-            'flows = [Ether(src=smac, dst=dmac)/IP(src="192.168.1.1", dst="192.168.1.1")/("X"*26)]'
-        )
-
-        pcap = os.sep.join([self.output_path, "test.pcap"])
-        self.tester.scapy_append('wrpcap("%s", flows)' % pcap)
-        self.tester.scapy_execute()
-
-        validExecutions = []
-        for execution in executions:
-            if len(self.dut.get_core_list(execution["cores"])) == execution["nprocs"]:
-                validExecutions.append(execution)
-
-        for execution in validExecutions:
-            coreList = self.dut.get_core_list(execution["cores"], socket=self.socket)
-            # get core with socket parameter to specified which core dut used when tester and dut in same server
-            coreMask = utils.create_mask(
-                self.dut.get_core_list("1S/1C/1T", socket=self.socket)
-            )
-            portMask = utils.create_mask(self.dut_ports)
-            # specified mp_server core and add -a option when tester and dut in same server
-            self.dut.send_expect(
-                self.app_mp_server
-                + " -n %d -c %s %s -- -p %s -n %d"
-                % (
-                    self.dut.get_memory_channels(),
-                    coreMask,
-                    self.eal_param,
-                    portMask,
-                    execution["nprocs"],
-                ),
-                "Finished Process Init",
-                20,
-            )
-            self.dut.send_expect("^Z", "\r\n")
-            self.dut.send_expect("bg", "# ")
-
-            for n in range(execution["nprocs"]):
-                time.sleep(5)
-                # use next core as mp_client core, different from mp_server
-                coreMask = utils.create_mask([str(int(coreList[n]) + 1)])
-                self.dut.send_expect(
-                    self.app_mp_client
-                    + " -n %d -c %s --proc-type=secondary %s -- -n %d"
-                    % (self.dut.get_memory_channels(), coreMask, self.eal_param, n),
-                    "Finished Process Init",
-                )
-                self.dut.send_expect("^Z", "\r\n")
-                self.dut.send_expect("bg", "# ")
-
-            tgenInput = []
-            tgenInput.append([txPort, rxPort, pcap])
-
-            # clear streams before add new streams
-            self.tester.pktgen.clear_streams()
-            # run packet generator
-            streams = self.pktgen_helper.prepare_stream_from_tginput(
-                tgenInput, 100, None, self.tester.pktgen
-            )
-            _, pps = self.tester.pktgen.measure_throughput(stream_ids=streams)
-
-            execution["pps"] = pps
-            self.dut.kill_all()
-            time.sleep(5)
-
-        for n in range(len(executions)):
-            self.verify(executions[n]["pps"] is not 0, "No traffic detected")
-
-        self.result_table_create(
-            [
-                "Server threads",
-                "Server Cores/Threads",
-                "Num-procs",
-                "Sockets/Cores/Threads",
-                "Num Ports",
-                "Frame Size",
-                "%-age Line Rate",
-                "Packet Rate(mpps)",
-            ]
-        )
-
-        for execution in validExecutions:
-            self.result_table_add(
-                [
-                    1,
-                    "1S/1C/1T",
-                    execution["nprocs"],
-                    execution["cores"],
-                    2,
-                    64,
-                    execution["pps"] / float(100000000 / (8 * 84)),
-                    execution["pps"] / float(1000000),
-                ]
-            )
-
-        self.result_table_print()
-
     def set_fields(self):
         """set ip protocol field behavior"""
         fields_config = {
diff --git a/tests/TestSuite_perf_multiprocess.py b/tests/TestSuite_perf_multiprocess.py
new file mode 100644
index 00000000..179574a5
--- /dev/null
+++ b/tests/TestSuite_perf_multiprocess.py
@@ -0,0 +1,994 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+#
+
+"""
+DPDK Test suite.
+Multi-process Test.
+"""
+
+import copy
+import os
+import random
+import re
+import time
+import traceback
+from collections import OrderedDict
+
+import framework.utils as utils
+from framework.exception import VerifyFailure
+from framework.packet import Packet
+from framework.pktgen import PacketGeneratorHelper
+from framework.pmd_output import PmdOutput
+from framework.test_case import TestCase, check_supported_nic
+from framework.utils import GREEN, RED
+
+from .rte_flow_common import FdirProcessing as fdirprocess
+from .rte_flow_common import RssProcessing as rssprocess
+
+executions = []
+
+
+class TestMultiprocess(TestCase):
+
+    support_nic = ["ICE_100G-E810C_QSFP", "ICE_25G-E810C_SFP", "ICE_25G-E810_XXV_SFP"]
+
+    def set_up_all(self):
+        """
+        Run at the start of each test suite.
+
+        Multiprocess prerequisites.
+        Requirements:
+            OS is not freeBSD
+            DUT core number >= 4
+            multi_process build pass
+        """
+        # self.verify('bsdapp' not in self.target, "Multiprocess not support freebsd")
+
+        self.verify(len(self.dut.get_all_cores()) >= 4, "Not enough Cores")
+        self.pkt = Packet()
+        self.dut_ports = self.dut.get_ports()
+        self.socket = self.dut.get_numa_id(self.dut_ports[0])
+        extra_option = "-Dexamples='multi_process/client_server_mp/mp_server,multi_process/client_server_mp/mp_client,multi_process/simple_mp,multi_process/symmetric_mp'"
+        self.dut.build_install_dpdk(target=self.target, extra_options=extra_option)
+        self.app_mp_client = self.dut.apps_name["mp_client"]
+        self.app_mp_server = self.dut.apps_name["mp_server"]
+        self.app_simple_mp = self.dut.apps_name["simple_mp"]
+        self.app_symmetric_mp = self.dut.apps_name["symmetric_mp"]
+
+        executions.append({"nprocs": 1, "cores": "1S/1C/1T", "pps": 0})
+        executions.append({"nprocs": 2, "cores": "1S/1C/2T", "pps": 0})
+        executions.append({"nprocs": 2, "cores": "1S/2C/1T", "pps": 0})
+        executions.append({"nprocs": 4, "cores": "1S/2C/2T", "pps": 0})
+        executions.append({"nprocs": 4, "cores": "1S/4C/1T", "pps": 0})
+        executions.append({"nprocs": 8, "cores": "1S/4C/2T", "pps": 0})
+
+        self.eal_param = ""
+        for i in self.dut_ports:
+            self.eal_param += " -a %s" % self.dut.ports_info[i]["pci"]
+
+        self.eal_para = self.dut.create_eal_parameters(cores="1S/2C/1T")
+        # start new session to run secondary
+        self.session_secondary = self.dut.new_session()
+
+        # get dts output path
+        if self.logger.log_path.startswith(os.sep):
+            self.output_path = self.logger.log_path
+        else:
+            cur_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+            self.output_path = os.sep.join([cur_path, self.logger.log_path])
+        # create an instance to set stream field setting
+        self.pktgen_helper = PacketGeneratorHelper()
+        self.dport_info0 = self.dut.ports_info[self.dut_ports[0]]
+        self.pci0 = self.dport_info0["pci"]
+        self.tester_ifaces = [
+            self.tester.get_interface(self.dut.ports_map[port])
+            for port in self.dut_ports
+        ]
+        rxq = 1
+        self.session_list = []
+        self.logfmt = "*" * 20
+
+    def set_up(self):
+        """
+        Run before each test case.
+        """
+        pass
+
+    def launch_multi_testpmd(self, proc_type, queue_num, process_num, **kwargs):
+        self.session_list = [
+            self.dut.new_session("process_{}".format(i)) for i in range(process_num)
+        ]
+        self.pmd_output_list = [
+            PmdOutput(self.dut, self.session_list[i]) for i in range(process_num)
+        ]
+        self.dut.init_reserved_core()
+        proc_type_list = []
+        self.out_list = []
+        if isinstance(proc_type, list):
+            proc_type_list = copy.deepcopy(proc_type)
+            proc_type = proc_type_list[0]
+        for i in range(process_num):
+            cores = self.dut.get_reserved_core("2C", socket=0)
+            if i != 0 and proc_type_list:
+                proc_type = proc_type_list[1]
+            eal_param = "--proc-type={} -a {} --log-level=ice,7".format(
+                proc_type, self.pci0
+            )
+            param = "--rxq={0} --txq={0} --num-procs={1} --proc-id={2}".format(
+                queue_num, process_num, i
+            )
+            if kwargs.get("options") is not None:
+                param = "".join([param, kwargs.get("options")])
+            out = self.pmd_output_list[i].start_testpmd(
+                cores=cores,
+                eal_param=eal_param,
+                param=param,
+                timeout=kwargs.get("timeout", 20),
+            )
+            self.out_list.append(out)
+            self.pmd_output_list[i].execute_cmd("set fwd rxonly")
+            self.pmd_output_list[i].execute_cmd("set verbose 1")
+            self.pmd_output_list[i].execute_cmd("start")
+            self.pmd_output_list[i].execute_cmd("clear port stats all")
+
+    def get_pkt_statistic_process(self, out, **kwargs):
+        """
+        :param out: information received by testpmd process after sending packets and port statistics
+        :return: forward statistic dict, eg: {'rx-packets':1, 'tx-packets:0, 'tx-dropped':1}
+        """
+        p = re.compile(
+            r"Forward\s+Stats\s+for\s+RX\s+Port=\s+{}/Queue=([\s\d+]\d+)\s+.*\n.*RX-packets:\s+(\d+)\s+TX-packets:\s+(\d+)\s+TX-dropped:\s+(\d+)\s".format(
+                kwargs.get("port_id")
+            )
+        )
+        item_name = ["rx-packets", "tx-packets", "tx-dropped"]
+        statistic = p.findall(out)
+        if statistic:
+            rx_pkt_total, tx_pkt_total, tx_drop_total = 0, 0, 0
+            queue_set = set()
+            for item in statistic:
+                queue, rx_pkt, tx_pkt, tx_drop = map(int, item)
+                queue_set.add(queue)
+                rx_pkt_total += rx_pkt
+                tx_pkt_total += tx_pkt
+                tx_drop_total += tx_drop
+            static_dict = {
+                k: v
+                for k, v in zip(item_name, [rx_pkt_total, tx_pkt_total, tx_drop_total])
+            }
+            static_dict["queue"] = queue_set
+            return static_dict
+        else:
+            raise Exception("got wrong output, not match pattern {}".format(p.pattern))
+
+    def random_packet(self, pkt_num):
+        pkt = Packet()
+        if self.kdriver == "i40e":
+            pkt.generate_random_pkts(
+                pktnum=pkt_num,
+                dstmac="00:11:22:33:44:55",
+                random_type=["IP_RAW", "IPv6_RAW"],
+            )
+        else:
+            pkt.generate_random_pkts(
+                pktnum=pkt_num,
+                dstmac="00:11:22:33:44:55",
+            )
+        pkt.send_pkt(crb=self.tester, tx_port=self.tester_ifaces[0], count=1)
+
+    def specify_packet(self, que_num):
+        # create rule to set queue as one of each process queues
+        rule_str = "flow create 0 ingress pattern eth / ipv4 src is 192.168.{0}.3  / end actions queue index {0} / end"
+        rules = [rule_str.format(i) for i in range(que_num)]
+        fdirprocess(
+            self, self.pmd_output_list[0], self.tester_ifaces, rxq=que_num
+        ).create_rule(rules)
+        # send 1 packet for each queue,the number of packets should be received by each process is (queue_num/proc_num)
+        pkt = Packet()
+        pkt_num = que_num
+        self.logger.info("packet num:{}".format(pkt_num))
+        packets = [
+            'Ether(dst="00:11:22:33:44:55") / IP(src="192.168.{0}.3", dst="192.168.0.21") / Raw("x" * 80)'.format(
+                i
+            )
+            for i in range(pkt_num)
+        ]
+        pkt.update_pkt(packets)
+        pkt.send_pkt(crb=self.tester, tx_port=self.tester_ifaces[0], count=1)
+
+    def _multiprocess_data_pass(self, case):
+        que_num, proc_num = case.get("queue_num"), case.get("proc_num")
+        pkt_num = case.setdefault("pkt_num", que_num)
+        step = int(que_num / proc_num)
+        proc_queue = [set(range(i, i + step)) for i in range(0, que_num, step)]
+        queue_dict = {
+            k: v
+            for k, v in zip(
+                ["process_{}".format(i) for i in range(que_num)], proc_queue
+            )
+        }
+        # start testpmd multi-process
+        self.launch_multi_testpmd(
+            proc_type=case.get("proc_type"), queue_num=que_num, process_num=proc_num
+        )
+        # send random or specify packets
+        packet_func = getattr(self, case.get("packet_type") + "_packet")
+        packet_func(pkt_num)
+        # get output for each process
+        process_static = {}
+        for i in range(len(self.pmd_output_list)):
+            out = self.pmd_output_list[i].execute_cmd("stop")
+            static = self.get_pkt_statistic_process(out, port_id=0)
+            process_static["process_{}".format(i)] = static
+        self.logger.info("process output static:{}".format(process_static))
+        # check whether each process receives packet, and ecah process receives packets with the corresponding queue
+        for k, v in process_static.items():
+            self.verify(
+                v.get("rx-packets") > 0,
+                "fail:process:{} does not receive packet".format(k),
+            )
+            self.verify(
+                v.get("queue").issubset(queue_dict.get(k)),
+                "fail: {} is not a subset of {}, "
+                "process should use its own queues".format(
+                    v.get("queue"), queue_dict.get(k)
+                ),
+            )
+        self.logger.info("pass:each process receives packets and uses its own queue")
+        # check whether the sum of packets received by all processes is equal to the number of packets sent
+        received_pkts = sum(
+            int(v.get("rx-packets", 0)) for v in process_static.values()
+        )
+        self.verify(
+            received_pkts == pkt_num,
+            "the number of packets received is not equal to packets sent,"
+            "send packet:{}, received packet:{}".format(pkt_num, received_pkts),
+        )
+        self.logger.info(
+            "pass:the number of packets received is {}, equal to packets sent".format(
+                received_pkts
+            )
+        )
+
+    def check_rss(self, out, **kwargs):
+        """
+        check whether the packet directed by rss or not according to the specified parameters
+        :param out: information received by testpmd after sending packets and port statistics
+        :param kwargs: some specified parameters, such as: rxq, stats
+        :return: queue value list
+        usage:
+            check_rss(out, rxq=rxq, stats=stats)
+        """
+        self.logger.info("{0} check rss {0}".format(self.logfmt))
+        rxq = kwargs.get("rxq")
+        p = re.compile("RSS\shash=(\w+)\s-\sRSS\squeue=(\w+)")
+        pkt_info = p.findall(out)
+        self.verify(
+            pkt_info,
+            "no information matching the pattern was found,pattern:{}".format(
+                p.pattern
+            ),
+        )
+        pkt_queue = set([int(i[1], 16) for i in pkt_info])
+        if kwargs.get("stats"):
+            self.verify(
+                all([int(i[0], 16) % rxq == int(i[1], 16) for i in pkt_info]),
+                "some pkt not directed by rss.",
+            )
+            self.logger.info((GREEN("pass: all pkts directed by rss")))
+        else:
+            self.verify(
+                not any([int(i[0], 16) % rxq == int(i[1], 16) for i in pkt_info]),
+                "some pkt directed by rss, expect not directed by rss",
+            )
+            self.logger.info((GREEN("pass: no pkt directed by rss")))
+        return pkt_queue
+
+    def check_queue(self, out, check_param, **kwargs):
+        """
+        verify that queue value matches the expected value
+        :param out: information received by testpmd after sending packets and port statistics
+        :param check_param: check item name and value, eg
+                            "check_param": {"port_id": 0, "queue": 2}
+        :param kwargs: some specified parameters, such as: pkt_num, port_id, stats
+        :return:
+        """
+        self.logger.info("{0} check queue {0}".format(self.logfmt))
+        queue = check_param["queue"]
+        if isinstance(check_param["queue"], int):
+            queue = [queue]
+        patt = re.compile(
+            r"port\s+{}/queue(.+?):\s+received\s+(\d+)\s+packets".format(
+                kwargs.get("port_id")
+            )
+        )
+        res = patt.findall(out)
+        if res:
+            pkt_queue = set([int(i[0]) for i in res])
+            if kwargs.get("stats"):
+                self.verify(
+                    all(q in queue for q in pkt_queue),
+                    "fail: queue id not matched, expect queue {}, got {}".format(
+                        queue, pkt_queue
+                    ),
+                )
+                self.logger.info((GREEN("pass: queue id {} matched".format(pkt_queue))))
+            else:
+                try:
+                    self.verify(
+                        not any(q in queue for q in pkt_queue),
+                        "fail: queue id should not matched, {} should not in {}".format(
+                            pkt_queue, queue
+                        ),
+                    )
+                    self.logger.info(
+                        (GREEN("pass: queue id {} not matched".format(pkt_queue)))
+                    )
+                except VerifyFailure:
+                    self.logger.info(
+                        "queue id {} contains the queue {} specified in rule, so need to check"
+                        " whether the packet directed by rss or not".format(
+                            pkt_queue, queue
+                        )
+                    )
+                    # for mismatch packet the 'stats' parameter is False, need to change to True
+                    kwargs["stats"] = True
+                    self.check_rss(out, **kwargs)
+
+        else:
+            raise Exception("got wrong output, not match pattern")
+
+    def check_mark_id(self, out, check_param, **kwargs):
+        """
+        verify that the mark ID matches the expected value
+        :param out: information received by testpmd after sending packets
+        :param check_param: check item name and value, eg
+                            "check_param": {"port_id": 0, "mark_id": 1}
+        :param kwargs: some specified parameters,eg: stats
+        :return: None
+        usage:
+            check_mark_id(out, check_param, stats=stats)
+        """
+        self.logger.info("{0} check mark id {0}".format(self.logfmt))
+        fdir_scanner = re.compile("FDIR matched ID=(0x\w+)")
+        all_mark = fdir_scanner.findall(out)
+        stats = kwargs.get("stats")
+        if stats:
+            mark_list = set(int(i, 16) for i in all_mark)
+            self.verify(
+                all([i == check_param["mark_id"] for i in mark_list]) and mark_list,
+                "failed: some packet mark id of {} not match expect {}".format(
+                    mark_list, check_param["mark_id"]
+                ),
+            )
+            self.logger.info((GREEN("pass: all packets mark id are matched ")))
+        else:
+            # for mismatch packet,verify no mark id in output of received packet
+            self.verify(
+                not all_mark, "mark id {} in output, expect no mark id".format(all_mark)
+            )
+            self.logger.info((GREEN("pass: no mark id in output")))
+
+    def check_drop(self, out, **kwargs):
+        """
+        check the drop number of packets according to the specified parameters
+        :param out: information received by testpmd after sending packets and port statistics
+        :param kwargs: some specified parameters, such as: pkt_num, port_id, stats
+        :return: None
+        usage:
+            chek_drop(out, pkt_num=pkt_num, port_id=portid, stats=stats)
+        """
+        self.logger.info("{0} check drop {0}".format(self.logfmt))
+        pkt_num = kwargs.get("pkt_num")
+        stats = kwargs.get("stats")
+        res = self.get_pkt_statistic(out, **kwargs)
+        self.verify(
+            pkt_num == res["rx-total"],
+            "failed: get wrong amount of packet {}, expected {}".format(
+                res["rx-total"], pkt_num
+            ),
+        )
+        drop_packet_num = res["rx-dropped"]
+        if stats:
+            self.verify(
+                drop_packet_num == pkt_num,
+                "failed: {} packet dropped,expect {} dropped".format(
+                    drop_packet_num, pkt_num
+                ),
+            )
+            self.logger.info(
+                (
+                    GREEN(
+                        "pass: drop packet number {} is matched".format(drop_packet_num)
+                    )
+                )
+            )
+        else:
+            self.verify(
+                drop_packet_num == 0 and res["rx-packets"] == pkt_num,
+                "failed: {} packet dropped, expect 0 packet dropped".format(
+                    drop_packet_num
+                ),
+            )
+            self.logger.info(
+                (
+                    GREEN(
+                        "pass: drop packet number {} is matched".format(drop_packet_num)
+                    )
+                )
+            )
+
+    @staticmethod
+    def get_pkt_statistic(out, **kwargs):
+        """
+        :param out: information received by testpmd after sending packets and port statistics
+        :return: rx statistic dict, eg: {'rx-packets':1, 'rx-dropped':0, 'rx-total':1}
+        """
+        p = re.compile(
+            r"Forward\sstatistics\s+for\s+port\s+{}\s+.*\n.*RX-packets:\s(\d+)\s+RX-dropped:\s(\d+)\s+RX-total:\s(\d+)\s".format(
+                kwargs.get("port_id")
+            )
+        )
+        item_name = ["rx-packets", "rx-dropped", "rx-total"]
+        statistic = p.findall(out)
+        if statistic:
+            static_dict = {
+                k: v for k, v in zip(item_name, list(map(int, list(statistic[0]))))
+            }
+            return static_dict
+        else:
+            raise Exception(
+                "got wrong output, not match pattern {}".format(p.pattern).replace(
+                    "\\\\", "\\"
+                )
+            )
+
+    def send_pkt_get_output(
+        self, instance_obj, pkts, port_id=0, count=1, interval=0, get_stats=False
+    ):
+        instance_obj.pmd_output.execute_cmd("clear port stats all")
+        tx_port = self.tester_ifaces[port_id]
+        self.logger.info("----------send packet-------------")
+        self.logger.info("{}".format(pkts))
+        if not isinstance(pkts, list):
+            pkts = [pkts]
+        self.pkt.update_pkt(pkts)
+        self.pkt.send_pkt(
+            crb=self.tester,
+            tx_port=tx_port,
+            count=count,
+            interval=interval,
+        )
+        out1 = instance_obj.pmd_output.get_output(timeout=1)
+        if get_stats:
+            out2 = instance_obj.pmd_output.execute_cmd("show port stats all")
+            instance_obj.pmd_output.execute_cmd("stop")
+        else:
+            out2 = instance_obj.pmd_output.execute_cmd("stop")
+        instance_obj.pmd_output.execute_cmd("start")
+        return "".join([out1, out2])
+
+    def check_pkt_num(self, out, **kwargs):
+        """
+        check number of received packets matches the expected value
+        :param out: information received by testpmd after sending packets and port statistics
+        :param kwargs: some specified parameters, such as: pkt_num, port_id
+        :return: rx statistic dict
+        """
+        self.logger.info(
+            "{0} check pkt num for port:{1} {0}".format(
+                self.logfmt, kwargs.get("port_id")
+            )
+        )
+        pkt_num = kwargs.get("pkt_num")
+        res = self.get_pkt_statistic(out, **kwargs)
+        res_num = res["rx-total"]
+        self.verify(
+            res_num == pkt_num,
+            "fail: got wrong number of packets, expect pakcet number {}, got {}".format(
+                pkt_num, res_num
+            ),
+        )
+        self.logger.info(
+            (GREEN("pass: pkt num is {} same as expected".format(pkt_num)))
+        )
+        return res
+
+    def check_with_param(self, out, pkt_num, check_param, stats=True):
+        """
+        according to the key and value of the check parameter,
+        perform the corresponding verification in the out information
+        :param out: information received by testpmd after sending packets and port statistics
+        :param pkt_num: number of packets sent
+        :param check_param: check item name and value, eg:
+                            "check_param": {"port_id": 0, "mark_id": 1, "queue": 1}
+                            "check_param": {"port_id": 0, "drop": 1}
+        :param stats: effective status of rule, True or False, default is True
+        :return:
+        usage:
+            check_with_param(out, pkt_num, check_param, stats)
+            check_with_param(out, pkt_num, check_param=check_param)
+        """
+        rxq = check_param.get("rxq")
+        port_id = (
+            check_param["port_id"] if check_param.get("port_id") is not None else 0
+        )
+        match_flag = True
+        """
+        check_dict shows the supported check items,the key is item name and value represent the check priority,
+        the smaller the value, the higher the priority, priority default value is 999. if need to add new check item,
+        please add it to the dict and implement the corresponding method and named as 'check_itemname',eg: check_queue
+        """
+        self.matched_queue = []
+        default_pri = 999
+        check_dict = {
+            "queue": default_pri,
+            "drop": default_pri,
+            "mark_id": 1,
+            "rss": default_pri,
+        }
+        params = {"port_id": port_id, "rxq": rxq, "pkt_num": pkt_num, "stats": stats}
+        # sort check_param order by priority, from high to low, set priority as 999 if key not in check_dict
+        check_param = OrderedDict(
+            sorted(
+                check_param.items(),
+                key=lambda item: check_dict.get(item[0], default_pri),
+            )
+        )
+        if not check_param.get("drop"):
+            self.check_pkt_num(out, **params)
+        for k in check_param:
+            parameter = copy.deepcopy(params)
+            if k not in check_dict:
+                continue
+            func_name = "check_{}".format(k)
+            try:
+                func = getattr(self, func_name)
+            except AttributeError:
+                emsg = "{},this func is not implemented, please check!".format(
+                    traceback.format_exc()
+                )
+                raise Exception(emsg)
+            else:
+                # for mismatch packet, if the check item is 'rss',should also verify the packets are distributed by rss
+                if k == "rss" and not stats:
+                    parameter["stats"] = True
+                    match_flag = False
+                res = func(out=out, check_param=check_param, **parameter)
+                if k == "rss" and match_flag:
+                    self.matched_queue.append(res)
+
+    def destroy_rule(self, instance_obj, port_id=0, rule_id=None):
+        rule_id = 0 if rule_id is None else rule_id
+        if not isinstance(rule_id, list):
+            rule_id = [rule_id]
+        for i in rule_id:
+            out = instance_obj.pmd_output.execute_cmd(
+                "flow destroy {} rule {}".format(port_id, i)
+            )
+            p = re.compile(r"Flow rule #(\d+) destroyed")
+            m = p.search(out)
+            self.verify(m, "flow rule {} delete failed".format(rule_id))
+
+    def multiprocess_flow_data(self, case, **pmd_param):
+        que_num, proc_num = pmd_param.get("queue_num"), pmd_param.get("proc_num")
+        # start testpmd multi-process
+        self.launch_multi_testpmd(
+            proc_type=pmd_param.get("proc_type"),
+            queue_num=que_num,
+            process_num=proc_num,
+        )
+        self.pmd_output_list[0].execute_cmd("flow flush 0")
+        check_param = case["check_param"]
+        check_param["rxq"] = pmd_param.get("queue_num")
+        if check_param.get("rss"):
+            [pmd.execute_cmd("port config all rss all") for pmd in self.pmd_output_list]
+        fdir_pro = fdirprocess(
+            self,
+            self.pmd_output_list[0],
+            self.tester_ifaces,
+            rxq=pmd_param.get("queue_num"),
+        )
+        fdir_pro.create_rule(case.get("rule"))
+        # send match and mismatch packet
+        packets = [case.get("packet")["match"], case.get("packet")["mismatch"]]
+        for i in range(2):
+            out1 = self.send_pkt_get_output(fdir_pro, packets[i])
+            patt = re.compile(
+                r"port\s+{}/queue(.+?):\s+received\s+(\d+)\s+packets".format(
+                    check_param.get("port_id")
+                )
+            )
+            if patt.findall(out1) and check_param.get("rss"):
+                self.logger.info(
+                    "check whether the packets received by the primary process are distributed by RSS"
+                )
+                self.check_rss(out1, stats=True, **check_param)
+            for proc_pmd in self.pmd_output_list[1:]:
+                out2 = proc_pmd.get_output(timeout=1)
+                out3 = proc_pmd.execute_cmd("stop")
+                out1 = "".join([out1, out2, out3])
+                proc_pmd.execute_cmd("start")
+                if patt.findall(out2) and check_param.get("rss"):
+                    self.logger.info(
+                        "check whether the packets received by the secondary process are distributed by RSS"
+                    )
+                    self.check_rss(out2, stats=True, **check_param)
+            pkt_num = len(packets[i])
+            self.check_with_param(
+                out1,
+                pkt_num=pkt_num,
+                check_param=check_param,
+                stats=True if i == 0 else False,
+            )
+
+    def _handle_test(self, tests, instance_obj, port_id=0):
+        instance_obj.pmd_output.wait_link_status_up(port_id)
+        for test in tests:
+            if "send_packet" in test:
+                out = self.send_pkt_get_output(
+                    instance_obj, test["send_packet"], port_id
+                )
+                for proc_pmd in self.pmd_output_list[1:]:
+                    out1 = proc_pmd.get_output(timeout=1)
+                    out = "".join([out, out1])
+            if "action" in test:
+                instance_obj.handle_actions(out, test["action"])
+
+    def multiprocess_rss_data(self, case, **pmd_param):
+        que_num, proc_num = pmd_param.get("queue_num"), pmd_param.get("proc_num")
+        # start testpmd multi-process
+        self.launch_multi_testpmd(
+            proc_type=pmd_param.get("proc_type"),
+            queue_num=que_num,
+            process_num=proc_num,
+            options=pmd_param.get("options", None),
+        )
+        self.pmd_output_list[0].execute_cmd("flow flush 0")
+        rss_pro = rssprocess(
+            self,
+            self.pmd_output_list[0],
+            self.tester_ifaces,
+            rxq=pmd_param.get("queue_num"),
+        )
+        rss_pro.error_msgs = []
+        # handle tests
+        tests = case["test"]
+        port_id = case["port_id"]
+        self.logger.info("------------handle test--------------")
+        # validate rule
+        rule = case.get("rule", None)
+        if rule:
+            rss_pro.validate_rule(rule=rule)
+            rule_ids = rss_pro.create_rule(rule=rule)
+            rss_pro.check_rule(rule_list=rule_ids)
+        self._handle_test(tests, rss_pro, port_id)
+        # handle post-test
+        if "post-test" in case:
+            self.logger.info("------------handle post-test--------------")
+            self.destroy_rule(rss_pro, port_id=port_id, rule_id=rule_ids)
+            rss_pro.check_rule(port_id=port_id, stats=False)
+            self._handle_test(case["post-test"], rss_pro, port_id)
+        if rss_pro.error_msgs:
+            self.verify(
+                False,
+                " ".join([errs.replace("'", " ") for errs in rss_pro.error_msgs[:500]]),
+            )
+
+    def rte_flow(self, case_list, func_name, **kwargs):
+        """
+        main flow of case:
+            1. iterate the case list and do the below steps:
+                a. get the subcase name and init dict to save result
+                b. call method by func name to execute case step
+                c. record case result and err msg if case failed
+                d. clear flow rule
+            2. calculate the case passing rate according to the result dict
+            3. record case result and pass rate in the case log file
+            4. verify whether the case pass rate is equal to 100, if not, mark the case as failed and raise the err msg
+        :param case_list: case list, each item is a subcase of case
+        :param func_name: hadle case method name, eg:
+                        'flow_rule_operate': a method of 'FlowRuleProcessing' class,
+                        used to handle flow rule related suites,such as fdir and switch_filter
+                        'handle_rss_distribute_cases': a method of 'RssProcessing' class,
+                        used to handle rss related suites
+        :return:
+        usage:
+        for flow rule related:
+            rte_flow(caselist, flow_rule_operate)
+        for rss related:
+            rte_flow(caselist, handle_rss_distribute_cases)
+        """
+        if not isinstance(case_list, list):
+            case_list = [case_list]
+        test_results = dict()
+        for case in case_list:
+            case_name = case.get("sub_casename")
+            test_results[case_name] = {}
+            try:
+                self.logger.info("{0} case_name:{1} {0}".format("*" * 20, case_name))
+                func_name(case, **kwargs)
+            except Exception:
+                test_results[case_name]["result"] = "failed"
+                test_results[case_name]["err"] = re.sub(
+                    r"['\r\n]", "", str(traceback.format_exc(limit=1))
+                ).replace("\\\\", "\\")
+                self.logger.info(
+                    (
+                        RED(
+                            "case failed:{}, err:{}".format(
+                                case_name, traceback.format_exc()
+                            )
+                        )
+                    )
+                )
+            else:
+                test_results[case_name]["result"] = "passed"
+                self.logger.info((GREEN("case passed: {}".format(case_name))))
+            finally:
+                self.session_list[0].send_command("flow flush 0", timeout=1)
+                for sess in self.session_list:
+                    self.dut.close_session(sess)
+        pass_rate = (
+            round(
+                sum(1 for k in test_results if "passed" in test_results[k]["result"])
+                / len(test_results),
+                4,
+            )
+            * 100
+        )
+        self.logger.info(
+            [
+                "{}:{}".format(sub_name, test_results[sub_name]["result"])
+                for sub_name in test_results
+            ]
+        )
+        self.logger.info("pass rate is: {}".format(pass_rate))
+        msg = [
+            "subcase_name:{}:{},err:{}".format(
+                name, test_results[name].get("result"), test_results[name].get("err")
+            )
+            for name in test_results.keys()
+            if "failed" in test_results[name]["result"]
+        ]
+        self.verify(
+            int(pass_rate) == 100,
+            "some subcases failed, detail as below:{}".format(msg),
+        )
+
+    def test_perf_multiprocess_performance(self):
+        """
+        Benchmark Multiprocess performance.
+        #"""
+        packet_count = 16
+        self.dut.send_expect("fg", "# ")
+        txPort = self.tester.get_local_port(self.dut_ports[0])
+        rxPort = self.tester.get_local_port(self.dut_ports[1])
+        mac = self.tester.get_mac(txPort)
+        dmac = self.dut.get_mac_address(self.dut_ports[0])
+        tgenInput = []
+
+        # create mutative src_ip+dst_ip package
+        for i in range(packet_count):
+            package = (
+                r'flows = [Ether(src="%s", dst="%s")/IP(src="192.168.1.%d", dst="192.168.1.%d")/("X"*26)]'
+                % (mac, dmac, i + 1, i + 2)
+            )
+            self.tester.scapy_append(package)
+            pcap = os.sep.join([self.output_path, "test_%d.pcap" % i])
+            self.tester.scapy_append('wrpcap("%s", flows)' % pcap)
+            tgenInput.append([txPort, rxPort, pcap])
+        self.tester.scapy_execute()
+
+        # run multiple symmetric_mp process
+        validExecutions = []
+        for execution in executions:
+            if len(self.dut.get_core_list(execution["cores"])) == execution["nprocs"]:
+                validExecutions.append(execution)
+
+        portMask = utils.create_mask(self.dut_ports)
+
+        for n in range(len(validExecutions)):
+            execution = validExecutions[n]
+            # get coreList form execution['cores']
+            coreList = self.dut.get_core_list(execution["cores"], socket=self.socket)
+            # to run a set of symmetric_mp instances, like test plan
+            dutSessionList = []
+            for index in range(len(coreList)):
+                dut_new_session = self.dut.new_session()
+                dutSessionList.append(dut_new_session)
+                # add -a option when tester and dut in same server
+                dut_new_session.send_expect(
+                    self.app_symmetric_mp
+                    + " -c %s --proc-type=auto %s -- -p %s --num-procs=%d --proc-id=%d"
+                    % (
+                        utils.create_mask([coreList[index]]),
+                        self.eal_param,
+                        portMask,
+                        execution["nprocs"],
+                        index,
+                    ),
+                    "Finished Process Init",
+                )
+
+            # clear streams before add new streams
+            self.tester.pktgen.clear_streams()
+            # run packet generator
+            streams = self.pktgen_helper.prepare_stream_from_tginput(
+                tgenInput, 100, None, self.tester.pktgen
+            )
+            _, pps = self.tester.pktgen.measure_throughput(stream_ids=streams)
+
+            execution["pps"] = pps
+
+            # close all symmetric_mp process
+            self.dut.send_expect("killall symmetric_mp", "# ")
+            # close all dut sessions
+            for dut_session in dutSessionList:
+                self.dut.close_session(dut_session)
+
+        # get rate and mpps data
+        for n in range(len(executions)):
+            self.verify(executions[n]["pps"] is not 0, "No traffic detected")
+        self.result_table_create(
+            [
+                "Num-procs",
+                "Sockets/Cores/Threads",
+                "Num Ports",
+                "Frame Size",
+                "%-age Line Rate",
+                "Packet Rate(mpps)",
+            ]
+        )
+
+        for execution in validExecutions:
+            self.result_table_add(
+                [
+                    execution["nprocs"],
+                    execution["cores"],
+                    2,
+                    64,
+                    execution["pps"] / float(100000000 / (8 * 84)),
+                    execution["pps"] / float(1000000),
+                ]
+            )
+
+        self.result_table_print()
+
+    def test_perf_multiprocess_client_serverperformance(self):
+        """
+        Benchmark Multiprocess client-server performance.
+        """
+        self.dut.kill_all()
+        self.dut.send_expect("fg", "# ")
+        txPort = self.tester.get_local_port(self.dut_ports[0])
+        rxPort = self.tester.get_local_port(self.dut_ports[1])
+        mac = self.tester.get_mac(txPort)
+
+        self.tester.scapy_append(
+            'dmac="%s"' % self.dut.get_mac_address(self.dut_ports[0])
+        )
+        self.tester.scapy_append('smac="%s"' % mac)
+        self.tester.scapy_append(
+            'flows = [Ether(src=smac, dst=dmac)/IP(src="192.168.1.1", dst="192.168.1.1")/("X"*26)]'
+        )
+
+        pcap = os.sep.join([self.output_path, "test.pcap"])
+        self.tester.scapy_append('wrpcap("%s", flows)' % pcap)
+        self.tester.scapy_execute()
+
+        validExecutions = []
+        for execution in executions:
+            if len(self.dut.get_core_list(execution["cores"])) == execution["nprocs"]:
+                validExecutions.append(execution)
+
+        for execution in validExecutions:
+            coreList = self.dut.get_core_list(execution["cores"], socket=self.socket)
+            # get core with socket parameter to specified which core dut used when tester and dut in same server
+            coreMask = utils.create_mask(
+                self.dut.get_core_list("1S/1C/1T", socket=self.socket)
+            )
+            portMask = utils.create_mask(self.dut_ports)
+            # specified mp_server core and add -a option when tester and dut in same server
+            self.dut.send_expect(
+                self.app_mp_server
+                + " -n %d -c %s %s -- -p %s -n %d"
+                % (
+                    self.dut.get_memory_channels(),
+                    coreMask,
+                    self.eal_param,
+                    portMask,
+                    execution["nprocs"],
+                ),
+                "Finished Process Init",
+                20,
+            )
+            self.dut.send_expect("^Z", "\r\n")
+            self.dut.send_expect("bg", "# ")
+
+            for n in range(execution["nprocs"]):
+                time.sleep(5)
+                # use next core as mp_client core, different from mp_server
+                coreMask = utils.create_mask([str(int(coreList[n]) + 1)])
+                self.dut.send_expect(
+                    self.app_mp_client
+                    + " -n %d -c %s --proc-type=secondary %s -- -n %d"
+                    % (self.dut.get_memory_channels(), coreMask, self.eal_param, n),
+                    "Finished Process Init",
+                )
+                self.dut.send_expect("^Z", "\r\n")
+                self.dut.send_expect("bg", "# ")
+
+            tgenInput = []
+            tgenInput.append([txPort, rxPort, pcap])
+
+            # clear streams before add new streams
+            self.tester.pktgen.clear_streams()
+            # run packet generator
+            streams = self.pktgen_helper.prepare_stream_from_tginput(
+                tgenInput, 100, None, self.tester.pktgen
+            )
+            _, pps = self.tester.pktgen.measure_throughput(stream_ids=streams)
+
+            execution["pps"] = pps
+            self.dut.kill_all()
+            time.sleep(5)
+
+        for n in range(len(executions)):
+            self.verify(executions[n]["pps"] is not 0, "No traffic detected")
+
+        self.result_table_create(
+            [
+                "Server threads",
+                "Server Cores/Threads",
+                "Num-procs",
+                "Sockets/Cores/Threads",
+                "Num Ports",
+                "Frame Size",
+                "%-age Line Rate",
+                "Packet Rate(mpps)",
+            ]
+        )
+
+        for execution in validExecutions:
+            self.result_table_add(
+                [
+                    1,
+                    "1S/1C/1T",
+                    execution["nprocs"],
+                    execution["cores"],
+                    2,
+                    64,
+                    execution["pps"] / float(100000000 / (8 * 84)),
+                    execution["pps"] / float(1000000),
+                ]
+            )
+
+        self.result_table_print()
+
+    def set_fields(self):
+        """set ip protocol field behavior"""
+        fields_config = {
+            "ip": {
+                "src": {"range": 64, "action": "inc"},
+                "dst": {"range": 64, "action": "inc"},
+            },
+        }
+
+        return fields_config
+
+    def tear_down(self):
+        """
+        Run after each test case.
+        """
+        if self.session_list:
+            for sess in self.session_list:
+                self.dut.close_session(sess)
+        self.dut.kill_all()
+
+    def tear_down_all(self):
+        """
+        Run after each test suite.
+        """
+        self.dut.kill_all()
-- 
2.17.1


  parent reply	other threads:[~2023-01-09 10:26 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-01-09 18:46 [dts][PATCH V1 1/7] tests/efd: " Hongbo Li
2023-01-09 18:46 ` [dts][PATCH V1 2/7] tests/l2fwd: " Hongbo Li
2023-01-09 18:46 ` [dts][PATCH V1 3/7] tests/tso: " Hongbo Li
2023-01-09 18:46 ` [dts][PATCH V1 4/7] tests/vxlan: " Hongbo Li
2023-01-09 18:46 ` [dts][PATCH V1 5/7] tests/ipfrag: " Hongbo Li
2023-01-09 18:46 ` Hongbo Li [this message]
2023-01-09 18:46 ` [dts][PATCH V1 7/7] tests/checksum_offload: " Hongbo Li

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230109184623.12986-6-hongbox.li@intel.com \
    --to=hongbox.li@intel.com \
    --cc=dts@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).