* [dts] [PATCH V2] update inline_ipsec to test new version dpdk
@ 2019-05-22 10:23 xiao,qimai
2019-05-27 2:12 ` Li, WenjieX A
2019-05-29 2:21 ` Tu, Lijuan
0 siblings, 2 replies; 3+ messages in thread
From: xiao,qimai @ 2019-05-22 10:23 UTC (permalink / raw)
To: dts; +Cc: xiao,qimai
1.remove unused imports
2.add some steps to verify encrypt package
3.update verify message of decrypt with wrong key
Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
---
tests/TestSuite_inline_ipsec.py | 82 +++++++++++++++++++++++----------
1 file changed, 58 insertions(+), 24 deletions(-)
diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
index 1813c08..d67bcce 100644
--- a/tests/TestSuite_inline_ipsec.py
+++ b/tests/TestSuite_inline_ipsec.py
@@ -36,13 +36,12 @@ Test inline_ipsec.
"""
import utils
-import string
import time
import re
-import threading
+import random
+
+from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
from test_case import TestCase
-import getopt
-from scapy.all import *
ETHER_STANDARD_MTU = 1518
ETHER_JUMBO_FRAME_MTU = 9000
@@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000
class TestInlineIpsec(TestCase):
"""
This suite depend PyCryptodome,it provide authenticated encryption modes(GCM)
- my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7),
- pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623)
+ my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7),
+ pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.4.2)
"""
+
def set_up_all(self):
"""
Run at the start of each test suite.
@@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
# add print code in IPSEC app
- sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
+ sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
self.dut.send_expect(sedcmd, "#", 60)
# build sample app
@@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
f.write(cfg)
self.dut.session.copy_file_to(filename, self.dut.base_dir)
- def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+ def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1,
+ inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
"""
prepare a packet and send
"""
@@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
if do_encrypt == True:
print "send encrypt package"
+ print("before encrypt, the package info is like below: ")
+ p.show()
e = sa_gcm.encrypt(p)
else:
print "send normal package"
@@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
name='send_encryption_package')
sendp(eth_e, iface=intf, count=count)
self.tester.destroy_session(session_send)
+ return payload, p.src, p.dst
- return payload
-
- def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+ def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False,
+ verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10',
+ sa_src='172.16.1.5', sa_dst='172.16.2.5'):
"""
verify Ipsec receive package
"""
- cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
+ cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
self.portpci_0, self.portpci_1, jumboframe, config, file_name)
self.dut.send_expect(cmd, "IPSEC", 60)
@@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
session_receive.send_expect("scapy", ">>>", 10)
session_receive.send_expect(
- "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
- send_package = self.send_encryption_package(
- txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+ "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", 10)
- time.sleep(10)
- out = session_receive.send_expect("pkts", "", 30)
if do_encrypt:
+ send_package = self.send_encryption_package(
+ txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+ time.sleep(45)
+ session_receive.send_expect("pkts", "", 30)
out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
else:
+ session_receive2 = self.tester.create_session(name='receive_encryption_package2')
+ session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
+ send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
+ sa_dst)
+ time.sleep(45)
+ rev = session_receive2.get_session_before()
+ print(rev)
+ p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+ res = p.search(rev)
+ self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
+ self.tester.destroy_session(session_receive2)
+ session_receive.send_expect("pkts", "", 30)
session_receive.send_expect(sa_gcm, ">>>", 10)
- session_receive.send_expect(
- "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
+ time.sleep(2)
+ session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
out = session_receive.send_expect("results", ">>>", 10)
if verify:
- self.verify(send_package in out,
+ print('received packet content is %s' % out)
+ print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
+ send_package[1], send_package[2], send_package[0]))
+ self.verify(send_package[0] in out,
"Unreceived package or get other package")
else:
- self.verify(send_package not in out,
+ self.verify(send_package[0] not in out,
"The function is not in effect")
session_receive.send_expect("quit()", "#", 10)
self.tester.destroy_session(session_receive)
@@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
paysize = random.randint(1, ETHER_STANDARD_MTU)
self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
self.txItf, self.rxItf, paysize)
+ self.dut.send_expect("^C", "#", 5)
def test_Ipsec_Encryption_Jumboframe(self):
"""
@@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
+ self.dut.send_expect("^C", "#", 5)
def test_Ipsec_Encryption_Rss(self):
"""
@@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
out = self.dut.get_session_output()
verifycode = "receive 1 packet in rxqueueid=1"
self.verify(verifycode in out, "rxqueueid error")
+ self.dut.send_expect("^C", "#", 5)
def test_IPSec_Decryption(self):
"""
@@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
paysize = random.randint(1, ETHER_STANDARD_MTU)
self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
self.txItf, paysize, do_encrypt=True, count=2)
+ self.dut.send_expect("^C", "#", 5)
def test_IPSec_Decryption_Jumboframe(self):
"""
@@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
+ self.dut.send_expect("^C", "#", 5)
def test_Ipsec_Decryption_Rss(self):
"""
@@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
out = self.dut.get_session_output()
verifycode = "receive 1 packet in rxqueueid=1"
self.verify(verifycode in out, "rxqueueid error")
+ self.dut.send_expect("^C", "#", 5)
def test_Ipsec_Decryption_wrongkey(self):
"""
@@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
self.txItf, paysize, do_encrypt=True, verify=False, count=2)
out = self.dut.get_session_output()
- verifycode = "IPSEC_ESP: failed crypto op"
- self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
+ verifycode = "IPSEC_ESP: esp_inbound_post\(\) failed crypto op"
+ l = re.findall(verifycode, out)
+ self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
+ self.dut.send_expect("^C", "#", 5)
def test_Ipsec_Encryption_Decryption(self):
"""
@@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
eth_e2 = Ether() / e2
eth_e2.src = self.rx_src
eth_e2.dst = self.tx_dst
-
+ session_receive3 = self.tester.create_session('check_forward_encryption_package')
+ session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
+ time.sleep(2)
sendp(eth_e1, iface=self.rxItf, count=2)
sendp(eth_e2, iface=self.txItf, count=1)
time.sleep(30)
+ rev = session_receive3.get_session_before()
+ print(rev)
+ p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+ res = p.search(rev)
+ self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
session_receive.send_expect(
"results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
out = session_receive.send_expect("results", ">>>", 60)
--
2.17.1
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [dts] [PATCH V2] update inline_ipsec to test new version dpdk
2019-05-22 10:23 [dts] [PATCH V2] update inline_ipsec to test new version dpdk xiao,qimai
@ 2019-05-27 2:12 ` Li, WenjieX A
2019-05-29 2:21 ` Tu, Lijuan
1 sibling, 0 replies; 3+ messages in thread
From: Li, WenjieX A @ 2019-05-27 2:12 UTC (permalink / raw)
To: Xiao, QimaiX, dts; +Cc: Xiao, QimaiX
Reviewed-by: Wenjie <wenjiex.a.li@intel.com>
> -----Original Message-----
> From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai
> Sent: Wednesday, May 22, 2019 6:23 PM
> To: dts@dpdk.org
> Cc: Xiao, QimaiX <qimaix.xiao@intel.com>
> Subject: [dts] [PATCH V2] update inline_ipsec to test new version dpdk
>
> 1.remove unused imports
> 2.add some steps to verify encrypt package
> 3.update verify message of decrypt with wrong key
>
> Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
> ---
> tests/TestSuite_inline_ipsec.py | 82 +++++++++++++++++++++++----------
> 1 file changed, 58 insertions(+), 24 deletions(-)
>
> diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
> index 1813c08..d67bcce 100644
> --- a/tests/TestSuite_inline_ipsec.py
> +++ b/tests/TestSuite_inline_ipsec.py
> @@ -36,13 +36,12 @@ Test inline_ipsec.
> """
>
> import utils
> -import string
> import time
> import re
> -import threading
> +import random
> +
> +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
> from test_case import TestCase
> -import getopt
> -from scapy.all import *
>
> ETHER_STANDARD_MTU = 1518
> ETHER_JUMBO_FRAME_MTU = 9000
> @@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000 class
> TestInlineIpsec(TestCase):
> """
> This suite depend PyCryptodome,it provide authenticated encryption
> modes(GCM)
> - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7),
> pycryptodomex (3.4.7),
> - pycryptopp
> (0.6.0.1206569328141510525648634803928199668821045408958),
> scapy (2.3.3.dev623)
> + my environment:cryptography (1.7.2), pycryptodome (3.4.7),
> pycryptodomex (3.4.7),
> + pycryptopp
> + (0.6.0.1206569328141510525648634803928199668821045408958),
> scapy
> + (2.4.2)
> """
> +
> def set_up_all(self):
> """
> Run at the start of each test suite.
> @@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
>
> self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
> # add print code in IPSEC app
> - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\printf("[debug]receive %hhu packet in
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-
> secgw.c"""
> + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-
> secgw.c"""
> self.dut.send_expect(sedcmd, "#", 60)
>
> # build sample app
> @@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
> f.write(cfg)
> self.dut.session.copy_file_to(filename, self.dut.base_dir)
>
> - def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> + def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1,
> + inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> """
> prepare a packet and send
> """
> @@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
>
> if do_encrypt == True:
> print "send encrypt package"
> + print("before encrypt, the package info is like below: ")
> + p.show()
> e = sa_gcm.encrypt(p)
> else:
> print "send normal package"
> @@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
> name='send_encryption_package')
> sendp(eth_e, iface=intf, count=count)
> self.tester.destroy_session(session_send)
> + return payload, p.src, p.dst
>
> - return payload
> -
> - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False, verify=True, send_spi=5,
> receive_spi=1005, count=1, inner_dst='192.168.105.10',
> sa_src='172.16.1.5', sa_dst='172.16.2.5'):
> + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False,
> + verify=True, send_spi=5, receive_spi=1005, count=1,
> inner_dst='192.168.105.10',
> + sa_src='172.16.1.5', sa_dst='172.16.2.5'):
> """
> verify Ipsec receive package
> """
> - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level
> 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
> + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null'
> + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s
> + --config='%s' -f %s" % (
> self.portpci_0, self.portpci_1, jumboframe, config, file_name)
> self.dut.send_expect(cmd, "IPSEC", 60)
>
> @@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
>
> session_receive.send_expect("scapy", ">>>", 10)
> session_receive.send_expect(
> - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
> - send_package = self.send_encryption_package(
> - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "",
> + 10)
>
> - time.sleep(10)
> - out = session_receive.send_expect("pkts", "", 30)
> if do_encrypt:
> + send_package = self.send_encryption_package(
> + txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
> sa_dst)
> + time.sleep(45)
> + session_receive.send_expect("pkts", "", 30)
> out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
> else:
> + session_receive2 =
> self.tester.create_session(name='receive_encryption_package2')
> + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "",
> 30)
> + send_package = self.send_encryption_package(txItf, paysize,
> do_encrypt, send_spi, count, inner_dst, sa_src,
> + sa_dst)
> + time.sleep(45)
> + rev = session_receive2.get_session_before()
> + print(rev)
> + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> + res = p.search(rev)
> + self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> + self.tester.destroy_session(session_receive2)
> + session_receive.send_expect("pkts", "", 30)
> session_receive.send_expect(sa_gcm, ">>>", 10)
> - session_receive.send_expect(
> - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
> + time.sleep(2)
> +
> + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])",
> + ">>>", 10)
> out = session_receive.send_expect("results", ">>>", 10)
>
> if verify:
> - self.verify(send_package in out,
> + print('received packet content is %s' % out)
> + print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
> + send_package[1], send_package[2], send_package[0]))
> + self.verify(send_package[0] in out,
> "Unreceived package or get other package")
> else:
> - self.verify(send_package not in out,
> + self.verify(send_package[0] not in out,
> "The function is not in effect")
> session_receive.send_expect("quit()", "#", 10)
> self.tester.destroy_session(session_receive)
> @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(1, ETHER_STANDARD_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
> self.txItf, self.rxItf, paysize)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Encryption_Jumboframe(self):
> """
> @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
> self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Encryption_Rss(self):
> """
> @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
> out = self.dut.get_session_output()
> verifycode = "receive 1 packet in rxqueueid=1"
> self.verify(verifycode in out, "rxqueueid error")
> + self.dut.send_expect("^C", "#", 5)
>
> def test_IPSec_Decryption(self):
> """
> @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(1, ETHER_STANDARD_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
> self.txItf, paysize, do_encrypt=True, count=2)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_IPSec_Decryption_Jumboframe(self):
> """
> @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
> self.txItf, paysize, ETHER_JUMBO_FRAME_MTU,
> do_encrypt=True, count=2)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Decryption_Rss(self):
> """
> @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
> out = self.dut.get_session_output()
> verifycode = "receive 1 packet in rxqueueid=1"
> self.verify(verifycode in out, "rxqueueid error")
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Decryption_wrongkey(self):
> """
> @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
> self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg',
> self.rxItf,
> self.txItf, paysize, do_encrypt=True, verify=False, count=2)
> out = self.dut.get_session_output()
> - verifycode = "IPSEC_ESP: failed crypto op"
> - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
> + verifycode = "IPSEC_ESP: esp_inbound_post\(\) failed crypto op"
> + l = re.findall(verifycode, out)
> + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Encryption_Decryption(self):
> """
> @@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
> eth_e2 = Ether() / e2
> eth_e2.src = self.rx_src
> eth_e2.dst = self.tx_dst
> -
> + session_receive3 =
> self.tester.create_session('check_forward_encryption_package')
> + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "",
> 30)
> + time.sleep(2)
> sendp(eth_e1, iface=self.rxItf, count=2)
> sendp(eth_e2, iface=self.txItf, count=1)
> time.sleep(30)
> + rev = session_receive3.get_session_before()
> + print(rev)
> + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> + res = p.search(rev)
> + self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> session_receive.send_expect(
> "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
> out = session_receive.send_expect("results", ">>>", 60)
> --
> 2.17.1
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [dts] [PATCH V2] update inline_ipsec to test new version dpdk
2019-05-22 10:23 [dts] [PATCH V2] update inline_ipsec to test new version dpdk xiao,qimai
2019-05-27 2:12 ` Li, WenjieX A
@ 2019-05-29 2:21 ` Tu, Lijuan
1 sibling, 0 replies; 3+ messages in thread
From: Tu, Lijuan @ 2019-05-29 2:21 UTC (permalink / raw)
To: Xiao, QimaiX, dts; +Cc: Xiao, QimaiX
Applied, thanks
> -----Original Message-----
> From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai
> Sent: Wednesday, May 22, 2019 6:23 PM
> To: dts@dpdk.org
> Cc: Xiao, QimaiX <qimaix.xiao@intel.com>
> Subject: [dts] [PATCH V2] update inline_ipsec to test new version dpdk
>
> 1.remove unused imports
> 2.add some steps to verify encrypt package
> 3.update verify message of decrypt with wrong key
>
> Signed-off-by: xiao,qimai <qimaix.xiao@intel.com>
> ---
> tests/TestSuite_inline_ipsec.py | 82 +++++++++++++++++++++++----------
> 1 file changed, 58 insertions(+), 24 deletions(-)
>
> diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
> index 1813c08..d67bcce 100644
> --- a/tests/TestSuite_inline_ipsec.py
> +++ b/tests/TestSuite_inline_ipsec.py
> @@ -36,13 +36,12 @@ Test inline_ipsec.
> """
>
> import utils
> -import string
> import time
> import re
> -import threading
> +import random
> +
> +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
> from test_case import TestCase
> -import getopt
> -from scapy.all import *
>
> ETHER_STANDARD_MTU = 1518
> ETHER_JUMBO_FRAME_MTU = 9000
> @@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000 class
> TestInlineIpsec(TestCase):
> """
> This suite depend PyCryptodome,it provide authenticated encryption
> modes(GCM)
> - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7),
> pycryptodomex (3.4.7),
> - pycryptopp
> (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> (2.3.3.dev623)
> + my environment:cryptography (1.7.2), pycryptodome (3.4.7),
> pycryptodomex (3.4.7),
> + pycryptopp
> + (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> + (2.4.2)
> """
> +
> def set_up_all(self):
> """
> Run at the start of each test suite.
> @@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
>
> self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
> # add print code in IPSEC app
> - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx,
> queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
> + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-
> secgw.c"""
> self.dut.send_expect(sedcmd, "#", 60)
>
> # build sample app
> @@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
> f.write(cfg)
> self.dut.session.copy_file_to(filename, self.dut.base_dir)
>
> - def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> + def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1,
> + inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> """
> prepare a packet and send
> """
> @@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
>
> if do_encrypt == True:
> print "send encrypt package"
> + print("before encrypt, the package info is like below: ")
> + p.show()
> e = sa_gcm.encrypt(p)
> else:
> print "send normal package"
> @@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
> name='send_encryption_package')
> sendp(eth_e, iface=intf, count=count)
> self.tester.destroy_session(session_send)
> + return payload, p.src, p.dst
>
> - return payload
> -
> - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False, verify=True, send_spi=5,
> receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False,
> + verify=True, send_spi=5, receive_spi=1005, count=1,
> inner_dst='192.168.105.10',
> + sa_src='172.16.1.5', sa_dst='172.16.2.5'):
> """
> verify Ipsec receive package
> """
> - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8
> --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
> + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null'
> + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s
> + --config='%s' -f %s" % (
> self.portpci_0, self.portpci_1, jumboframe, config, file_name)
> self.dut.send_expect(cmd, "IPSEC", 60)
>
> @@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
>
> session_receive.send_expect("scapy", ">>>", 10)
> session_receive.send_expect(
> - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
> - send_package = self.send_encryption_package(
> - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "",
> + 10)
>
> - time.sleep(10)
> - out = session_receive.send_expect("pkts", "", 30)
> if do_encrypt:
> + send_package = self.send_encryption_package(
> + txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
> sa_dst)
> + time.sleep(45)
> + session_receive.send_expect("pkts", "", 30)
> out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
> else:
> + session_receive2 =
> self.tester.create_session(name='receive_encryption_package2')
> + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
> + send_package = self.send_encryption_package(txItf, paysize,
> do_encrypt, send_spi, count, inner_dst, sa_src,
> + sa_dst)
> + time.sleep(45)
> + rev = session_receive2.get_session_before()
> + print(rev)
> + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> + res = p.search(rev)
> + self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> + self.tester.destroy_session(session_receive2)
> + session_receive.send_expect("pkts", "", 30)
> session_receive.send_expect(sa_gcm, ">>>", 10)
> - session_receive.send_expect(
> - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
> + time.sleep(2)
> +
> + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])",
> + ">>>", 10)
> out = session_receive.send_expect("results", ">>>", 10)
>
> if verify:
> - self.verify(send_package in out,
> + print('received packet content is %s' % out)
> + print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
> + send_package[1], send_package[2], send_package[0]))
> + self.verify(send_package[0] in out,
> "Unreceived package or get other package")
> else:
> - self.verify(send_package not in out,
> + self.verify(send_package[0] not in out,
> "The function is not in effect")
> session_receive.send_expect("quit()", "#", 10)
> self.tester.destroy_session(session_receive)
> @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(1, ETHER_STANDARD_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
> self.txItf, self.rxItf, paysize)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Encryption_Jumboframe(self):
> """
> @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
> self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Encryption_Rss(self):
> """
> @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
> out = self.dut.get_session_output()
> verifycode = "receive 1 packet in rxqueueid=1"
> self.verify(verifycode in out, "rxqueueid error")
> + self.dut.send_expect("^C", "#", 5)
>
> def test_IPSec_Decryption(self):
> """
> @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(1, ETHER_STANDARD_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
> self.txItf, paysize, do_encrypt=True, count=2)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_IPSec_Decryption_Jumboframe(self):
> """
> @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
> paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
> self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
> self.txItf, paysize, ETHER_JUMBO_FRAME_MTU,
> do_encrypt=True, count=2)
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Decryption_Rss(self):
> """
> @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
> out = self.dut.get_session_output()
> verifycode = "receive 1 packet in rxqueueid=1"
> self.verify(verifycode in out, "rxqueueid error")
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Decryption_wrongkey(self):
> """
> @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
> self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
> self.txItf, paysize, do_encrypt=True, verify=False, count=2)
> out = self.dut.get_session_output()
> - verifycode = "IPSEC_ESP: failed crypto op"
> - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
> + verifycode = "IPSEC_ESP: esp_inbound_post\(\) failed crypto op"
> + l = re.findall(verifycode, out)
> + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
> + self.dut.send_expect("^C", "#", 5)
>
> def test_Ipsec_Encryption_Decryption(self):
> """
> @@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
> eth_e2 = Ether() / e2
> eth_e2.src = self.rx_src
> eth_e2.dst = self.tx_dst
> -
> + session_receive3 =
> self.tester.create_session('check_forward_encryption_package')
> + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "",
> 30)
> + time.sleep(2)
> sendp(eth_e1, iface=self.rxItf, count=2)
> sendp(eth_e2, iface=self.txItf, count=1)
> time.sleep(30)
> + rev = session_receive3.get_session_before()
> + print(rev)
> + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> + res = p.search(rev)
> + self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> session_receive.send_expect(
> "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
> out = session_receive.send_expect("results", ">>>", 60)
> --
> 2.17.1
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2019-05-29 2:21 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-05-22 10:23 [dts] [PATCH V2] update inline_ipsec to test new version dpdk xiao,qimai
2019-05-27 2:12 ` Li, WenjieX A
2019-05-29 2:21 ` Tu, Lijuan
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).