test suite reviews and discussions
 help / color / mirror / Atom feed
* [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
@ 2019-04-26  8:58 xiao,qimai
  2019-05-07  7:18 ` Li, WenjieX A
  0 siblings, 1 reply; 5+ messages in thread
From: xiao,qimai @ 2019-04-26  8:58 UTC (permalink / raw)
  To: dts; +Cc: xiao,qimai

1. Remove unused imports.
2. Add some steps to verify the encrypted package.
3. Update the verify message for decryption with a wrong key.

Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
---
 tests/TestSuite_inline_ipsec.py | 86 +++++++++++++++++++++++----------
 1 file changed, 60 insertions(+), 26 deletions(-)

diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
index 1813c08..369d4f0 100644
--- a/tests/TestSuite_inline_ipsec.py
+++ b/tests/TestSuite_inline_ipsec.py
@@ -35,14 +35,13 @@ DPDK Test suite.
 Test inline_ipsec.
 """
 
-import utils
-import string
-import time
+import random
 import re
-import threading
+import time
+
+import utils
+from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
 from test_case import TestCase
-import getopt
-from scapy.all import *
 
 ETHER_STANDARD_MTU = 1518
 ETHER_JUMBO_FRAME_MTU = 9000
@@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000
 class TestInlineIpsec(TestCase):
     """
     This suite depend PyCryptodome,it provide authenticated encryption modes(GCM)
-    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7),
-    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623)
+    my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7),
+    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.4.2)
     """
+
     def set_up_all(self):
         """
         Run at the start of each test suite.
@@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
 
         self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
         # add print code in IPSEC app
-        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
+        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
         self.dut.send_expect(sedcmd, "#", 60)
 
         # build sample app
@@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
                 f.write(cfg)
         self.dut.session.copy_file_to(filename, self.dut.base_dir)
 
-    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1,
+                                inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         prepare a packet and send
         """
@@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
 
         if do_encrypt == True:
             print "send encrypt package"
+            print("before encrypt, the package info is like below: ")
+            p.show()
             e = sa_gcm.encrypt(p)
         else:
             print "send normal package"
@@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
             name='send_encryption_package')
         sendp(eth_e, iface=intf, count=count)
         self.tester.destroy_session(session_send)
+        return payload, p.src, p.dst
 
-        return payload
-
-    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False,
+                         verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10',
+                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         verify Ipsec receive package
         """
-        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
+        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
             self.portpci_0, self.portpci_1, jumboframe, config, file_name)
         self.dut.send_expect(cmd, "IPSEC", 60)
 
@@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
 
         session_receive.send_expect("scapy", ">>>", 10)
         session_receive.send_expect(
-            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
-        send_package = self.send_encryption_package(
-            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", 10)
 
-        time.sleep(10)
-        out = session_receive.send_expect("pkts", "", 30)
         if do_encrypt:
+            send_package = self.send_encryption_package(
+                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            time.sleep(45)
+            session_receive.send_expect("pkts", "", 30)
             out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
         else:
+            session_receive2 = self.tester.create_session(name='receive_encryption_package2')
+            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
+            send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
+                                                        sa_dst)
+            time.sleep(45)
+            rev = session_receive2.get_session_before()
+            print(rev)
+            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+            res = p.search(rev)
+            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
+            self.tester.destroy_session(session_receive2)
+            session_receive.send_expect("pkts", "", 30)
             session_receive.send_expect(sa_gcm, ">>>", 10)
-            session_receive.send_expect(
-                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
+            time.sleep(2)
+            session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
             out = session_receive.send_expect("results", ">>>", 10)
 
         if verify:
-            self.verify(send_package in out,
+            print('received packet content is %s' % out)
+            print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
+                send_package[1], send_package[2], send_package[0]))
+            self.verify(send_package[0] in out,
                         "Unreceived package or get other package")
         else:
-            self.verify(send_package not in out,
+            self.verify(send_package[0] not in out,
                         "The function is not in effect")
         session_receive.send_expect("quit()", "#", 10)
         self.tester.destroy_session(session_receive)
@@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Jumboframe(self):
         """
@@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Rss(self):
         """
@@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_IPSec_Decryption(self):
         """
@@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, count=2)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_IPSec_Decryption_Jumboframe(self):
         """
@@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Decryption_Rss(self):
         """
@@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Decryption_wrongkey(self):
         """
@@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
         self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, verify=False, count=2)
         out = self.dut.get_session_output()
-        verifycode = "IPSEC_ESP: failed crypto op"
-        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
+        verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op"
+        l = re.findall(verifycode, out)
+        self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Decryption(self):
         """
@@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
         eth_e2 = Ether() / e2
         eth_e2.src = self.rx_src
         eth_e2.dst = self.tx_dst
-
+        session_receive3 = self.tester.create_session('check_forward_encryption_package')
+        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
+        time.sleep(2)
         sendp(eth_e1, iface=self.rxItf, count=2)
         sendp(eth_e2, iface=self.txItf, count=1)
         time.sleep(30)
+        rev = session_receive3.get_session_before()
+        print(rev)
+        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+        res = p.search(rev)
+        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
         session_receive.send_expect(
             "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
         out = session_receive.send_expect("results", ">>>", 60)
-- 
2.17.0


^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
  2019-04-26  8:58 [dts] [PATCH V1] update inline_ipsec to test dpdk19.05 xiao,qimai
@ 2019-05-07  7:18 ` Li, WenjieX A
  2019-05-22  9:32   ` Xiao, QimaiX
  0 siblings, 1 reply; 5+ messages in thread
From: Li, WenjieX A @ 2019-05-07  7:18 UTC (permalink / raw)
  To: Xiao, QimaiX, dts; +Cc: Xiao, QimaiX

Hi Qimai,

1. Please do not change the script like this:
> -import utils
> -import time
...
> +import time
> +import utils

2. You are using scapy 2.4.2. Does this script still work with scapy 2.2.* or 2.3.*? Scapy 2.4.2 can cause the error below with the current dts (dep/Dot1BR.py).

"dts: name 'ETHER_TYPES' is not defined
Traceback (most recent call last):
  File "./main.py", line 162, in <module>
    args.debug, args.debugcase, args.re_run, args.commands)
  File "/home/autoregression/dts/framework/dts.py", line 563, in run_all
    dts_run_target(duts, tester, targets, test_suites)
  File "/home/autoregression/dts/framework/dts.py", line 381, in dts_run_target
    dts_run_suite(duts, tester, test_suites, target)
  File "/home/autoregression/dts/framework/dts.py", line 435, in dts_run_suite
    suite_obj.execute_tear_downall()
UnboundLocalError: local variable 'suite_obj' referenced before assignment
dts: DTS ended
"

Thank you!

BR,
Wenjie

> -----Original Message-----
> From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai
> Sent: Friday, April 26, 2019 4:58 PM
> To: dts@dpdk.org
> Cc: Xiao, QimaiX <qimaix.xiao@intel.com>
> Subject: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
> 
> 1.remove unused imports
> 2.add some steps to verfiy encrypt package 3.upadte verify message of decrypt
> with wrong key
> 
> Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
> ---
>  tests/TestSuite_inline_ipsec.py | 86 +++++++++++++++++++++++----------
>  1 file changed, 60 insertions(+), 26 deletions(-)
> 
> diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
> index 1813c08..369d4f0 100644
> --- a/tests/TestSuite_inline_ipsec.py
> +++ b/tests/TestSuite_inline_ipsec.py
> @@ -35,14 +35,13 @@ DPDK Test suite.
>  Test inline_ipsec.
>  """
> 
> -import utils
> -import string
> -import time
> +import random
>  import re
> -import threading
> +import time
> +
> +import utils
> +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
>  from test_case import TestCase
> -import getopt
> -from scapy.all import *
> 
>  ETHER_STANDARD_MTU = 1518
>  ETHER_JUMBO_FRAME_MTU = 9000
> @@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000  class
> TestInlineIpsec(TestCase):
>      """
>      This suite depend PyCryptodome,it provide authenticated encryption
> modes(GCM)
> -    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex
> (3.4.7),
> -    pycryptopp
> (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> (2.3.3.dev623)
> +    my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex
> (3.4.7),
> +    pycryptopp
> + (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> + (2.4.2)
>      """
> +
>      def set_up_all(self):
>          """
>          Run at the start of each test suite.
> @@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
> 
>          self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
>          # add print code in IPSEC app
> -        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx,
> queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
> +        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
>          self.dut.send_expect(sedcmd, "#", 60)
> 
>          # build sample app
> @@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
>                  f.write(cfg)
>          self.dut.session.copy_file_to(filename, self.dut.base_dir)
> 
> -    def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> +    def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1,
> +                                inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
>          """
>          prepare a packet and send
>          """
> @@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
> 
>          if do_encrypt == True:
>              print "send encrypt package"
> +            print("before encrypt, the package info is like below: ")
> +            p.show()
>              e = sa_gcm.encrypt(p)
>          else:
>              print "send normal package"
> @@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
>              name='send_encryption_package')
>          sendp(eth_e, iface=intf, count=count)
>          self.tester.destroy_session(session_send)
> +        return payload, p.src, p.dst
> 
> -        return payload
> -
> -    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False, verify=True, send_spi=5,
> receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> +    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False,
> +                         verify=True, send_spi=5, receive_spi=1005, count=1,
> inner_dst='192.168.105.10',
> +                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
>          """
>          verify Ipsec receive package
>          """
> -        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --
> socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
> +        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null'
> + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s
> + --config='%s' -f %s" % (
>              self.portpci_0, self.portpci_1, jumboframe, config, file_name)
>          self.dut.send_expect(cmd, "IPSEC", 60)
> 
> @@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
> 
>          session_receive.send_expect("scapy", ">>>", 10)
>          session_receive.send_expect(
> -            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
> -        send_package = self.send_encryption_package(
> -            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> +            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "",
> + 10)
> 
> -        time.sleep(10)
> -        out = session_receive.send_expect("pkts", "", 30)
>          if do_encrypt:
> +            send_package = self.send_encryption_package(
> +                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> +            time.sleep(45)
> +            session_receive.send_expect("pkts", "", 30)
>              out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
>          else:
> +            session_receive2 =
> self.tester.create_session(name='receive_encryption_package2')
> +            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
> +            send_package = self.send_encryption_package(txItf, paysize,
> do_encrypt, send_spi, count, inner_dst, sa_src,
> +                                                        sa_dst)
> +            time.sleep(45)
> +            rev = session_receive2.get_session_before()
> +            print(rev)
> +            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> +            res = p.search(rev)
> +            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> +            self.tester.destroy_session(session_receive2)
> +            session_receive.send_expect("pkts", "", 30)
>              session_receive.send_expect(sa_gcm, ">>>", 10)
> -            session_receive.send_expect(
> -                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
> +            time.sleep(2)
> +
> + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])",
> + ">>>", 10)
>              out = session_receive.send_expect("results", ">>>", 10)
> 
>          if verify:
> -            self.verify(send_package in out,
> +            print('received packet content is %s' % out)
> +            print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
> +                send_package[1], send_package[2], send_package[0]))
> +            self.verify(send_package[0] in out,
>                          "Unreceived package or get other package")
>          else:
> -            self.verify(send_package not in out,
> +            self.verify(send_package[0] not in out,
>                          "The function is not in effect")
>          session_receive.send_expect("quit()", "#", 10)
>          self.tester.destroy_session(session_receive)
> @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(1, ETHER_STANDARD_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
>                                self.txItf, self.rxItf, paysize)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Jumboframe(self):
>          """
> @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
>                                self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Rss(self):
>          """
> @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
>          out = self.dut.get_session_output()
>          verifycode = "receive 1 packet in rxqueueid=1"
>          self.verify(verifycode in out, "rxqueueid error")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_IPSec_Decryption(self):
>          """
> @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(1, ETHER_STANDARD_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
>                                self.txItf, paysize, do_encrypt=True, count=2)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_IPSec_Decryption_Jumboframe(self):
>          """
> @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
>                                self.txItf, paysize, ETHER_JUMBO_FRAME_MTU,
> do_encrypt=True, count=2)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Decryption_Rss(self):
>          """
> @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
>          out = self.dut.get_session_output()
>          verifycode = "receive 1 packet in rxqueueid=1"
>          self.verify(verifycode in out, "rxqueueid error")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Decryption_wrongkey(self):
>          """
> @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
>          self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
>                                self.txItf, paysize, do_encrypt=True, verify=False, count=2)
>          out = self.dut.get_session_output()
> -        verifycode = "IPSEC_ESP: failed crypto op"
> -        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
> +        verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op"
> +        l = re.findall(verifycode, out)
> +        self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Decryption(self):
>          """
> @@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
>          eth_e2 = Ether() / e2
>          eth_e2.src = self.rx_src
>          eth_e2.dst = self.tx_dst
> -
> +        session_receive3 =
> self.tester.create_session('check_forward_encryption_package')
> +        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
> +        time.sleep(2)
>          sendp(eth_e1, iface=self.rxItf, count=2)
>          sendp(eth_e2, iface=self.txItf, count=1)
>          time.sleep(30)
> +        rev = session_receive3.get_session_before()
> +        print(rev)
> +        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> +        res = p.search(rev)
> +        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
>          session_receive.send_expect(
>              "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
>          out = session_receive.send_expect("results", ">>>", 60)
> --
> 2.17.0


^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
  2019-05-07  7:18 ` Li, WenjieX A
@ 2019-05-22  9:32   ` Xiao, QimaiX
  0 siblings, 0 replies; 5+ messages in thread
From: Xiao, QimaiX @ 2019-05-22  9:32 UTC (permalink / raw)
  To: Li, WenjieX A, dts

Hi Wenjie,
This case depends on PyCryptodome, which provides authenticated encryption modes (GCM).
Because scapy versions lower than 2.3.3 lack some crypto modules, this case can only run on a tester with scapy 2.3.3 or later.
I've submitted a patch to update its test plan to specify the dependency info.

Best Regards,
Xiao, Qimai

-----Original Message-----
From: Li, WenjieX A 
Sent: Tuesday, May 7, 2019 3:19 PM
To: Xiao, QimaiX <qimaix.xiao@intel.com>; dts@dpdk.org
Cc: Xiao, QimaiX <qimaix.xiao@intel.com>
Subject: RE: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05

Hi Qimai,

1. Please do not change script like this:
> -import utils
> -import time
...
> +import time
> +import utils

2. You are using scapy 2.4.2. Does this script still work with scapy 2.2.* or 2.3.*?  Scapy 2.4.2 can cause below error with current dts(dep/Dot1BR.py).

"dts: name 'ETHER_TYPES' is not defined
Traceback (most recent call last):
  File "./main.py", line 162, in <module>
    args.debug, args.debugcase, args.re_run, args.commands)
  File "/home/autoregression/dts/framework/dts.py", line 563, in run_all
    dts_run_target(duts, tester, targets, test_suites)
  File "/home/autoregression/dts/framework/dts.py", line 381, in dts_run_target
    dts_run_suite(duts, tester, test_suites, target)
  File "/home/autoregression/dts/framework/dts.py", line 435, in dts_run_suite
    suite_obj.execute_tear_downall()
UnboundLocalError: local variable 'suite_obj' referenced before assignment
dts: DTS ended
"

Thank you!

BR,
Wenjie

> -----Original Message-----
> From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai
> Sent: Friday, April 26, 2019 4:58 PM
> To: dts@dpdk.org
> Cc: Xiao, QimaiX <qimaix.xiao@intel.com>
> Subject: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
> 
> 1.remove unused imports
> 2.add some steps to verfiy encrypt package 3.upadte verify message of 
> decrypt with wrong key
> 
> Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
> ---
>  tests/TestSuite_inline_ipsec.py | 86 
> +++++++++++++++++++++++----------
>  1 file changed, 60 insertions(+), 26 deletions(-)
> 
> diff --git a/tests/TestSuite_inline_ipsec.py 
> b/tests/TestSuite_inline_ipsec.py index 1813c08..369d4f0 100644
> --- a/tests/TestSuite_inline_ipsec.py
> +++ b/tests/TestSuite_inline_ipsec.py
> @@ -35,14 +35,13 @@ DPDK Test suite.
>  Test inline_ipsec.
>  """
> 
> -import utils
> -import string
> -import time
> +import random
>  import re
> -import threading
> +import time
> +
> +import utils
> +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
>  from test_case import TestCase
> -import getopt
> -from scapy.all import *
> 
>  ETHER_STANDARD_MTU = 1518
>  ETHER_JUMBO_FRAME_MTU = 9000
> @@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000  class
> TestInlineIpsec(TestCase):
>      """
>      This suite depend PyCryptodome,it provide authenticated 
> encryption
> modes(GCM)
> -    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex
> (3.4.7),
> -    pycryptopp
> (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> (2.3.3.dev623)
> +    my environment:cryptography (1.7.2), pycryptodome (3.4.7), 
> + pycryptodomex
> (3.4.7),
> +    pycryptopp
> + (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> + (2.4.2)
>      """
> +
>      def set_up_all(self):
>          """
>          Run at the start of each test suite.
> @@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
> 
>          self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
>          # add print code in IPSEC app
> -        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\printf("[debug]receive %hhu packet in 
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
> +        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in 
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
>          self.dut.send_expect(sedcmd, "#", 60)
> 
>          # build sample app
> @@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
>                  f.write(cfg)
>          self.dut.session.copy_file_to(filename, self.dut.base_dir)
> 
> -    def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> +    def send_encryption_package(self, intf, paysize=64, 
> + do_encrypt=False,
> send_spi=5, count=1,
> +                                inner_dst='192.168.105.10', 
> + sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
>          """
>          prepare a packet and send
>          """
> @@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
> 
>          if do_encrypt == True:
>              print "send encrypt package"
> +            print("before encrypt, the package info is like below: ")
> +            p.show()
>              e = sa_gcm.encrypt(p)
>          else:
>              print "send normal package"
> @@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
>              name='send_encryption_package')
>          sendp(eth_e, iface=intf, count=count)
>          self.tester.destroy_session(session_send)
> +        return payload, p.src, p.dst
> 
> -        return payload
> -
> -    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, 
> receive_spi=1005, count=1, inner_dst='192.168.105.10', 
> sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> +    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, 
> + paysize=32,
> jumboframe=1518, do_encrypt=False,
> +                         verify=True, send_spi=5, receive_spi=1005, 
> + count=1,
> inner_dst='192.168.105.10',
> +                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
>          """
>          verify Ipsec receive package
>          """
> -        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --
> socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
> +        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null'
> + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s 
> + --config='%s' -f %s" % (
>              self.portpci_0, self.portpci_1, jumboframe, config, file_name)
>          self.dut.send_expect(cmd, "IPSEC", 60)
> 
> @@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
> 
>          session_receive.send_expect("scapy", ">>>", 10)
>          session_receive.send_expect(
> -            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
> -        send_package = self.send_encryption_package(
> -            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> +            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "",
> + 10)
> 
> -        time.sleep(10)
> -        out = session_receive.send_expect("pkts", "", 30)
>          if do_encrypt:
> +            send_package = self.send_encryption_package(
> +                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> +            time.sleep(45)
> +            session_receive.send_expect("pkts", "", 30)
>              out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
>          else:
> +            session_receive2 =
> self.tester.create_session(name='receive_encryption_package2')
> +            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
> +            send_package = self.send_encryption_package(txItf, 
> + paysize,
> do_encrypt, send_spi, count, inner_dst, sa_src,
> +                                                        sa_dst)
> +            time.sleep(45)
> +            rev = session_receive2.get_session_before()
> +            print(rev)
> +            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> +            res = p.search(rev)
> +            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> +            self.tester.destroy_session(session_receive2)
> +            session_receive.send_expect("pkts", "", 30)
>              session_receive.send_expect(sa_gcm, ">>>", 10)
> -            session_receive.send_expect(
> -                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
> +            time.sleep(2)
> +
> + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])",
> + ">>>", 10)
>              out = session_receive.send_expect("results", ">>>", 10)
> 
>          if verify:
> -            self.verify(send_package in out,
> +            print('received packet content is %s' % out)
> +            print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
> +                send_package[1], send_package[2], send_package[0]))
> +            self.verify(send_package[0] in out,
>                          "Unreceived package or get other package")
>          else:
> -            self.verify(send_package not in out,
> +            self.verify(send_package[0] not in out,
>                          "The function is not in effect")
>          session_receive.send_expect("quit()", "#", 10)
>          self.tester.destroy_session(session_receive)
> @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(1, ETHER_STANDARD_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
>                                self.txItf, self.rxItf, paysize)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Jumboframe(self):
>          """
> @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
>                                self.txItf, self.rxItf, paysize, 
> ETHER_JUMBO_FRAME_MTU)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Rss(self):
>          """
> @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
>          out = self.dut.get_session_output()
>          verifycode = "receive 1 packet in rxqueueid=1"
>          self.verify(verifycode in out, "rxqueueid error")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_IPSec_Decryption(self):
>          """
> @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(1, ETHER_STANDARD_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
>                                self.txItf, paysize, do_encrypt=True, 
> count=2)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_IPSec_Decryption_Jumboframe(self):
>          """
> @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
>                                self.txItf, paysize, 
> ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Decryption_Rss(self):
>          """
> @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
>          out = self.dut.get_session_output()
>          verifycode = "receive 1 packet in rxqueueid=1"
>          self.verify(verifycode in out, "rxqueueid error")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Decryption_wrongkey(self):
>          """
> @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
>          self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
>                                self.txItf, paysize, do_encrypt=True, verify=False, count=2)
>          out = self.dut.get_session_output()
> -        verifycode = "IPSEC_ESP: failed crypto op"
> -        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
> +        verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op"
> +        l = re.findall(verifycode, out)
> +        self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Decryption(self):
>          """
> @@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
>          eth_e2 = Ether() / e2
>          eth_e2.src = self.rx_src
>          eth_e2.dst = self.tx_dst
> -
> +        session_receive3 =
> self.tester.create_session('check_forward_encryption_package')
> +        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
> +        time.sleep(2)
>          sendp(eth_e1, iface=self.rxItf, count=2)
>          sendp(eth_e2, iface=self.txItf, count=1)
>          time.sleep(30)
> +        rev = session_receive3.get_session_before()
> +        print(rev)
> +        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> +        res = p.search(rev)
> +        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
>          session_receive.send_expect(
>              "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
>          out = session_receive.send_expect("results", ">>>", 60)
> --
> 2.17.0


^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
  2019-04-27  2:15 xiao,qimai
@ 2019-04-28  9:05 ` Wu, ChangqingX
  0 siblings, 0 replies; 5+ messages in thread
From: Wu, ChangqingX @ 2019-04-28  9:05 UTC (permalink / raw)
  To: Xiao, QimaiX, dts; +Cc: Xiao, QimaiX

Tested-by: Wu, ChangqingX <changqingx.wu@intel.com>

-----Original Message-----
From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai
Sent: Saturday, April 27, 2019 10:15 AM
To: dts@dpdk.org
Cc: Xiao, QimaiX <qimaix.xiao@intel.com>
Subject: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05

1.remove unused imports
2.add some steps to verify encrypt package
3.update verify message of decrypt with wrong key

Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
---
 tests/TestSuite_inline_ipsec.py | 86 +++++++++++++++++++++++----------
 1 file changed, 60 insertions(+), 26 deletions(-)

diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py index 1813c08..369d4f0 100644
--- a/tests/TestSuite_inline_ipsec.py
+++ b/tests/TestSuite_inline_ipsec.py
@@ -35,14 +35,13 @@ DPDK Test suite.
 Test inline_ipsec.
 """
 
-import utils
-import string
-import time
+import random
 import re
-import threading
+import time
+
+import utils
+from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
 from test_case import TestCase
-import getopt
-from scapy.all import *
 
 ETHER_STANDARD_MTU = 1518
 ETHER_JUMBO_FRAME_MTU = 9000
@@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000  class TestInlineIpsec(TestCase):
     """
     This suite depend PyCryptodome,it provide authenticated encryption modes(GCM)
-    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7),
-    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623)
+    my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7),
+    pycryptopp 
+ (0.6.0.1206569328141510525648634803928199668821045408958), scapy 
+ (2.4.2)
     """
+
     def set_up_all(self):
         """
         Run at the start of each test suite.
