* [dts] [PATCH V1] add test suite link_status_interrupt
@ 2017-02-10 6:54 xu,gang
2017-02-20 7:46 ` Liu, Yong
0 siblings, 1 reply; 4+ messages in thread
From: xu,gang @ 2017-02-10 6:54 UTC (permalink / raw)
To: dts; +Cc: xu,gang
Signed-off-by: xu,gang <gangx.xu@intel.com>
---
tests/TestSuite_link_status_interrupt.py | 288 ++++++++++++++-----------------
1 file changed, 132 insertions(+), 156 deletions(-)
diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
index c50a99d..ae205cc 100644
--- a/tests/TestSuite_link_status_interrupt.py
+++ b/tests/TestSuite_link_status_interrupt.py
@@ -1,87 +1,77 @@
-# <COPYRIGHT_TAG>
+# BSD LICENSE
+#
+# Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
"""
DPDK Test suite.
-
-Link Status Detection
-
+Test link status.
"""
-# NOTE: These tests generally won't work in automated mode since the
-# link doesn't stay down unless the cable is actually removed. The code
-# is left here for legacy reasons.
-
-
import utils
+import string
+import time
import re
-
-testPorts = []
-intr_mode = ['"intr_mode=random"',
- '"intr_mode=msix"', '"intr_mode=legacy"', '']
-intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
- 'Use legacy interrupt', 'Use MSIX interrupt by default']
-
-
from test_case import TestCase
-
-#
-#
-# Test class.
-#
+from packet import Packet, sniff_packets, load_sniff_packets
class TestLinkStatusInterrupt(TestCase):
- #
- #
- #
- # Test cases.
- #
-
def set_up_all(self):
"""
Run at the start of each test suite.
-
-
- Link Status Interrupt Prerequisites
"""
+ self.dut_ports = self.dut.get_ports(self.nic)
+ self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
+ cores = self.dut.get_core_list("1S/4C/1T")
+ self.coremask = utils.create_mask(cores)
+ self.portmask = utils.create_mask(self.dut_ports)
+
+ self.path = "./examples/link_status_interrupt/build/link_status_interrupt"
- dutPorts = self.dut.get_ports(self.nic)
- self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
- for n in range(2):
- testPorts.append(dutPorts[n])
- inter = self.tester.get_interface(
- self.tester.get_local_port(testPorts[n]))
- self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
- out = self.dut.send_expect(
- "make -C examples/link_status_interrupt", "# ")
- self.verify("Error" not in out, "Compilation error 1")
- self.verify("No such file" not in out, "Compilation error 2")
+ # build sample app
+ out = self.dut.build_dpdk_apps("./examples/link_status_interrupt")
+ self.verify("Error" not in out, "compilation error 1")
+ self.verify("No such file" not in out, "compilation error 2")
def set_link_status_and_verify(self, dutPort, status):
"""
- In registered callback...
- Event type: LSC interrupt
- Port 0 Link Up - speed 10000 Mbps - full-duplex
-
- In registered callback...
- Event type: LSC interrupt
- Port 0 Link Down
+ set link status verify results
"""
-
- inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
- self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
- self.dut.send_expect("", "Port %s Link %s" %
- (dutPort, status.capitalize()), 60)
- out = self.dut.send_expect("", "Aggregate statistics", 60)
- exp = r"Statistics for port (\d+) -+\r\n" + \
- "Link status:\s+Link (up|down)\r\n"
- pattern = re.compile(exp)
- info = pattern.findall(out)
- if info[0][0] == repr(dutPort):
- self.verify(info[0][1] == status, "Link status change port error")
- else:
- self.verify(info[1][1] == status, "Link status change hello error")
+ self.intf = self.tester.get_interface(
+ self.tester.get_local_port(dutPort))
+ self.tester.send_expect("ifconfig %s %s" %
+ (self.intf, status.lower()), "# ", 10)
+ verify_point = "Port %s Link %s" % (dutPort, status)
+ self.dut.send_expect("", verify_point, 60)
def set_up(self):
"""
@@ -91,111 +81,97 @@ class TestLinkStatusInterrupt(TestCase):
def test_link_status_interrupt_change(self):
"""
- Link status change.
+ Verify Link status change
"""
-
- memChannel = self.dut.get_memory_channels()
- portMask = utils.create_mask(testPorts)
- if self.drivername in ["igb_uio"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
- memChannel, portMask)
- elif self.drivername in ["vfio-pci"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s" % (
- memChannel, portMask)
- else:
- print "unknow driver"
- for n in range(len(intr_mode)):
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("rmmod igb_uio", "# ")
+ if self.drivername == "igb_uio":
+ cmdline = self.path + " -c %s -n %s -- -p %s " % (
+ self.coremask, self.dut.get_memory_channels(), self.portmask)
+ for mode in ["legacy", "msix"]:
+ self.dut.send_expect("rmmod -f igb_uio", "#", 20)
self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
- self.verify(
- intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
- if n == 0:
- continue
- self.dut.send_expect(cmdline, "Aggregate statistics", 605)
- self.dut.send_expect(
- "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
- self.set_link_status_and_verify(testPorts[0], 'down')
- self.set_link_status_and_verify(testPorts[0], 'up')
- self.set_link_status_and_verify(testPorts[1], 'down')
- self.set_link_status_and_verify(testPorts[1], 'up')
- self.dut.send_expect("^C", "# ")
+ 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+ self.dut.bind_interfaces_linux()
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+ self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+ self.dut.send_expect("^C", "#", 60)
+ elif self.drivername == "vfio-pci":
+ for mode in ["legacy", "msi", "msix"]:
+ cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+ self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+ self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+ self.dut.send_expect("^C", "#", 60)
def test_link_status_interrupt_port_available(self):
"""
- Port available.
+ interrupt all port link, then link them,
+ sent packet, check packets received.
"""
-
- memChannel = self.dut.get_memory_channels()
- portMask = utils.create_mask(testPorts)
- if self.drivername in ["igb_uio"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
- memChannel, portMask)
- elif self.drivername in ["vfio-pci"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
- memChannel, portMask)
- else:
- print "unknow driver"
- for n in range(1, len(intr_mode)):
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("rmmod igb_uio", "# ")
+ if self.drivername == "igb_uio":
+ cmdline = self.path + " -c %s -n %s -- -p %s " % (
+ self.coremask, self.dut.get_memory_channels(), self.portmask)
+ for mode in ["legacy", "msix"]:
+ self.dut.send_expect("rmmod -f igb_uio", "#", 20)
self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
- self.verify(
- intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
- self.dut.send_expect(cmdline, "Aggregate statistics", 60)
- self.dut.send_expect(
- "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
- self.set_link_status_and_verify(testPorts[0], 'down')
- self.set_link_status_and_verify(testPorts[1], 'down')
- self.set_link_status_and_verify(testPorts[0], 'up')
- self.set_link_status_and_verify(testPorts[1], 'up')
- for m in [0, 1]:
- txPort = self.tester.get_local_port(testPorts[m])
- rxPort = self.tester.get_local_port(testPorts[1 - m])
- txItf = self.tester.get_interface(txPort)
- rxItf = self.tester.get_interface(rxPort)
- self.tester.scapy_background()
- self.tester.scapy_append(
- 'p = sniff(iface="%s", count=1)' % rxItf)
- self.tester.scapy_append('nr_packets=len(p)')
- self.tester.scapy_append('RESULT = str(nr_packets)')
- self.tester.scapy_foreground()
- self.tester.scapy_append(
- 'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
- self.tester.scapy_execute()
- nr_packets = self.tester.scapy_get_result()
- self.verify(nr_packets == "1", "Fail to switch L2 frame")
- self.dut.send_expect("^C", "# ")
-
- def test_link_status_interrupt_recovery(self):
- """
- Recovery.
- """
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("^C", "# ")
- self.dut.send_expect("rmmod igb_uio", "# ")
- self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
- self.verify(
- 'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
- elif self.drivername in ["vfio-pci"]:
- self.verify(Ture, "not need run this case, when used vfio driver")
- else:
- print "unknow driver"
+ 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+ self.dut.bind_interfaces_linux()
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(
+ self.dut_ports[port], 'Down')
+ time.sleep(10)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+ self.scapy_send_packet(1)
+ out = self.dut.get_session_output(timeout=60)
+ pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+ pkt_received = re.findall(
+ "Total packets received:\s*(\d*)", out)
+ self.verify(pkt_send == pkt_received == '1',
+ "Error: sent packets != received packets")
+ self.dut.send_expect("^C", "#", 60)
+ elif self.drivername == "vfio-pci":
+ for mode in ["legacy", "msi", "msix"]:
+ cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+ self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(
+ self.dut_ports[port], 'Down')
+ time.sleep(10)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+ self.scapy_send_packet(1)
+ out = self.dut.get_session_output(timeout=60)
+ pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+ pkt_received = re.findall(
+ "Total packets received:\s*(\d*)", out)
+ self.verify(pkt_send == pkt_received == '1',
+ "Error: sent packets != received packets")
+ self.dut.send_expect("^C", "#", 60)
+
+ def scapy_send_packet(self, num=1):
+ """
+ Send a packet to port
+ """
+ self.dmac = self.dut.get_mac_address(self.dut_ports[0])
+ txport = self.tester.get_local_port(self.dut_ports[0])
+ self.txItf = self.tester.get_interface(txport)
+ pkt = Packet(pkt_type='UDP')
+ pkt.config_layer('ether', {'dst': self.dmac})
+ pkt.send_pkt(tx_port=self.txItf, count=num)
def tear_down(self):
"""
Run after each test case.
"""
- pass
+ self.dut.kill_all()
+ time.sleep(2)
+ for port in self.dut_ports:
+ intf = self.tester.get_interface(self.tester.get_local_port(port))
+ self.tester.send_expect("ifconfig %s up" % intf, "# ", 10)
def tear_down_all(self):
"""
--
1.9.3
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [dts] [PATCH V1] add test suite link_status_interrupt
2017-02-10 6:54 [dts] [PATCH V1] add test suite link_status_interrupt xu,gang
@ 2017-02-20 7:46 ` Liu, Yong
0 siblings, 0 replies; 4+ messages in thread
From: Liu, Yong @ 2017-02-20 7:46 UTC (permalink / raw)
To: xu,gang, dts
Thanks gang, applied. But the patch subject is not align with content in
it. Look like there's some code optimization in this patch.
On 02/10/2017 02:54 PM, xu,gang wrote:
> Signed-off-by: xu,gang <gangx.xu@intel.com>
> ---
> tests/TestSuite_link_status_interrupt.py | 288 ++++++++++++++-----------------
> 1 file changed, 132 insertions(+), 156 deletions(-)
>
> diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
> index c50a99d..ae205cc 100644
> --- a/tests/TestSuite_link_status_interrupt.py
> +++ b/tests/TestSuite_link_status_interrupt.py
> @@ -1,87 +1,77 @@
> -# <COPYRIGHT_TAG>
> +# BSD LICENSE
> +#
> +# Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
> +# All rights reserved.
> +#
> +# Redistribution and use in source and binary forms, with or without
> +# modification, are permitted provided that the following conditions
> +# are met:
> +#
> +# * Redistributions of source code must retain the above copyright
> +# notice, this list of conditions and the following disclaimer.
> +# * Redistributions in binary form must reproduce the above copyright
> +# notice, this list of conditions and the following disclaimer in
> +# the documentation and/or other materials provided with the
> +# distribution.
> +# * Neither the name of Intel Corporation nor the names of its
> +# contributors may be used to endorse or promote products derived
> +# from this software without specific prior written permission.
> +#
> +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
>
> """
> DPDK Test suite.
> -
> -Link Status Detection
> -
> +Test link status.
> """
>
> -# NOTE: These tests generally won't work in automated mode since the
> -# link doesn't stay down unless the cable is actually removed. The code
> -# is left here for legacy reasons.
> -
> -
> import utils
> +import string
> +import time
> import re
> -
> -testPorts = []
> -intr_mode = ['"intr_mode=random"',
> - '"intr_mode=msix"', '"intr_mode=legacy"', '']
> -intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
> - 'Use legacy interrupt', 'Use MSIX interrupt by default']
> -
> -
> from test_case import TestCase
> -
> -#
> -#
> -# Test class.
> -#
> +from packet import Packet, sniff_packets, load_sniff_packets
>
>
> class TestLinkStatusInterrupt(TestCase):
>
> - #
> - #
> - #
> - # Test cases.
> - #
> -
> def set_up_all(self):
> """
> Run at the start of each test suite.
> -
> -
> - Link Status Interrupt Prerequisites
> """
> + self.dut_ports = self.dut.get_ports(self.nic)
> + self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
> + cores = self.dut.get_core_list("1S/4C/1T")
> + self.coremask = utils.create_mask(cores)
> + self.portmask = utils.create_mask(self.dut_ports)
> +
> + self.path = "./examples/link_status_interrupt/build/link_status_interrupt"
>
> - dutPorts = self.dut.get_ports(self.nic)
> - self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
> - for n in range(2):
> - testPorts.append(dutPorts[n])
> - inter = self.tester.get_interface(
> - self.tester.get_local_port(testPorts[n]))
> - self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
> - out = self.dut.send_expect(
> - "make -C examples/link_status_interrupt", "# ")
> - self.verify("Error" not in out, "Compilation error 1")
> - self.verify("No such file" not in out, "Compilation error 2")
> + # build sample app
> + out = self.dut.build_dpdk_apps("./examples/link_status_interrupt")
> + self.verify("Error" not in out, "compilation error 1")
> + self.verify("No such file" not in out, "compilation error 2")
>
> def set_link_status_and_verify(self, dutPort, status):
> """
> - In registered callback...
> - Event type: LSC interrupt
> - Port 0 Link Up - speed 10000 Mbps - full-duplex
> -
> - In registered callback...
> - Event type: LSC interrupt
> - Port 0 Link Down
> + set link status verify results
> """
> -
> - inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
> - self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
> - self.dut.send_expect("", "Port %s Link %s" %
> - (dutPort, status.capitalize()), 60)
> - out = self.dut.send_expect("", "Aggregate statistics", 60)
> - exp = r"Statistics for port (\d+) -+\r\n" + \
> - "Link status:\s+Link (up|down)\r\n"
> - pattern = re.compile(exp)
> - info = pattern.findall(out)
> - if info[0][0] == repr(dutPort):
> - self.verify(info[0][1] == status, "Link status change port error")
> - else:
> - self.verify(info[1][1] == status, "Link status change hello error")
> + self.intf = self.tester.get_interface(
> + self.tester.get_local_port(dutPort))
> + self.tester.send_expect("ifconfig %s %s" %
> + (self.intf, status.lower()), "# ", 10)
> + verify_point = "Port %s Link %s" % (dutPort, status)
> + self.dut.send_expect("", verify_point, 60)
>
> def set_up(self):
> """
> @@ -91,111 +81,97 @@ class TestLinkStatusInterrupt(TestCase):
>
> def test_link_status_interrupt_change(self):
> """
> - Link status change.
> + Verify Link status change
> """
> -
> - memChannel = self.dut.get_memory_channels()
> - portMask = utils.create_mask(testPorts)
> - if self.drivername in ["igb_uio"]:
> - cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
> - memChannel, portMask)
> - elif self.drivername in ["vfio-pci"]:
> - cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s" % (
> - memChannel, portMask)
> - else:
> - print "unknow driver"
> - for n in range(len(intr_mode)):
> - if self.drivername in ["igb_uio"]:
> - self.dut.send_expect("rmmod igb_uio", "# ")
> + if self.drivername == "igb_uio":
> + cmdline = self.path + " -c %s -n %s -- -p %s " % (
> + self.coremask, self.dut.get_memory_channels(), self.portmask)
> + for mode in ["legacy", "msix"]:
> + self.dut.send_expect("rmmod -f igb_uio", "#", 20)
> self.dut.send_expect(
> - "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
> - out = self.dut.send_expect(
> - "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
> - self.verify(
> - intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
> - if n == 0:
> - continue
> - self.dut.send_expect(cmdline, "Aggregate statistics", 605)
> - self.dut.send_expect(
> - "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
> - self.set_link_status_and_verify(testPorts[0], 'down')
> - self.set_link_status_and_verify(testPorts[0], 'up')
> - self.set_link_status_and_verify(testPorts[1], 'down')
> - self.set_link_status_and_verify(testPorts[1], 'up')
> - self.dut.send_expect("^C", "# ")
> + 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
> + self.dut.bind_interfaces_linux()
> + self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> + self.set_link_status_and_verify(self.dut_ports[0], 'Down')
> + self.set_link_status_and_verify(self.dut_ports[0], 'Up')
> + self.dut.send_expect("^C", "#", 60)
> + elif self.drivername == "vfio-pci":
> + for mode in ["legacy", "msi", "msix"]:
> + cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
> + self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
> + self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> + self.set_link_status_and_verify(self.dut_ports[0], 'Down')
> + self.set_link_status_and_verify(self.dut_ports[0], 'Up')
> + self.dut.send_expect("^C", "#", 60)
>
> def test_link_status_interrupt_port_available(self):
> """
> - Port available.
> + interrupt all port link, then link them,
> + sent packet, check packets received.
> """
> -
> - memChannel = self.dut.get_memory_channels()
> - portMask = utils.create_mask(testPorts)
> - if self.drivername in ["igb_uio"]:
> - cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
> - memChannel, portMask)
> - elif self.drivername in ["vfio-pci"]:
> - cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
> - memChannel, portMask)
> - else:
> - print "unknow driver"
> - for n in range(1, len(intr_mode)):
> - if self.drivername in ["igb_uio"]:
> - self.dut.send_expect("rmmod igb_uio", "# ")
> + if self.drivername == "igb_uio":
> + cmdline = self.path + " -c %s -n %s -- -p %s " % (
> + self.coremask, self.dut.get_memory_channels(), self.portmask)
> + for mode in ["legacy", "msix"]:
> + self.dut.send_expect("rmmod -f igb_uio", "#", 20)
> self.dut.send_expect(
> - "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
> - out = self.dut.send_expect(
> - "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
> - self.verify(
> - intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
> - self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> - self.dut.send_expect(
> - "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
> - self.set_link_status_and_verify(testPorts[0], 'down')
> - self.set_link_status_and_verify(testPorts[1], 'down')
> - self.set_link_status_and_verify(testPorts[0], 'up')
> - self.set_link_status_and_verify(testPorts[1], 'up')
> - for m in [0, 1]:
> - txPort = self.tester.get_local_port(testPorts[m])
> - rxPort = self.tester.get_local_port(testPorts[1 - m])
> - txItf = self.tester.get_interface(txPort)
> - rxItf = self.tester.get_interface(rxPort)
> - self.tester.scapy_background()
> - self.tester.scapy_append(
> - 'p = sniff(iface="%s", count=1)' % rxItf)
> - self.tester.scapy_append('nr_packets=len(p)')
> - self.tester.scapy_append('RESULT = str(nr_packets)')
> - self.tester.scapy_foreground()
> - self.tester.scapy_append(
> - 'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
> - self.tester.scapy_execute()
> - nr_packets = self.tester.scapy_get_result()
> - self.verify(nr_packets == "1", "Fail to switch L2 frame")
> - self.dut.send_expect("^C", "# ")
> -
> - def test_link_status_interrupt_recovery(self):
> - """
> - Recovery.
> - """
> - if self.drivername in ["igb_uio"]:
> - self.dut.send_expect("^C", "# ")
> - self.dut.send_expect("rmmod igb_uio", "# ")
> - self.dut.send_expect(
> - "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
> - out = self.dut.send_expect(
> - "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
> - self.verify(
> - 'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
> - elif self.drivername in ["vfio-pci"]:
> - self.verify(Ture, "not need run this case, when used vfio driver")
> - else:
> - print "unknow driver"
> + 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
> + self.dut.bind_interfaces_linux()
> + self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> + for port in self.dut_ports:
> + self.set_link_status_and_verify(
> + self.dut_ports[port], 'Down')
> + time.sleep(10)
> + for port in self.dut_ports:
> + self.set_link_status_and_verify(self.dut_ports[port], 'Up')
> + self.scapy_send_packet(1)
> + out = self.dut.get_session_output(timeout=60)
> + pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
> + pkt_received = re.findall(
> + "Total packets received:\s*(\d*)", out)
> + self.verify(pkt_send == pkt_received == '1',
> + "Error: sent packets != received packets")
> + self.dut.send_expect("^C", "#", 60)
> + elif self.drivername == "vfio-pci":
> + for mode in ["legacy", "msi", "msix"]:
> + cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
> + self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
> + self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> + for port in self.dut_ports:
> + self.set_link_status_and_verify(
> + self.dut_ports[port], 'Down')
> + time.sleep(10)
> + for port in self.dut_ports:
> + self.set_link_status_and_verify(self.dut_ports[port], 'Up')
> + self.scapy_send_packet(1)
> + out = self.dut.get_session_output(timeout=60)
> + pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
> + pkt_received = re.findall(
> + "Total packets received:\s*(\d*)", out)
> + self.verify(pkt_send == pkt_received == '1',
> + "Error: sent packets != received packets")
> + self.dut.send_expect("^C", "#", 60)
> +
> + def scapy_send_packet(self, num=1):
> + """
> + Send a packet to port
> + """
> + self.dmac = self.dut.get_mac_address(self.dut_ports[0])
> + txport = self.tester.get_local_port(self.dut_ports[0])
> + self.txItf = self.tester.get_interface(txport)
> + pkt = Packet(pkt_type='UDP')
> + pkt.config_layer('ether', {'dst': self.dmac})
> + pkt.send_pkt(tx_port=self.txItf, count=num)
>
> def tear_down(self):
> """
> Run after each test case.
> """
> - pass
> + self.dut.kill_all()
> + time.sleep(2)
> + for port in self.dut_ports:
> + intf = self.tester.get_interface(self.tester.get_local_port(port))
> + self.tester.send_expect("ifconfig %s up" % intf, "# ", 10)
>
> def tear_down_all(self):
> """
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [dts] [PATCH V1] add test suite link_status_interrupt
2017-02-10 6:50 Lijuan Tu
@ 2017-02-10 6:54 ` Xu, GangX
0 siblings, 0 replies; 4+ messages in thread
From: Xu, GangX @ 2017-02-10 6:54 UTC (permalink / raw)
To: Tu, LijuanX A, dts; +Cc: Tu, LijuanX A
Hi,
Please ignore this email
-----Original Message-----
From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of Lijuan Tu
Sent: Friday, February 10, 2017 2:51 PM
To: dts@dpdk.org
Cc: Tu, LijuanX A <lijuanx.a.tu@intel.com>
Subject: [dts] [PATCH V1] add test suite link_status_interrupt
Signed-off-by: Lijuan Tu <lijuanx.a.tu@intel.com>
---
tests/TestSuite_link_status_interrupt.py | 288 ++++++++++++++-----------------
1 file changed, 132 insertions(+), 156 deletions(-)
diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
index c50a99d..ae205cc 100644
--- a/tests/TestSuite_link_status_interrupt.py
+++ b/tests/TestSuite_link_status_interrupt.py
@@ -1,87 +1,77 @@
-# <COPYRIGHT_TAG>
+# BSD LICENSE
+#
+# Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without #
+modification, are permitted provided that the following conditions #
+are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS #
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT #
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR #
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT #
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, #
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT #
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, #
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY #
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT #
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
"""
DPDK Test suite.
-
-Link Status Detection
-
+Test link status.
"""
-# NOTE: These tests generally won't work in automated mode since the -# link doesn't stay down unless the cable is actually removed. The code -# is left here for legacy reasons.
-
-
import utils
+import string
+import time
import re
-
-testPorts = []
-intr_mode = ['"intr_mode=random"',
- '"intr_mode=msix"', '"intr_mode=legacy"', '']
-intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
- 'Use legacy interrupt', 'Use MSIX interrupt by default']
-
-
from test_case import TestCase
-
-#
-#
-# Test class.
-#
+from packet import Packet, sniff_packets, load_sniff_packets
class TestLinkStatusInterrupt(TestCase):
- #
- #
- #
- # Test cases.
- #
-
def set_up_all(self):
"""
Run at the start of each test suite.
-
-
- Link Status Interrupt Prerequisites
"""
+ self.dut_ports = self.dut.get_ports(self.nic)
+ self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
+ cores = self.dut.get_core_list("1S/4C/1T")
+ self.coremask = utils.create_mask(cores)
+ self.portmask = utils.create_mask(self.dut_ports)
+
+ self.path = "./examples/link_status_interrupt/build/link_status_interrupt"
- dutPorts = self.dut.get_ports(self.nic)
- self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
- for n in range(2):
- testPorts.append(dutPorts[n])
- inter = self.tester.get_interface(
- self.tester.get_local_port(testPorts[n]))
- self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
- out = self.dut.send_expect(
- "make -C examples/link_status_interrupt", "# ")
- self.verify("Error" not in out, "Compilation error 1")
- self.verify("No such file" not in out, "Compilation error 2")
+ # build sample app
+ out = self.dut.build_dpdk_apps("./examples/link_status_interrupt")
+ self.verify("Error" not in out, "compilation error 1")
+ self.verify("No such file" not in out, "compilation error 2")
def set_link_status_and_verify(self, dutPort, status):
"""
- In registered callback...
- Event type: LSC interrupt
- Port 0 Link Up - speed 10000 Mbps - full-duplex
-
- In registered callback...
- Event type: LSC interrupt
- Port 0 Link Down
+ set link status verify results
"""
-
- inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
- self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
- self.dut.send_expect("", "Port %s Link %s" %
- (dutPort, status.capitalize()), 60)
- out = self.dut.send_expect("", "Aggregate statistics", 60)
- exp = r"Statistics for port (\d+) -+\r\n" + \
- "Link status:\s+Link (up|down)\r\n"
- pattern = re.compile(exp)
- info = pattern.findall(out)
- if info[0][0] == repr(dutPort):
- self.verify(info[0][1] == status, "Link status change port error")
- else:
- self.verify(info[1][1] == status, "Link status change hello error")
+ self.intf = self.tester.get_interface(
+ self.tester.get_local_port(dutPort))
+ self.tester.send_expect("ifconfig %s %s" %
+ (self.intf, status.lower()), "# ", 10)
+ verify_point = "Port %s Link %s" % (dutPort, status)
+ self.dut.send_expect("", verify_point, 60)
def set_up(self):
"""
@@ -91,111 +81,97 @@ class TestLinkStatusInterrupt(TestCase):
def test_link_status_interrupt_change(self):
"""
- Link status change.
+ Verify Link status change
"""
-
- memChannel = self.dut.get_memory_channels()
- portMask = utils.create_mask(testPorts)
- if self.drivername in ["igb_uio"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
- memChannel, portMask)
- elif self.drivername in ["vfio-pci"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s" % (
- memChannel, portMask)
- else:
- print "unknow driver"
- for n in range(len(intr_mode)):
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("rmmod igb_uio", "# ")
+ if self.drivername == "igb_uio":
+ cmdline = self.path + " -c %s -n %s -- -p %s " % (
+ self.coremask, self.dut.get_memory_channels(), self.portmask)
+ for mode in ["legacy", "msix"]:
+ self.dut.send_expect("rmmod -f igb_uio", "#", 20)
self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
- self.verify(
- intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
- if n == 0:
- continue
- self.dut.send_expect(cmdline, "Aggregate statistics", 605)
- self.dut.send_expect(
- "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
- self.set_link_status_and_verify(testPorts[0], 'down')
- self.set_link_status_and_verify(testPorts[0], 'up')
- self.set_link_status_and_verify(testPorts[1], 'down')
- self.set_link_status_and_verify(testPorts[1], 'up')
- self.dut.send_expect("^C", "# ")
+ 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+ self.dut.bind_interfaces_linux()
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+ self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+ self.dut.send_expect("^C", "#", 60)
+ elif self.drivername == "vfio-pci":
+ for mode in ["legacy", "msi", "msix"]:
+ cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+ self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+ self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+ self.dut.send_expect("^C", "#", 60)
def test_link_status_interrupt_port_available(self):
"""
- Port available.
+ interrupt all port link, then link them,
+ sent packet, check packets received.
"""
-
- memChannel = self.dut.get_memory_channels()
- portMask = utils.create_mask(testPorts)
- if self.drivername in ["igb_uio"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
- memChannel, portMask)
- elif self.drivername in ["vfio-pci"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
- memChannel, portMask)
- else:
- print "unknow driver"
- for n in range(1, len(intr_mode)):
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("rmmod igb_uio", "# ")
+ if self.drivername == "igb_uio":
+ cmdline = self.path + " -c %s -n %s -- -p %s " % (
+ self.coremask, self.dut.get_memory_channels(), self.portmask)
+ for mode in ["legacy", "msix"]:
+ self.dut.send_expect("rmmod -f igb_uio", "#", 20)
self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
- self.verify(
- intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
- self.dut.send_expect(cmdline, "Aggregate statistics", 60)
- self.dut.send_expect(
- "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
- self.set_link_status_and_verify(testPorts[0], 'down')
- self.set_link_status_and_verify(testPorts[1], 'down')
- self.set_link_status_and_verify(testPorts[0], 'up')
- self.set_link_status_and_verify(testPorts[1], 'up')
- for m in [0, 1]:
- txPort = self.tester.get_local_port(testPorts[m])
- rxPort = self.tester.get_local_port(testPorts[1 - m])
- txItf = self.tester.get_interface(txPort)
- rxItf = self.tester.get_interface(rxPort)
- self.tester.scapy_background()
- self.tester.scapy_append(
- 'p = sniff(iface="%s", count=1)' % rxItf)
- self.tester.scapy_append('nr_packets=len(p)')
- self.tester.scapy_append('RESULT = str(nr_packets)')
- self.tester.scapy_foreground()
- self.tester.scapy_append(
- 'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
- self.tester.scapy_execute()
- nr_packets = self.tester.scapy_get_result()
- self.verify(nr_packets == "1", "Fail to switch L2 frame")
- self.dut.send_expect("^C", "# ")
-
- def test_link_status_interrupt_recovery(self):
- """
- Recovery.
- """
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("^C", "# ")
- self.dut.send_expect("rmmod igb_uio", "# ")
- self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
- self.verify(
- 'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
- elif self.drivername in ["vfio-pci"]:
- self.verify(Ture, "not need run this case, when used vfio driver")
- else:
- print "unknow driver"
+ 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+ self.dut.bind_interfaces_linux()
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(
+ self.dut_ports[port], 'Down')
+ time.sleep(10)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+ self.scapy_send_packet(1)
+ out = self.dut.get_session_output(timeout=60)
+ pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+ pkt_received = re.findall(
+ "Total packets received:\s*(\d*)", out)
+ self.verify(pkt_send == pkt_received == '1',
+ "Error: sent packets != received packets")
+ self.dut.send_expect("^C", "#", 60)
+ elif self.drivername == "vfio-pci":
+ for mode in ["legacy", "msi", "msix"]:
+ cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+ self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(
+ self.dut_ports[port], 'Down')
+ time.sleep(10)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+ self.scapy_send_packet(1)
+ out = self.dut.get_session_output(timeout=60)
+ pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+ pkt_received = re.findall(
+ "Total packets received:\s*(\d*)", out)
+ self.verify(pkt_send == pkt_received == '1',
+ "Error: sent packets != received packets")
+ self.dut.send_expect("^C", "#", 60)
+
+ def scapy_send_packet(self, num=1):
+ """
+ Send a packet to port
+ """
+ self.dmac = self.dut.get_mac_address(self.dut_ports[0])
+ txport = self.tester.get_local_port(self.dut_ports[0])
+ self.txItf = self.tester.get_interface(txport)
+ pkt = Packet(pkt_type='UDP')
+ pkt.config_layer('ether', {'dst': self.dmac})
+ pkt.send_pkt(tx_port=self.txItf, count=num)
def tear_down(self):
"""
Run after each test case.
"""
- pass
+ self.dut.kill_all()
+ time.sleep(2)
+ for port in self.dut_ports:
+ intf = self.tester.get_interface(self.tester.get_local_port(port))
+ self.tester.send_expect("ifconfig %s up" % intf, "# ", 10)
def tear_down_all(self):
"""
--
1.9.3
^ permalink raw reply [flat|nested] 4+ messages in thread
* [dts] [PATCH V1] add test suite link_status_interrupt
@ 2017-02-10 6:50 Lijuan Tu
2017-02-10 6:54 ` Xu, GangX
0 siblings, 1 reply; 4+ messages in thread
From: Lijuan Tu @ 2017-02-10 6:50 UTC (permalink / raw)
To: dts; +Cc: Lijuan Tu
Signed-off-by: Lijuan Tu <lijuanx.a.tu@intel.com>
---
tests/TestSuite_link_status_interrupt.py | 288 ++++++++++++++-----------------
1 file changed, 132 insertions(+), 156 deletions(-)
diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
index c50a99d..ae205cc 100644
--- a/tests/TestSuite_link_status_interrupt.py
+++ b/tests/TestSuite_link_status_interrupt.py
@@ -1,87 +1,77 @@
-# <COPYRIGHT_TAG>
+# BSD LICENSE
+#
+# Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
"""
DPDK Test suite.
-
-Link Status Detection
-
+Test link status.
"""
-# NOTE: These tests generally won't work in automated mode since the
-# link doesn't stay down unless the cable is actually removed. The code
-# is left here for legacy reasons.
-
-
import utils
+import string
+import time
import re
-
-testPorts = []
-intr_mode = ['"intr_mode=random"',
- '"intr_mode=msix"', '"intr_mode=legacy"', '']
-intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
- 'Use legacy interrupt', 'Use MSIX interrupt by default']
-
-
from test_case import TestCase
-
-#
-#
-# Test class.
-#
+from packet import Packet, sniff_packets, load_sniff_packets
class TestLinkStatusInterrupt(TestCase):
- #
- #
- #
- # Test cases.
- #
-
def set_up_all(self):
"""
Run at the start of each test suite.
-
-
- Link Status Interrupt Prerequisites
"""
+ self.dut_ports = self.dut.get_ports(self.nic)
+ self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
+ cores = self.dut.get_core_list("1S/4C/1T")
+ self.coremask = utils.create_mask(cores)
+ self.portmask = utils.create_mask(self.dut_ports)
+
+ self.path = "./examples/link_status_interrupt/build/link_status_interrupt"
- dutPorts = self.dut.get_ports(self.nic)
- self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
- for n in range(2):
- testPorts.append(dutPorts[n])
- inter = self.tester.get_interface(
- self.tester.get_local_port(testPorts[n]))
- self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
- out = self.dut.send_expect(
- "make -C examples/link_status_interrupt", "# ")
- self.verify("Error" not in out, "Compilation error 1")
- self.verify("No such file" not in out, "Compilation error 2")
+ # build sample app
+ out = self.dut.build_dpdk_apps("./examples/link_status_interrupt")
+ self.verify("Error" not in out, "compilation error 1")
+ self.verify("No such file" not in out, "compilation error 2")
def set_link_status_and_verify(self, dutPort, status):
"""
- In registered callback...
- Event type: LSC interrupt
- Port 0 Link Up - speed 10000 Mbps - full-duplex
-
- In registered callback...
- Event type: LSC interrupt
- Port 0 Link Down
+ set link status verify results
"""
-
- inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
- self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
- self.dut.send_expect("", "Port %s Link %s" %
- (dutPort, status.capitalize()), 60)
- out = self.dut.send_expect("", "Aggregate statistics", 60)
- exp = r"Statistics for port (\d+) -+\r\n" + \
- "Link status:\s+Link (up|down)\r\n"
- pattern = re.compile(exp)
- info = pattern.findall(out)
- if info[0][0] == repr(dutPort):
- self.verify(info[0][1] == status, "Link status change port error")
- else:
- self.verify(info[1][1] == status, "Link status change hello error")
+ self.intf = self.tester.get_interface(
+ self.tester.get_local_port(dutPort))
+ self.tester.send_expect("ifconfig %s %s" %
+ (self.intf, status.lower()), "# ", 10)
+ verify_point = "Port %s Link %s" % (dutPort, status)
+ self.dut.send_expect("", verify_point, 60)
def set_up(self):
"""
@@ -91,111 +81,97 @@ class TestLinkStatusInterrupt(TestCase):
def test_link_status_interrupt_change(self):
"""
- Link status change.
+ Verify Link status change
"""
-
- memChannel = self.dut.get_memory_channels()
- portMask = utils.create_mask(testPorts)
- if self.drivername in ["igb_uio"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
- memChannel, portMask)
- elif self.drivername in ["vfio-pci"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s" % (
- memChannel, portMask)
- else:
- print "unknow driver"
- for n in range(len(intr_mode)):
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("rmmod igb_uio", "# ")
+ if self.drivername == "igb_uio":
+ cmdline = self.path + " -c %s -n %s -- -p %s " % (
+ self.coremask, self.dut.get_memory_channels(), self.portmask)
+ for mode in ["legacy", "msix"]:
+ self.dut.send_expect("rmmod -f igb_uio", "#", 20)
self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
- self.verify(
- intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
- if n == 0:
- continue
- self.dut.send_expect(cmdline, "Aggregate statistics", 605)
- self.dut.send_expect(
- "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
- self.set_link_status_and_verify(testPorts[0], 'down')
- self.set_link_status_and_verify(testPorts[0], 'up')
- self.set_link_status_and_verify(testPorts[1], 'down')
- self.set_link_status_and_verify(testPorts[1], 'up')
- self.dut.send_expect("^C", "# ")
+ 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+ self.dut.bind_interfaces_linux()
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+ self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+ self.dut.send_expect("^C", "#", 60)
+ elif self.drivername == "vfio-pci":
+ for mode in ["legacy", "msi", "msix"]:
+ cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+ self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+ self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+ self.dut.send_expect("^C", "#", 60)
def test_link_status_interrupt_port_available(self):
"""
- Port available.
+ interrupt all port link, then link them,
+ sent packet, check packets received.
"""
-
- memChannel = self.dut.get_memory_channels()
- portMask = utils.create_mask(testPorts)
- if self.drivername in ["igb_uio"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
- memChannel, portMask)
- elif self.drivername in ["vfio-pci"]:
- cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
- memChannel, portMask)
- else:
- print "unknow driver"
- for n in range(1, len(intr_mode)):
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("rmmod igb_uio", "# ")
+ if self.drivername == "igb_uio":
+ cmdline = self.path + " -c %s -n %s -- -p %s " % (
+ self.coremask, self.dut.get_memory_channels(), self.portmask)
+ for mode in ["legacy", "msix"]:
+ self.dut.send_expect("rmmod -f igb_uio", "#", 20)
self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
- self.verify(
- intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
- self.dut.send_expect(cmdline, "Aggregate statistics", 60)
- self.dut.send_expect(
- "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
- self.set_link_status_and_verify(testPorts[0], 'down')
- self.set_link_status_and_verify(testPorts[1], 'down')
- self.set_link_status_and_verify(testPorts[0], 'up')
- self.set_link_status_and_verify(testPorts[1], 'up')
- for m in [0, 1]:
- txPort = self.tester.get_local_port(testPorts[m])
- rxPort = self.tester.get_local_port(testPorts[1 - m])
- txItf = self.tester.get_interface(txPort)
- rxItf = self.tester.get_interface(rxPort)
- self.tester.scapy_background()
- self.tester.scapy_append(
- 'p = sniff(iface="%s", count=1)' % rxItf)
- self.tester.scapy_append('nr_packets=len(p)')
- self.tester.scapy_append('RESULT = str(nr_packets)')
- self.tester.scapy_foreground()
- self.tester.scapy_append(
- 'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
- self.tester.scapy_execute()
- nr_packets = self.tester.scapy_get_result()
- self.verify(nr_packets == "1", "Fail to switch L2 frame")
- self.dut.send_expect("^C", "# ")
-
- def test_link_status_interrupt_recovery(self):
- """
- Recovery.
- """
- if self.drivername in ["igb_uio"]:
- self.dut.send_expect("^C", "# ")
- self.dut.send_expect("rmmod igb_uio", "# ")
- self.dut.send_expect(
- "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
- out = self.dut.send_expect(
- "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
- self.verify(
- 'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
- elif self.drivername in ["vfio-pci"]:
- self.verify(Ture, "not need run this case, when used vfio driver")
- else:
- print "unknow driver"
+ 'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+ self.dut.bind_interfaces_linux()
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(
+ self.dut_ports[port], 'Down')
+ time.sleep(10)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+ self.scapy_send_packet(1)
+ out = self.dut.get_session_output(timeout=60)
+ pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+ pkt_received = re.findall(
+ "Total packets received:\s*(\d*)", out)
+ self.verify(pkt_send == pkt_received == '1',
+ "Error: sent packets != received packets")
+ self.dut.send_expect("^C", "#", 60)
+ elif self.drivername == "vfio-pci":
+ for mode in ["legacy", "msi", "msix"]:
+ cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+ self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+ self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(
+ self.dut_ports[port], 'Down')
+ time.sleep(10)
+ for port in self.dut_ports:
+ self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+ self.scapy_send_packet(1)
+ out = self.dut.get_session_output(timeout=60)
+ pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+ pkt_received = re.findall(
+ "Total packets received:\s*(\d*)", out)
+ self.verify(pkt_send == pkt_received == '1',
+ "Error: sent packets != received packets")
+ self.dut.send_expect("^C", "#", 60)
+
+ def scapy_send_packet(self, num=1):
+ """
+ Send a packet to port
+ """
+ self.dmac = self.dut.get_mac_address(self.dut_ports[0])
+ txport = self.tester.get_local_port(self.dut_ports[0])
+ self.txItf = self.tester.get_interface(txport)
+ pkt = Packet(pkt_type='UDP')
+ pkt.config_layer('ether', {'dst': self.dmac})
+ pkt.send_pkt(tx_port=self.txItf, count=num)
def tear_down(self):
"""
Run after each test case.
"""
- pass
+ self.dut.kill_all()
+ time.sleep(2)
+ for port in self.dut_ports:
+ intf = self.tester.get_interface(self.tester.get_local_port(port))
+ self.tester.send_expect("ifconfig %s up" % intf, "# ", 10)
def tear_down_all(self):
"""
--
1.9.3
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2017-02-20 7:53 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-02-10 6:54 [dts] [PATCH V1] add test suite link_status_interrupt xu,gang
2017-02-20 7:46 ` Liu, Yong
-- strict thread matches above, loose matches on Subject: below --
2017-02-10 6:50 Lijuan Tu
2017-02-10 6:54 ` Xu, GangX
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).