test suite reviews and discussions
 help / color / mirror / Atom feed
From: "xu,gang" <gangx.xu@intel.com>
To: dts@dpdk.org
Cc: "xu,gang" <gangx.xu@intel.com>
Subject: [dts] [PTCH V1 2/2] add link status interrupt test script
Date: Wed, 27 Apr 2016 20:18:24 +0800	[thread overview]
Message-ID: <1461759504-30167-2-git-send-email-gangx.xu@intel.com> (raw)
In-Reply-To: <1461759504-30167-1-git-send-email-gangx.xu@intel.com>

Signed-off-by: xu,gang <gangx.xu@intel.com>
---
 tests/TestSuite_link_status_interrupt.py | 205 +++++++++++++++++++++++++++++++
 1 file changed, 205 insertions(+)
 create mode 100644 tests/TestSuite_link_status_interrupt.py

diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
new file mode 100644
index 0000000..2a02fc9
--- /dev/null
+++ b/tests/TestSuite_link_status_interrupt.py
@@ -0,0 +1,205 @@
+# <COPYRIGHT_TAG>
+
+"""
+DPDK Test suite.
+
+Link Status Detection
+
+"""
+
+# NOTE: These tests generally won't work in automated mode since the
+# link doesn't stay down unless the cable is actually removed. The code
+# is left here for legacy reasons.
+
+
+import dts
+import re
+
+testPorts = []
+intr_mode = ['"intr_mode=random"',
+             '"intr_mode=msix"', '"intr_mode=legacy"', '']
+intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
+                    'Use legacy interrupt', 'Use MSIX interrupt by default']
+
+
+from test_case import TestCase
+
+#
+#
+# Test class.
+#
+
+
+class TestLinkStatusInterrupt(TestCase):
+
+    #
+    #
+    #
+    # Test cases.
+    #
+
+    def set_up_all(self):
+        """
+        Run at the start of each test suite.
+
+
+        Link Status Interrupt Prerequisites
+        """
+
+        dutPorts = self.dut.get_ports(self.nic)
+        self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
+        for n in range(2):
+            testPorts.append(dutPorts[n])
+            inter = self.tester.get_interface(
+                self.tester.get_local_port(testPorts[n]))
+            self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
+        out = self.dut.send_expect(
+            "make -C examples/link_status_interrupt", "# ")
+        self.verify("Error" not in out, "Compilation error 1")
+        self.verify("No such file" not in out, "Compilation error 2")
+
+    def set_link_status_and_verify(self, dutPort, status):
+        """
+        In registered callback...
+        Event type: LSC interrupt
+        Port 0 Link Up - speed 10000 Mbps - full-duplex
+
+        In registered callback...
+        Event type: LSC interrupt
+        Port 0 Link Down
+        """
+
+        inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
+        self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
+        self.dut.send_expect("", "Port %s Link %s" %
+                             (dutPort, status.capitalize()), 60)
+        out = self.dut.send_expect("", "Aggregate statistics", 60)
+        exp = r"Statistics for port (\d+) -+\r\n" + \
+            "Link status:\s+Link (up|down)\r\n"
+        pattern = re.compile(exp)
+        info = pattern.findall(out)
+        if info[0][0] == repr(dutPort):
+            self.verify(info[0][1] == status, "Link status change port error")
+        else:
+            self.verify(info[1][1] == status, "Link status change hello error")
+
+    def set_up(self):
+        """
+        Run before each test case.
+        """
+        pass
+
+    def test_link_status_interrupt_change(self):
+        """
+        Link status change.
+        """
+
+        memChannel = self.dut.get_memory_channels()
+        portMask = dts.create_mask(testPorts)
+        if dts.drivername in ["igb_uio"]:
+            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
+                memChannel, portMask)
+        elif dts.drivername in ["vfio-pci"]:
+            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx  -- -q 2 -p %s" % (
+                memChannel, portMask)
+        else:
+            print "unknow driver"
+        for n in range(len(intr_mode)):
+            if dts.drivername in ["igb_uio"]:
+                self.dut.send_expect("rmmod igb_uio", "# ")
+                self.dut.send_expect(
+                    "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
+                self.dut.send_expect("tools/dpdk_nic_bind.py --bind=igb_uio 03:00.0 03:00.1", "# ") 
+                out = self.dut.send_expect(
+                    "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
+                self.verify(
+                    intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
+                if n == 0:
+                    continue
+            self.dut.send_expect(cmdline, "Aggregate statistics", 605)
+            self.dut.send_expect(
+                "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
+            self.set_link_status_and_verify(testPorts[0], 'down')
+            self.set_link_status_and_verify(testPorts[0], 'up')
+            self.set_link_status_and_verify(testPorts[1], 'down')
+            self.set_link_status_and_verify(testPorts[1], 'up')
+            self.dut.send_expect("^C", "# ")
+
+    def FAILING_test_link_status_interrupt_port_available(self):
+        """
+        Port available.
+        """
+
+        memChannel = self.dut.get_memory_channels()
+        portMask = dts.create_mask(testPorts)
+        if dts.drivername in ["igb_uio"]:
+            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
+                memChannel, portMask)
+        elif dts.drivername in ["vfio-pci"]:
+            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
+                memChannel, portMask)
+        else:
+            print "unknow driver"
+        for n in range(1, len(intr_mode)):
+            if dts.drivername in ["igb_uio"]:
+                self.dut.send_expect("rmmod igb_uio", "# ")
+                self.dut.send_expect(
+                    "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
+                out = self.dut.send_expect(
+                    "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
+                self.verify(
+                    intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
+            self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+            self.dut.send_expect(
+                "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
+            self.set_link_status_and_verify(testPorts[0], 'down')
+            self.set_link_status_and_verify(testPorts[1], 'down')
+            self.set_link_status_and_verify(testPorts[0], 'up')
+            self.set_link_status_and_verify(testPorts[1], 'up')
+            for m in [0, 1]:
+                txPort = self.tester.get_local_port(testPorts[m])
+                rxPort = self.tester.get_local_port(testPorts[1 - m])
+                txItf = self.tester.get_interface(txPort)
+                rxItf = self.tester.get_interface(rxPort)
+                self.tester.scapy_background()
+                self.tester.scapy_append(
+                    'p = sniff(iface="%s", count=1)' % rxItf)
+                self.tester.scapy_append('nr_packets=len(p)')
+                self.tester.scapy_append('RESULT = str(nr_packets)')
+                self.tester.scapy_foreground()
+                self.tester.scapy_append(
+                    'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
+                self.tester.scapy_execute()
+                nr_packets = self.tester.scapy_get_result()
+                self.verify(nr_packets == "1", "Fail to switch L2 frame")
+            self.dut.send_expect("^C", "# ")
+
+    def test_link_status_interrupt_recovery(self):
+        """
+        Recovery.
+        """
+        if dts.drivername in ["igb_uio"]:
+            self.dut.send_expect("^C", "# ")
+            self.dut.send_expect("rmmod igb_uio", "# ")
+            self.dut.send_expect(
+                "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
+            out = self.dut.send_expect(
+                "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
+            self.verify(
+                'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
+        elif dts.drivername in ["vfio-pci"]:
+            self.verify(Ture, "not need run this case, when used vfio driver")
+        else:
+            print "unknow driver"
+
+    def tear_down(self):
+        """
+        Run after each test case.
+        """
+        pass
+
+    def tear_down_all(self):
+        """
+        Run after each test suite.
+        """
+        pass
-- 
1.9.3

  reply	other threads:[~2016-04-27 12:22 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2016-04-27 12:18 [dts] [PTCH V1 1/2] add link status interrupt test plan xu,gang
2016-04-27 12:18 ` xu,gang [this message]
2016-04-29 18:02 ` Liu, Yong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1461759504-30167-2-git-send-email-gangx.xu@intel.com \
    --to=gangx.xu@intel.com \
    --cc=dts@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).