@@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
 
         self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
         # add print code in IPSEC app
-        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
+        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
         self.dut.send_expect(sedcmd, "#", 60)
 
         # build sample app
@@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
                 f.write(cfg)
         self.dut.session.copy_file_to(filename, self.dut.base_dir)
 
-    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1,
+                                inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         prepare a packet and send
         """
@@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
 
         if do_encrypt == True:
             print "send encrypt package"
+            print("before encrypt, the package info is like below: ")
+            p.show()
             e = sa_gcm.encrypt(p)
         else:
             print "send normal package"
@@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
             name='send_encryption_package')
         sendp(eth_e, iface=intf, count=count)
         self.tester.destroy_session(session_send)
+        return payload, p.src, p.dst
 
-        return payload
-
-    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False,
+                         verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10',
+                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         verify Ipsec receive package
         """
-        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
+        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' 
+ --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s 
+ --config='%s' -f %s" % (
             self.portpci_0, self.portpci_1, jumboframe, config, file_name)
         self.dut.send_expect(cmd, "IPSEC", 60)
 
@@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
 
         session_receive.send_expect("scapy", ">>>", 10)
         session_receive.send_expect(
-            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
-        send_package = self.send_encryption_package(
-            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", 
+ 10)
 
-        time.sleep(10)
-        out = session_receive.send_expect("pkts", "", 30)
         if do_encrypt:
+            send_package = self.send_encryption_package(
+                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            time.sleep(45)
+            session_receive.send_expect("pkts", "", 30)
             out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
         else:
+            session_receive2 = self.tester.create_session(name='receive_encryption_package2')
+            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
+            send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
+                                                        sa_dst)
+            time.sleep(45)
+            rev = session_receive2.get_session_before()
+            print(rev)
+            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+            res = p.search(rev)
+            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
+            self.tester.destroy_session(session_receive2)
+            session_receive.send_expect("pkts", "", 30)
             session_receive.send_expect(sa_gcm, ">>>", 10)
-            session_receive.send_expect(
-                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
+            time.sleep(2)
+            
+ session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", 
+ ">>>", 10)
             out = session_receive.send_expect("results", ">>>", 10)
 
         if verify:
-            self.verify(send_package in out,
+            print('received packet content is %s' % out)
+            print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
+                send_package[1], send_package[2], send_package[0]))
+            self.verify(send_package[0] in out,
                         "Unreceived package or get other package")
         else:
