test suite reviews and discussions
 help / color / mirror / Atom feed
* Re: [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e
  2019-06-05 11:26 [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e zhaomeijuan
@ 2019-06-05  8:14 ` Zhang, Yuwei1
  2019-06-12  8:22 ` Tu, Lijuan
  1 sibling, 0 replies; 3+ messages in thread
From: Zhang, Yuwei1 @ 2019-06-05  8:14 UTC (permalink / raw)
  To: Zhao, MeijuanX, dts; +Cc: Zhao, MeijuanX

Acked-by: Yuwei Zhang <yuwei1.zhang@intel.com>

-----Original Message-----
From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of zhaomeijuan
Sent: Wednesday, June 5, 2019 12:26 PM
To: dts@dpdk.org
Cc: Zhao, MeijuanX <meijuanx.zhao@intel.com>
Subject: [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e

Signed-off-by: zhaomeijuan <meijuanx.zhao@intel.com>
---
 tests/TestSuite_vxlan_gpe_support_in_i40e.py | 320 +++++++++++++++++++
 1 file changed, 320 insertions(+)
 create mode 100644 tests/TestSuite_vxlan_gpe_support_in_i40e.py

diff --git a/tests/TestSuite_vxlan_gpe_support_in_i40e.py b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
new file mode 100644
index 0000000..f87c541
--- /dev/null
+++ b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
@@ -0,0 +1,320 @@
+# BSD LICENSE
+#
+# Copyright(c) <2019> Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+#   * Redistributions of source code must retain the above copyright
+#     notice, this list of conditions and the following disclaimer.
+#   * Redistributions in binary form must reproduce the above copyright
+#     notice, this list of conditions and the following disclaimer in
+#     the documentation and/or other materials provided with the
+#     distribution.
+#   * Neither the name of Intel Corporation nor the names of its
+#     contributors may be used to endorse or promote products derived
+#     from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""
+DPDK Test suite.
+
+Test VXLAN-GPE behaviour in DPDK.
+
+"""
+
+import utils
+import os
+import re
+from pmd_output import PmdOutput
+from scapy.utils import wrpcap, rdpcap
+from scapy.layers.inet import Ether, IP, UDP
+from scapy.layers.l2 import Dot1Q
+from vxlan import Vxlan
+from scapy.config import conf
+from test_case import TestCase
+from settings import FOLDERS
+
+#
+#
+# Test class.
+#
+
+VXLAN_GPE_PORT = 4790
+
+
+class VxlanGpeTestConfig(object):
+
+    """
+    Module for config/create/transmit vxlan packet
+    """
+
+    def __init__(self, test_case, **kwargs):
+        self.test_case = test_case
+        self.init()
+        for name in kwargs:
+            setattr(self, name, kwargs[name])
+
+    def init(self):
+        self.packets_config()
+
+    def packets_config(self):
+        """
+        Default vxlan packet format
+        """
+        self.pcap_file = '/root/vxlan_gpe.pcap'
+        self.outer_mac_dst = '11:22:33:44:55:66'
+        self.outer_ip_dst = '192.168.1.2'
+        self.outer_udp_dst = VXLAN_GPE_PORT
+        self.vni = 1
+        self.inner_mac_dst = '00:00:20:00:00:01'
+        self.inner_vlan = 'N/A'
+        self.inner_ip_dst = '192.168.2.2'
+        self.payload_size = 18
+
+    def create_pcap(self):
+        """
+        Create pcap file and copy it to tester
+        """
+        self.inner_payload = ("X" * self.payload_size)
+
+        if self.inner_vlan != 'N/A':
+            inner = Ether() / Dot1Q() / IP() / UDP() / self.inner_payload
+            inner[Dot1Q].vlan = self.inner_vlan
+        else:
+            inner = Ether() / IP() / UDP() / self.inner_payload
+
+        inner[IP().name].dst = self.inner_ip_dst
+
+        inner[Ether].dst = self.inner_mac_dst
+
+        outer = Ether() / IP() / UDP()
+
+        outer[Ether].dst = self.outer_mac_dst
+
+        outer[IP().name].dst = self.outer_ip_dst
+
+        outer[UDP].dport = self.outer_udp_dst
+
+        if self.outer_udp_dst == VXLAN_GPE_PORT:
+            self.pkt = outer / Vxlan(vni=self.vni) / inner
+        else:
+            self.pkt = outer / ("X" * self.payload_size)
+
+        wrpcap(self.pcap_file, self.pkt)
+
+    def send_pcap(self, iface=""):
+        """
+        Send vxlan pcap file by iface
+        """
+        # load vxlan module to scapy
+        cwd = os.getcwd()
+        dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
+        self.test_case.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
+        self.test_case.tester.scapy_append("from vxlan import Vxlan")
+        self.test_case.tester.scapy_append(
+            'pcap = rdpcap("%s")' % self.pcap_file)
+        self.test_case.tester.scapy_append(
+            'sendp(pcap, iface="%s")' % iface)
+        self.test_case.tester.scapy_execute()
+
+
+class TestVxlanGpeSupportInI40e(TestCase):
+
+    def set_up_all(self):
+        """
+        vxlan Prerequisites
+        """
+        # this feature only enable in FVL now
+        if self.nic not in ["fortville_eagle", "fortville_spirit", "fortville_spirit_single", "fortville_25g"]:
+            self.verify(False, "%s not support this vxlan-gpe" % self.nic)
+        # Based on h/w type, choose how many ports to use
+        ports = self.dut.get_ports()
+
+        # Verify that enough ports are available
+        self.verify(len(ports) >= 2, "Insufficient ports for testing")
+        global valports
+        valports = [_ for _ in ports if self.tester.get_local_port(_) != -1]
+
+        self.portMask = utils.create_mask(valports[:2])
+
+        # Verify that enough threads are available
+        netdev = self.dut.ports_info[ports[0]]['port']
+        self.ports_socket = netdev.socket
+        cores = self.dut.get_core_list("all", socket=self.ports_socket)
+        self.verify(cores is not None, "Insufficient cores for speed testing")
+        self.coremask = utils.create_mask(cores)
+
+        # start testpmd
+        self.pmdout = PmdOutput(self.dut)
+
+        # init port config
+        self.dut_port = valports[0]
+        self.dut_port_mac = self.dut.get_mac_address(self.dut_port)
+        tester_port = self.tester.get_local_port(self.dut_port)
+        self.tester_iface = self.tester.get_interface(tester_port)
+
+    def set_up(self):
+        """
+        Run before each test case.
+        """
+        pass
+
+    def filter_and_check(self, filter_type="imac-ivlan", queue_id=3,
+                         vlan=False, remove=False):
+        """
+        send vxlan packet and check whether receive packet in assigned queue
+        """
+        if vlan is not False:
+            config = VxlanGpeTestConfig(self, inner_vlan=vlan)
+            vlan_id = vlan
+        else:
+            config = VxlanGpeTestConfig(self)
+            vlan_id = 1
+
+        # now cloud filter will default enable L2 mac filter, so dst mac must
+        # be same
+        config.outer_mac_dst = self.dut_port_mac
+
+        args = [self.dut_port, config.outer_mac_dst, config.inner_mac_dst,
+                config.inner_ip_dst, vlan_id, filter_type, config.vni,
+                queue_id]
+
+        self.tunnel_filter_add(*args)
+
+        # invalid case request to remove tunnel filter
+        if remove is True:
+            queue_id = 0
+            args = [self.dut_port, config.outer_mac_dst, config.inner_mac_dst,
+                    config.inner_ip_dst, vlan_id, filter_type, config.vni,
+                    queue_id]
+            self.tunnel_filter_del(*args)
+
+        # send vxlan packet
+        config.create_pcap()
+        self.dut.send_expect("start", "testpmd>", 10)
+        config.send_pcap(self.tester_iface)
+        out = self.dut.get_session_output(timeout=2)
+
+        queue = -1
+        pattern = re.compile("- Receive queue=0x(\d)")
+        m = pattern.search(out)
+        if m is not None:
+            queue = m.group(1)
+
+        # verify received in expected queue
+        self.verify(queue_id == int(queue), "invalid receive queue")
+
+        self.dut.send_expect("stop", "testpmd>", 10)
+
+    def test_vxlan_gpe_ipv4_detect(self):
+        self.pmdout.start_testpmd('all')
+        self.pmdout.execute_cmd('set fwd io')
+        self.pmdout.execute_cmd('set verbose 1')
+        # add VXLAN-GPE packet type
+        self.pmdout.execute_cmd('port config 0 udp_tunnel_port add vxlan-gpe %s' % VXLAN_GPE_PORT)
+        self.pmdout.execute_cmd('start')
+        mac = self.pmdout.get_port_mac(0)
+        # send one VXLAN-GPE type packet
+        packet = 'sendp([Ether(dst="%s")/IP(src="18.0.0.1")/UDP(dport=%d, sport=43)/' % (mac, VXLAN_GPE_PORT) + \
+                 'VXLAN(flags=12)/IP(src="10.0.0.1")], iface="%s", count=1)' % self.tester_iface
+        self.tester.scapy_append(packet)
+        self.tester.scapy_execute()
+        out = self.dut.get_session_output(timeout=5)
+        print out
+        self.verify('L3_IPV4_EXT_UNKNOWN' in out and '%s' % VXLAN_GPE_PORT in out, 'no detect vxlan-gpe packet')
+
+        # delete the VXLAN-GPE packet type, testpmd should treat the packet as a normal UDP packet
+        self.pmdout.execute_cmd('port config 0 udp_tunnel_port rm vxlan-gpe %s' % VXLAN_GPE_PORT)
+        self.tester.scapy_append(packet)
+        self.tester.scapy_execute()
+        out = self.dut.get_session_output(timeout=5)
+        print out
+        self.pmdout.execute_cmd('quit', '#')
+        self.verify('L3_IPV4_EXT_UNKNOWN' in out and '%s' % VXLAN_GPE_PORT not in out, 'no detect vxlan-gpe packet')
+
+    def test_tunnel_filter_vxlan_gpe(self):
+        """
+        verify tunnel filter feature
+        """
+        pmd_temp = "./%(TARGET)s/app/testpmd -c %(COREMASK)s -n " + \
+            "%(CHANNEL)d -- -i --disable-rss --rxq=4 --txq=4" + \
+            " --nb-cores=4 --portmask=%(PORT)s"
+        pmd_cmd = pmd_temp % {'TARGET': self.target,
+                              'COREMASK': self.coremask,
+                              'CHANNEL': self.dut.get_memory_channels(),
+                              'PORT': self.portMask}
+        self.dut.send_expect(pmd_cmd, "testpmd>", 30)
+
+        self.dut.send_expect("set fwd rxonly", "testpmd>", 10)
+        self.dut.send_expect("set verbose 1", "testpmd>", 10)
+        self.enable_vxlan(self.dut_port)
+
+        # check inner mac + inner vlan filter can work
+        self.filter_and_check(filter_type="imac-ivlan", vlan=1)
+        # check inner mac + inner vlan + tunnel id filter can work
+        self.filter_and_check(filter_type="imac-ivlan-tenid", vlan=1)
+        # check inner mac + tunnel id filter can work
+        self.filter_and_check(filter_type="imac-tenid")
+        # check inner mac filter can work
+        self.filter_and_check(filter_type="imac")
+        # check outer mac + inner mac + tunnel id filter can work
+        self.filter_and_check(filter_type="omac-imac-tenid")
+        # iip not supported by now
+        self.filter_and_check(filter_type="iip")
+
+        self.dut.send_expect("quit", "#", 10)
+
+    def enable_vxlan(self, port):
+        self.dut.send_expect("rx_vxlan_port add %d %d"
+                             % (VXLAN_GPE_PORT, port),
+                             "testpmd>", 10)
+
+    def tunnel_filter_add(self, *args):
+        # tunnel_filter add port_id outer_mac inner_mac ip inner_vlan
+        # tunnel_type(vxlan)
+        # filter_type
+        # (imac-ivlan|imac-ivlan-tenid|imac-tenid|imac|omac-imac-tenid|iip)
+        # tenant_id queue_num
+        out = self.dut.send_expect("tunnel_filter add %d " % args[0] +
+                                   "%s %s %s " % (args[1], args[2], args[3]) +
+                                   "%d vxlan-gpe %s " % (args[4], args[5]) +
+                                   "%d %d" % (args[6], args[7]),
+                                   "testpmd>", 10)
+        self.verify("Bad arguments" not in out, "Failed to add tunnel filter")
+        self.verify("error" not in out, "Failed to add tunnel filter")
+
+    def tunnel_filter_del(self, *args):
+        out = self.dut.send_expect("tunnel_filter rm %d " % args[0] +
+                                   "%s %s %s " % (args[1], args[2], args[3]) +
+                                   "%d vxlan-gpe %s " % (args[4], args[5]) +
+                                   "%d %d" % (args[6], args[7]),
+                                   "testpmd>", 10)
+        self.verify("Bad arguments" not in out,
+                    "Failed to remove tunnel filter")
+        self.verify("error" not in out, "Failed to remove tunnel filter")
+
+    def tear_down(self):
+        """
+        Run after each test case.
+        """
+        self.dut.kill_all()
+
+    def tear_down_all(self):
+        """
+        Run after each test suite.
+        """
+        pass
--
2.17.1


^ permalink raw reply	[flat|nested] 3+ messages in thread

* [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e
@ 2019-06-05 11:26 zhaomeijuan
  2019-06-05  8:14 ` Zhang, Yuwei1
  2019-06-12  8:22 ` Tu, Lijuan
  0 siblings, 2 replies; 3+ messages in thread
From: zhaomeijuan @ 2019-06-05 11:26 UTC (permalink / raw)
  To: dts; +Cc: zhaomeijuan

Signed-off-by: zhaomeijuan <meijuanx.zhao@intel.com>
---
 tests/TestSuite_vxlan_gpe_support_in_i40e.py | 320 +++++++++++++++++++
 1 file changed, 320 insertions(+)
 create mode 100644 tests/TestSuite_vxlan_gpe_support_in_i40e.py

diff --git a/tests/TestSuite_vxlan_gpe_support_in_i40e.py b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
new file mode 100644
index 0000000..f87c541
--- /dev/null
+++ b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
@@ -0,0 +1,320 @@
+# BSD LICENSE
+#
+# Copyright(c) <2019> Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+#   * Redistributions of source code must retain the above copyright
+#     notice, this list of conditions and the following disclaimer.
+#   * Redistributions in binary form must reproduce the above copyright
+#     notice, this list of conditions and the following disclaimer in
+#     the documentation and/or other materials provided with the
+#     distribution.
+#   * Neither the name of Intel Corporation nor the names of its
+#     contributors may be used to endorse or promote products derived
+#     from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""
+DPDK Test suite.
+
+Test VXLAN-GPE behaviour in DPDK.
+
+"""
+
+import utils
+import os
+import re
+from pmd_output import PmdOutput
+from scapy.utils import wrpcap, rdpcap
+from scapy.layers.inet import Ether, IP, UDP
+from scapy.layers.l2 import Dot1Q
+from vxlan import Vxlan
+from scapy.config import conf
+from test_case import TestCase
+from settings import FOLDERS
+
+#
+#
+# Test class.
+#
+
+VXLAN_GPE_PORT = 4790
+
+
+class VxlanGpeTestConfig(object):
+
+    """
+    Module for config/create/transmit vxlan packet
+    """
+
+    def __init__(self, test_case, **kwargs):
+        self.test_case = test_case
+        self.init()
+        for name in kwargs:
+            setattr(self, name, kwargs[name])
+
+    def init(self):
+        self.packets_config()
+
+    def packets_config(self):
+        """
+        Default vxlan packet format
+        """
+        self.pcap_file = '/root/vxlan_gpe.pcap'
+        self.outer_mac_dst = '11:22:33:44:55:66'
+        self.outer_ip_dst = '192.168.1.2'
+        self.outer_udp_dst = VXLAN_GPE_PORT
+        self.vni = 1
+        self.inner_mac_dst = '00:00:20:00:00:01'
+        self.inner_vlan = 'N/A'
+        self.inner_ip_dst = '192.168.2.2'
+        self.payload_size = 18
+
+    def create_pcap(self):
+        """
+        Create pcap file and copy it to tester
+        """
+        self.inner_payload = ("X" * self.payload_size)
+
+        if self.inner_vlan != 'N/A':
+            inner = Ether() / Dot1Q() / IP() / UDP() / self.inner_payload
+            inner[Dot1Q].vlan = self.inner_vlan
+        else:
+            inner = Ether() / IP() / UDP() / self.inner_payload
+
+        inner[IP().name].dst = self.inner_ip_dst
+
+        inner[Ether].dst = self.inner_mac_dst
+
+        outer = Ether() / IP() / UDP()
+
+        outer[Ether].dst = self.outer_mac_dst
+
+        outer[IP().name].dst = self.outer_ip_dst
+
+        outer[UDP].dport = self.outer_udp_dst
+
+        if self.outer_udp_dst == VXLAN_GPE_PORT:
+            self.pkt = outer / Vxlan(vni=self.vni) / inner
+        else:
+            self.pkt = outer / ("X" * self.payload_size)
+
+        wrpcap(self.pcap_file, self.pkt)
+
+    def send_pcap(self, iface=""):
+        """
+        Send vxlan pcap file by iface
+        """
+        # load vxlan module to scapy
+        cwd = os.getcwd()
+        dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
+        self.test_case.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
+        self.test_case.tester.scapy_append("from vxlan import Vxlan")
+        self.test_case.tester.scapy_append(
+            'pcap = rdpcap("%s")' % self.pcap_file)
+        self.test_case.tester.scapy_append(
+            'sendp(pcap, iface="%s")' % iface)
+        self.test_case.tester.scapy_execute()
+
+
+class TestVxlanGpeSupportInI40e(TestCase):
+
+    def set_up_all(self):
+        """
+        vxlan Prerequisites
+        """
+        # this feature only enable in FVL now
+        if self.nic not in ["fortville_eagle", "fortville_spirit", "fortville_spirit_single", "fortville_25g"]:
+            self.verify(False, "%s not support this vxlan-gpe" % self.nic)
+        # Based on h/w type, choose how many ports to use
+        ports = self.dut.get_ports()
+
+        # Verify that enough ports are available
+        self.verify(len(ports) >= 2, "Insufficient ports for testing")
+        global valports
+        valports = [_ for _ in ports if self.tester.get_local_port(_) != -1]
+
+        self.portMask = utils.create_mask(valports[:2])
+
+        # Verify that enough threads are available
+        netdev = self.dut.ports_info[ports[0]]['port']
+        self.ports_socket = netdev.socket
+        cores = self.dut.get_core_list("all", socket=self.ports_socket)
+        self.verify(cores is not None, "Insufficient cores for speed testing")
+        self.coremask = utils.create_mask(cores)
+
+        # start testpmd
+        self.pmdout = PmdOutput(self.dut)
+
+        # init port config
+        self.dut_port = valports[0]
+        self.dut_port_mac = self.dut.get_mac_address(self.dut_port)
+        tester_port = self.tester.get_local_port(self.dut_port)
+        self.tester_iface = self.tester.get_interface(tester_port)
+
+    def set_up(self):
+        """
+        Run before each test case.
+        """
+        pass
+
+    def filter_and_check(self, filter_type="imac-ivlan", queue_id=3,
+                         vlan=False, remove=False):
+        """
+        send vxlan packet and check whether receive packet in assigned queue
+        """
+        if vlan is not False:
+            config = VxlanGpeTestConfig(self, inner_vlan=vlan)
+            vlan_id = vlan
+        else:
+            config = VxlanGpeTestConfig(self)
+            vlan_id = 1
+
+        # now cloud filter will default enable L2 mac filter, so dst mac must
+        # be same
+        config.outer_mac_dst = self.dut_port_mac
+
+        args = [self.dut_port, config.outer_mac_dst, config.inner_mac_dst,
+                config.inner_ip_dst, vlan_id, filter_type, config.vni,
+                queue_id]
+
+        self.tunnel_filter_add(*args)
+
+        # invalid case request to remove tunnel filter
+        if remove is True:
+            queue_id = 0
+            args = [self.dut_port, config.outer_mac_dst, config.inner_mac_dst,
+                    config.inner_ip_dst, vlan_id, filter_type, config.vni,
+                    queue_id]
+            self.tunnel_filter_del(*args)
+
+        # send vxlan packet
+        config.create_pcap()
+        self.dut.send_expect("start", "testpmd>", 10)
+        config.send_pcap(self.tester_iface)
+        out = self.dut.get_session_output(timeout=2)
+
+        queue = -1
+        pattern = re.compile("- Receive queue=0x(\d)")
+        m = pattern.search(out)
+        if m is not None:
+            queue = m.group(1)
+
+        # verify received in expected queue
+        self.verify(queue_id == int(queue), "invalid receive queue")
+
+        self.dut.send_expect("stop", "testpmd>", 10)
+
+    def test_vxlan_gpe_ipv4_detect(self):
+        self.pmdout.start_testpmd('all')
+        self.pmdout.execute_cmd('set fwd io')
+        self.pmdout.execute_cmd('set verbose 1')
+        # add VXLAN-GPE packet type
+        self.pmdout.execute_cmd('port config 0 udp_tunnel_port add vxlan-gpe %s' % VXLAN_GPE_PORT)
+        self.pmdout.execute_cmd('start')
+        mac = self.pmdout.get_port_mac(0)
+        # send one VXLAN-GPE type packet
+        packet = 'sendp([Ether(dst="%s")/IP(src="18.0.0.1")/UDP(dport=%d, sport=43)/' % (mac, VXLAN_GPE_PORT) + \
+                 'VXLAN(flags=12)/IP(src="10.0.0.1")], iface="%s", count=1)' % self.tester_iface
+        self.tester.scapy_append(packet)
+        self.tester.scapy_execute()
+        out = self.dut.get_session_output(timeout=5)
+        print out
+        self.verify('L3_IPV4_EXT_UNKNOWN' in out and '%s' % VXLAN_GPE_PORT in out, 'no detect vxlan-gpe packet')
+
+        # delete the VXLAN-GPE packet type, testpmd should treat the packet as a normal UDP packet
+        self.pmdout.execute_cmd('port config 0 udp_tunnel_port rm vxlan-gpe %s' % VXLAN_GPE_PORT)
+        self.tester.scapy_append(packet)
+        self.tester.scapy_execute()
+        out = self.dut.get_session_output(timeout=5)
+        print out
+        self.pmdout.execute_cmd('quit', '#')
+        self.verify('L3_IPV4_EXT_UNKNOWN' in out and '%s' % VXLAN_GPE_PORT not in out, 'no detect vxlan-gpe packet')
+
+    def test_tunnel_filter_vxlan_gpe(self):
+        """
+        verify tunnel filter feature
+        """
+        pmd_temp = "./%(TARGET)s/app/testpmd -c %(COREMASK)s -n " + \
+            "%(CHANNEL)d -- -i --disable-rss --rxq=4 --txq=4" + \
+            " --nb-cores=4 --portmask=%(PORT)s"
+        pmd_cmd = pmd_temp % {'TARGET': self.target,
+                              'COREMASK': self.coremask,
+                              'CHANNEL': self.dut.get_memory_channels(),
+                              'PORT': self.portMask}
+        self.dut.send_expect(pmd_cmd, "testpmd>", 30)
+
+        self.dut.send_expect("set fwd rxonly", "testpmd>", 10)
+        self.dut.send_expect("set verbose 1", "testpmd>", 10)
+        self.enable_vxlan(self.dut_port)
+
+        # check inner mac + inner vlan filter can work
+        self.filter_and_check(filter_type="imac-ivlan", vlan=1)
+        # check inner mac + inner vlan + tunnel id filter can work
+        self.filter_and_check(filter_type="imac-ivlan-tenid", vlan=1)
+        # check inner mac + tunnel id filter can work
+        self.filter_and_check(filter_type="imac-tenid")
+        # check inner mac filter can work
+        self.filter_and_check(filter_type="imac")
+        # check outer mac + inner mac + tunnel id filter can work
+        self.filter_and_check(filter_type="omac-imac-tenid")
+        # iip not supported by now
+        self.filter_and_check(filter_type="iip")
+
+        self.dut.send_expect("quit", "#", 10)
+
+    def enable_vxlan(self, port):
+        self.dut.send_expect("rx_vxlan_port add %d %d"
+                             % (VXLAN_GPE_PORT, port),
+                             "testpmd>", 10)
+
+    def tunnel_filter_add(self, *args):
+        # tunnel_filter add port_id outer_mac inner_mac ip inner_vlan
+        # tunnel_type(vxlan)
+        # filter_type
+        # (imac-ivlan|imac-ivlan-tenid|imac-tenid|imac|omac-imac-tenid|iip)
+        # tenant_id queue_num
+        out = self.dut.send_expect("tunnel_filter add %d " % args[0] +
+                                   "%s %s %s " % (args[1], args[2], args[3]) +
+                                   "%d vxlan-gpe %s " % (args[4], args[5]) +
+                                   "%d %d" % (args[6], args[7]),
+                                   "testpmd>", 10)
+        self.verify("Bad arguments" not in out, "Failed to add tunnel filter")
+        self.verify("error" not in out, "Failed to add tunnel filter")
+
+    def tunnel_filter_del(self, *args):
+        out = self.dut.send_expect("tunnel_filter rm %d " % args[0] +
+                                   "%s %s %s " % (args[1], args[2], args[3]) +
+                                   "%d vxlan-gpe %s " % (args[4], args[5]) +
+                                   "%d %d" % (args[6], args[7]),
+                                   "testpmd>", 10)
+        self.verify("Bad arguments" not in out,
+                    "Failed to remove tunnel filter")
+        self.verify("error" not in out, "Failed to remove tunnel filter")
+
+    def tear_down(self):
+        """
+        Run after each test case.
+        """
+        self.dut.kill_all()
+
+    def tear_down_all(self):
+        """
+        Run after each test suite.
+        """
+        pass
-- 
2.17.1


^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e
  2019-06-05 11:26 [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e zhaomeijuan
  2019-06-05  8:14 ` Zhang, Yuwei1
@ 2019-06-12  8:22 ` Tu, Lijuan
  1 sibling, 0 replies; 3+ messages in thread
From: Tu, Lijuan @ 2019-06-12  8:22 UTC (permalink / raw)
  To: Zhao, MeijuanX, dts; +Cc: Zhao, MeijuanX

Applied, thanks

> -----Original Message-----
> From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of zhaomeijuan
> Sent: Wednesday, June 5, 2019 7:26 PM
> To: dts@dpdk.org
> Cc: Zhao, MeijuanX <meijuanx.zhao@intel.com>
> Subject: [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e
> 
> Signed-off-by: zhaomeijuan <meijuanx.zhao@intel.com>
> ---
>  tests/TestSuite_vxlan_gpe_support_in_i40e.py | 320 +++++++++++++++++++
>  1 file changed, 320 insertions(+)
>  create mode 100644 tests/TestSuite_vxlan_gpe_support_in_i40e.py
> 
> diff --git a/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> new file mode 100644
> index 0000000..f87c541
> --- /dev/null
> +++ b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> @@ -0,0 +1,320 @@
> +# BSD LICENSE
> +#
> +# Copyright(c) <2019> Intel Corporation. All rights reserved.
> +# All rights reserved.
> +#
> +# Redistribution and use in source and binary forms, with or without
> +# modification, are permitted provided that the following conditions
> +# are met:
> +#
> +#   * Redistributions of source code must retain the above copyright
> +#     notice, this list of conditions and the following disclaimer.
> +#   * Redistributions in binary form must reproduce the above copyright
> +#     notice, this list of conditions and the following disclaimer in
> +#     the documentation and/or other materials provided with the
> +#     distribution.
> +#   * Neither the name of Intel Corporation nor the names of its
> +#     contributors may be used to endorse or promote products derived
> +#     from this software without specific prior written permission.
> +#
> +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
> +"""
> +DPDK Test suite.
> +
> +Test VXLAN-GPE behaviour in DPDK.
> +
> +"""
> +
> +import utils
> +import os
> +import re
> +from pmd_output import PmdOutput
> +from scapy.utils import wrpcap, rdpcap
> +from scapy.layers.inet import Ether, IP, UDP
> +from scapy.layers.l2 import Dot1Q
> +from vxlan import Vxlan
> +from scapy.config import conf
> +from test_case import TestCase
> +from settings import FOLDERS
> +
> +#
> +#
> +# Test class.
> +#
> +
> +VXLAN_GPE_PORT = 4790
> +
> +
> +class VxlanGpeTestConfig(object):
> +
> +    """
> +    Module for config/create/transmit vxlan packet
> +    """
> +
> +    def __init__(self, test_case, **kwargs):
> +        self.test_case = test_case
> +        self.init()
> +        for name in kwargs:
> +            setattr(self, name, kwargs[name])
> +
> +    def init(self):
> +        self.packets_config()
> +
> +    def packets_config(self):
> +        """
> +        Default vxlan packet format
> +        """
> +        self.pcap_file = '/root/vxlan_gpe.pcap'
> +        self.outer_mac_dst = '11:22:33:44:55:66'
> +        self.outer_ip_dst = '192.168.1.2'
> +        self.outer_udp_dst = VXLAN_GPE_PORT
> +        self.vni = 1
> +        self.inner_mac_dst = '00:00:20:00:00:01'
> +        self.inner_vlan = 'N/A'
> +        self.inner_ip_dst = '192.168.2.2'
> +        self.payload_size = 18
> +
> +    def create_pcap(self):
> +        """
> +        Create pcap file and copy it to tester
> +        """
> +        self.inner_payload = ("X" * self.payload_size)
> +
> +        if self.inner_vlan != 'N/A':
> +            inner = Ether() / Dot1Q() / IP() / UDP() / self.inner_payload
> +            inner[Dot1Q].vlan = self.inner_vlan
> +        else:
> +            inner = Ether() / IP() / UDP() / self.inner_payload
> +
> +        inner[IP().name].dst = self.inner_ip_dst
> +
> +        inner[Ether].dst = self.inner_mac_dst
> +
> +        outer = Ether() / IP() / UDP()
> +
> +        outer[Ether].dst = self.outer_mac_dst
> +
> +        outer[IP().name].dst = self.outer_ip_dst
> +
> +        outer[UDP].dport = self.outer_udp_dst
> +
> +        if self.outer_udp_dst == VXLAN_GPE_PORT:
> +            self.pkt = outer / Vxlan(vni=self.vni) / inner
> +        else:
> +            self.pkt = outer / ("X" * self.payload_size)
> +
> +        wrpcap(self.pcap_file, self.pkt)
> +
> +    def send_pcap(self, iface=""):
> +        """
> +        Send vxlan pcap file by iface
> +        """
> +        # load vxlan module to scapy
> +        cwd = os.getcwd()
> +        dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
> +        self.test_case.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
> +        self.test_case.tester.scapy_append("from vxlan import Vxlan")
> +        self.test_case.tester.scapy_append(
> +            'pcap = rdpcap("%s")' % self.pcap_file)
> +        self.test_case.tester.scapy_append(
> +            'sendp(pcap, iface="%s")' % iface)
> +        self.test_case.tester.scapy_execute()
> +
> +
> +class TestVxlanGpeSupportInI40e(TestCase):
> +
> +    def set_up_all(self):
> +        """
> +        vxlan Prerequisites
> +        """
> +        # this feature only enable in FVL now
> +        if self.nic not in ["fortville_eagle", "fortville_spirit", "fortville_spirit_single", "fortville_25g"]:
> +            self.verify(False, "%s not support this vxlan-gpe" % self.nic)
> +        # Based on h/w type, choose how many ports to use
> +        ports = self.dut.get_ports()
> +
> +        # Verify that enough ports are available
> +        self.verify(len(ports) >= 2, "Insufficient ports for testing")
> +        global valports
> +        valports = [_ for _ in ports if self.tester.get_local_port(_) != -1]
> +
> +        self.portMask = utils.create_mask(valports[:2])
> +
> +        # Verify that enough threads are available
> +        netdev = self.dut.ports_info[ports[0]]['port']
> +        self.ports_socket = netdev.socket
> +        cores = self.dut.get_core_list("all", socket=self.ports_socket)
> +        self.verify(cores is not None, "Insufficient cores for speed testing")
> +        self.coremask = utils.create_mask(cores)
> +
> +        # start testpmd
> +        self.pmdout = PmdOutput(self.dut)
> +
> +        # init port config
> +        self.dut_port = valports[0]
> +        self.dut_port_mac = self.dut.get_mac_address(self.dut_port)
> +        tester_port = self.tester.get_local_port(self.dut_port)
> +        self.tester_iface = self.tester.get_interface(tester_port)
> +
> +    def set_up(self):
> +        """
> +        Run before each test case.
> +        """
> +        pass
> +
> +    def filter_and_check(self, filter_type="imac-ivlan", queue_id=3,
> +                         vlan=False, remove=False):
> +        """
> +        send vxlan packet and check whether receive packet in assigned queue
> +        """
> +        if vlan is not False:
> +            config = VxlanGpeTestConfig(self, inner_vlan=vlan)
> +            vlan_id = vlan
> +        else:
> +            config = VxlanGpeTestConfig(self)
> +            vlan_id = 1
> +
> +        # now cloud filter will default enable L2 mac filter, so dst mac must
> +        # be same
> +        config.outer_mac_dst = self.dut_port_mac
> +
> +        args = [self.dut_port, config.outer_mac_dst, config.inner_mac_dst,
> +                config.inner_ip_dst, vlan_id, filter_type, config.vni,
> +                queue_id]
> +
> +        self.tunnel_filter_add(*args)
> +
> +        # invalid case request to remove tunnel filter
> +        if remove is True:
> +            queue_id = 0
> +            args = [self.dut_port, config.outer_mac_dst, config.inner_mac_dst,
> +                    config.inner_ip_dst, vlan_id, filter_type, config.vni,
> +                    queue_id]
> +            self.tunnel_filter_del(*args)
> +
> +        # send vxlan packet
> +        config.create_pcap()
> +        self.dut.send_expect("start", "testpmd>", 10)
> +        config.send_pcap(self.tester_iface)
> +        out = self.dut.get_session_output(timeout=2)
> +
> +        queue = -1
> +        pattern = re.compile("- Receive queue=0x(\d)")
> +        m = pattern.search(out)
> +        if m is not None:
> +            queue = m.group(1)
> +
> +        # verify received in expected queue
> +        self.verify(queue_id == int(queue), "invalid receive queue")
> +
> +        self.dut.send_expect("stop", "testpmd>", 10)
> +
> +    def test_vxlan_gpe_ipv4_detect(self):
> +        self.pmdout.start_testpmd('all')
> +        self.pmdout.execute_cmd('set fwd io')
> +        self.pmdout.execute_cmd('set verbose 1')
> +        # add VXLAN-GPE packet type
> +        self.pmdout.execute_cmd('port config 0 udp_tunnel_port add vxlan-gpe %s' % VXLAN_GPE_PORT)
> +        self.pmdout.execute_cmd('start')
> +        mac = self.pmdout.get_port_mac(0)
> +        # send one VXLAN-GPE type packet
> +        packet = 'sendp([Ether(dst="%s")/IP(src="18.0.0.1")/UDP(dport=%d, sport=43)/' % (mac, VXLAN_GPE_PORT) + \
> +                 'VXLAN(flags=12)/IP(src="10.0.0.1")], iface="%s", count=1)' % self.tester_iface
> +        self.tester.scapy_append(packet)
> +        self.tester.scapy_execute()
> +        out = self.dut.get_session_output(timeout=5)
> +        print out
> +        self.verify('L3_IPV4_EXT_UNKNOWN' in out and '%s' % VXLAN_GPE_PORT in out, 'no detect vxlan-gpe packet')
> +
> +        # delete the VXLAN-GPE packet type, testpmd should treat the packet as a normal UDP packet
> +        self.pmdout.execute_cmd('port config 0 udp_tunnel_port rm vxlan-gpe %s' % VXLAN_GPE_PORT)
> +        self.tester.scapy_append(packet)
> +        self.tester.scapy_execute()
> +        out = self.dut.get_session_output(timeout=5)
> +        print out
> +        self.pmdout.execute_cmd('quit', '#')
> +        self.verify('L3_IPV4_EXT_UNKNOWN' in out and '%s' % VXLAN_GPE_PORT not in out, 'no detect vxlan-gpe packet')
> +
> +    def test_tunnel_filter_vxlan_gpe(self):
> +        """
> +        verify tunnel filter feature
> +        """
> +        pmd_temp = "./%(TARGET)s/app/testpmd -c %(COREMASK)s -n " + \
> +            "%(CHANNEL)d -- -i --disable-rss --rxq=4 --txq=4" + \
> +            " --nb-cores=4 --portmask=%(PORT)s"
> +        pmd_cmd = pmd_temp % {'TARGET': self.target,
> +                              'COREMASK': self.coremask,
> +                              'CHANNEL': self.dut.get_memory_channels(),
> +                              'PORT': self.portMask}
> +        self.dut.send_expect(pmd_cmd, "testpmd>", 30)
> +
> +        self.dut.send_expect("set fwd rxonly", "testpmd>", 10)
> +        self.dut.send_expect("set verbose 1", "testpmd>", 10)
> +        self.enable_vxlan(self.dut_port)
> +
> +        # check inner mac + inner vlan filter can work
> +        self.filter_and_check(filter_type="imac-ivlan", vlan=1)
> +        # check inner mac + inner vlan + tunnel id filter can work
> +        self.filter_and_check(filter_type="imac-ivlan-tenid", vlan=1)
> +        # check inner mac + tunnel id filter can work
> +        self.filter_and_check(filter_type="imac-tenid")
> +        # check inner mac filter can work
> +        self.filter_and_check(filter_type="imac")
> +        # check outer mac + inner mac + tunnel id filter can work
> +        self.filter_and_check(filter_type="omac-imac-tenid")
> +        # iip not supported by now
> +        self.filter_and_check(filter_type="iip")
> +
> +        self.dut.send_expect("quit", "#", 10)
> +
> +    def enable_vxlan(self, port):
> +        self.dut.send_expect("rx_vxlan_port add %d %d"
> +                             % (VXLAN_GPE_PORT, port),
> +                             "testpmd>", 10)
> +
> +    def tunnel_filter_add(self, *args):
> +        # tunnel_filter add port_id outer_mac inner_mac ip inner_vlan
> +        # tunnel_type(vxlan-gpe)
> +        # filter_type
> +        # (imac-ivlan|imac-ivlan-tenid|imac-tenid|imac|omac-imac-tenid|iip)
> +        # tenant_id queue_num
> +        out = self.dut.send_expect("tunnel_filter add %d " % args[0] +
> +                                   "%s %s %s " % (args[1], args[2], args[3]) +
> +                                   "%d vxlan-gpe %s " % (args[4], args[5]) +
> +                                   "%d %d" % (args[6], args[7]),
> +                                   "testpmd>", 10)
> +        self.verify("Bad arguments" not in out, "Failed to add tunnel filter")
> +        self.verify("error" not in out, "Failed to add tunnel filter")
> +
> +    def tunnel_filter_del(self, *args):
> +        out = self.dut.send_expect("tunnel_filter rm %d " % args[0] +
> +                                   "%s %s %s " % (args[1], args[2], args[3]) +
> +                                   "%d vxlan-gpe %s " % (args[4], args[5]) +
> +                                   "%d %d" % (args[6], args[7]),
> +                                   "testpmd>", 10)
> +        self.verify("Bad arguments" not in out,
> +                    "Failed to remove tunnel filter")
> +        self.verify("error" not in out, "Failed to remove tunnel filter")
> +
> +    def tear_down(self):
> +        """
> +        Run after each test case.
> +        """
> +        self.dut.kill_all()
> +
> +    def tear_down_all(self):
> +        """
> +        Run after each test suite.
> +        """
> +        pass
> --
> 2.17.1


^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2019-06-12  8:22 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-06-05 11:26 [dts] [PATCH V1] tests add new testsuite vxlan gpe support in i40e zhaomeijuan
2019-06-05  8:14 ` Zhang, Yuwei1
2019-06-12  8:22 ` Tu, Lijuan

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).