-            self.verify(send_package not in out,
+            self.verify(send_package[0] not in out,
                         "The function is not in effect")
         session_receive.send_expect("quit()", "#", 10)
         self.tester.destroy_session(session_receive)
@@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Jumboframe(self):
         """
@@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Rss(self):
         """
@@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_IPSec_Decryption(self):
         """
@@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, count=2)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_IPSec_Decryption_Jumboframe(self):
         """
@@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Decryption_Rss(self):
         """
@@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Decryption_wrongkey(self):
         """
@@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
         self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, verify=False, count=2)
         out = self.dut.get_session_output()
-        verifycode = "IPSEC_ESP: failed crypto op"
-        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
+        verifycode = "IPSEC_ESP: esp_inbound_post\(\) failed crypto op"
+        l = re.findall(verifycode, out)
+        self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Decryption(self):
         """
@@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
         eth_e2 = Ether() / e2
         eth_e2.src = self.rx_src
         eth_e2.dst = self.tx_dst
-
+        session_receive3 = self.tester.create_session('check_forward_encryption_package')
+        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
+        time.sleep(2)
         sendp(eth_e1, iface=self.rxItf, count=2)
         sendp(eth_e2, iface=self.txItf, count=1)
         time.sleep(30)
+        rev = session_receive3.get_session_before()
+        print(rev)
+        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+        res = p.search(rev)
+        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
         session_receive.send_expect(
             "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
         out = session_receive.send_expect("results", ">>>", 60)
--
2.17.0


^ permalink raw reply	[flat|nested] 5+ messages in thread

* [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
@ 2019-04-27  2:15 xiao,qimai
  2019-04-28  9:05 ` Wu, ChangqingX
  0 siblings, 1 reply; 5+ messages in thread
From: xiao,qimai @ 2019-04-27  2:15 UTC (permalink / raw)
  To: dts; +Cc: xiao,qimai

1.remove unused imports
2.add some steps to verify encrypt package
3.update verify message of decrypt with wrong key

Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
---
 tests/TestSuite_inline_ipsec.py | 86 +++++++++++++++++++++++----------
 1 file changed, 60 insertions(+), 26 deletions(-)

diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
index 1813c08..369d4f0 100644
--- a/tests/TestSuite_inline_ipsec.py
+++ b/tests/TestSuite_inline_ipsec.py
@@ -35,14 +35,13 @@ DPDK Test suite.
 Test inline_ipsec.
 """
 
-import utils
-import string
-import time
+import random
 import re
-import threading
+import time
+
+import utils
+from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
 from test_case import TestCase
-import getopt
-from scapy.all import *
 
 ETHER_STANDARD_MTU = 1518
 ETHER_JUMBO_FRAME_MTU = 9000
@@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000
 class TestInlineIpsec(TestCase):
     """
     This suite depend PyCryptodome,it provide authenticated encryption modes(GCM)
-    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7),
-    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623)
+    my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7),
+    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.4.2)
     """
+
     def set_up_all(self):
         """
         Run at the start of each test suite.
@@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
 
         self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
         # add print code in IPSEC app
-        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
+        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
         self.dut.send_expect(sedcmd, "#", 60)
 
         # build sample app
@@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
                 f.write(cfg)
         self.dut.session.copy_file_to(filename, self.dut.base_dir)
 
-    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1,
+                                inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         prepare a packet and send
         """
@@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
 
         if do_encrypt == True:
             print "send encrypt package"
+            print("before encrypt, the package info is like below: ")
+            p.show()
             e = sa_gcm.encrypt(p)
         else:
             print "send normal package"
@@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
             name='send_encryption_package')
         sendp(eth_e, iface=intf, count=count)
         self.tester.destroy_session(session_send)
+        return payload, p.src, p.dst
 
-        return payload
-
-    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False,
+                         verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10',
+                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         verify Ipsec receive package
         """
-        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
+        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
             self.portpci_0, self.portpci_1, jumboframe, config, file_name)
         self.dut.send_expect(cmd, "IPSEC", 60)
 
@@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
 
         session_receive.send_expect("scapy", ">>>", 10)
         session_receive.send_expect(
-            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
-        send_package = self.send_encryption_package(
-            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", 10)
 
-        time.sleep(10)
-        out = session_receive.send_expect("pkts", "", 30)
         if do_encrypt:
+            send_package = self.send_encryption_package(
+                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            time.sleep(45)
+            session_receive.send_expect("pkts", "", 30)
             out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
         else:
+            session_receive2 = self.tester.create_session(name='receive_encryption_package2')
+            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
+            send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
+                                                        sa_dst)
+            time.sleep(45)
+            rev = session_receive2.get_session_before()
+            print(rev)
+            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+            res = p.search(rev)
+            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
+            self.tester.destroy_session(session_receive2)
+            session_receive.send_expect("pkts", "", 30)
             session_receive.send_expect(sa_gcm, ">>>", 10)
-            session_receive.send_expect(
-                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
+            time.sleep(2)
+            session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
             out = session_receive.send_expect("results", ">>>", 10)
 
         if verify:
-            self.verify(send_package in out,
+            print('received packet content is %s' % out)
+            print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
+                send_package[1], send_package[2], send_package[0]))
+            self.verify(send_package[0] in out,
                         "Unreceived package or get other package")
         else:
-            self.verify(send_package not in out,
+            self.verify(send_package[0] not in out,
                         "The function is not in effect")
         session_receive.send_expect("quit()", "#", 10)
         self.tester.destroy_session(session_receive)
@@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Jumboframe(self):
         """
@@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Rss(self):
         """
@@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_IPSec_Decryption(self):
         """
@@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, count=2)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_IPSec_Decryption_Jumboframe(self):
         """
@@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Decryption_Rss(self):
         """
@@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Decryption_wrongkey(self):
         """
@@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
         self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, verify=False, count=2)
         out = self.dut.get_session_output()
-        verifycode = "IPSEC_ESP: failed crypto op"
-        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
+        verifycode = "IPSEC_ESP: esp_inbound_post\(\) failed crypto op"
+        l = re.findall(verifycode, out)
+        self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
+        self.dut.send_expect("^C", "#", 5)
 
     def test_Ipsec_Encryption_Decryption(self):
         """
@@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
         eth_e2 = Ether() / e2
         eth_e2.src = self.rx_src
         eth_e2.dst = self.tx_dst
-
+        session_receive3 = self.tester.create_session('check_forward_encryption_package')
+        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
+        time.sleep(2)
         sendp(eth_e1, iface=self.rxItf, count=2)
         sendp(eth_e2, iface=self.txItf, count=1)
         time.sleep(30)
+        rev = session_receive3.get_session_before()
+        print(rev)
+        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+        res = p.search(rev)
+        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
         session_receive.send_expect(
             "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
         out = session_receive.send_expect("results", ">>>", 60)
-- 
2.17.0


^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2019-05-22  9:32 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-04-26  8:58 [dts] [PATCH V1] update inline_ipsec to test dpdk19.05 xiao,qimai
2019-05-07  7:18 ` Li, WenjieX A
2019-05-22  9:32   ` Xiao, QimaiX
2019-04-27  2:15 xiao,qimai
2019-04-28  9:05 ` Wu, ChangqingX

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).