* [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 @ 2019-04-11 9:54 xiao,qimai 2019-04-12 8:12 ` Wu, ChangqingX 2019-04-15 21:04 ` Tu, Lijuan 0 siblings, 2 replies; 5+ messages in thread From: xiao,qimai @ 2019-04-11 9:54 UTC (permalink / raw) To: dts; +Cc: xiao,qimai update tests/TestSuite_inline_ipsec.py to test dpdk_19.05 Signed-off-by: xiao,qimai <qimaix.xiao@intel.com> --- tests/TestSuite_inline_ipsec.py | 96 ++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 32 deletions(-) diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py index 1813c08..9dcb4f4 100644 --- a/tests/TestSuite_inline_ipsec.py +++ b/tests/TestSuite_inline_ipsec.py @@ -28,21 +28,20 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +# -*- coding:utf-8 -*- """ DPDK Test suite. Test inline_ipsec. """ -import utils -import string -import time +import random import re -import threading +import time + +import utils +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation from test_case import TestCase -import getopt -from scapy.all import * ETHER_STANDARD_MTU = 1518 ETHER_JUMBO_FRAME_MTU = 9000 @@ -51,14 +50,14 @@ ETHER_JUMBO_FRAME_MTU = 9000 class TestInlineIpsec(TestCase): """ This suite depend PyCryptodome,it provide authenticated encryption modes(GCM) - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7), - pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623) + my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7), + pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.4.2) """ + def set_up_all(self): """ Run at the start of each test suite. """ - self.verify(self.nic in ["niantic"], "%s NIC not support" % self.nic) self.verify(self.drivername in ["vfio-pci"], "%s drivername not support" % self.drivername) self.dut_ports = self.dut.get_ports(self.nic) self.verify(len(self.dut_ports) >= 2, "Insufficient ports") @@ -86,7 +85,7 @@ class TestInlineIpsec(TestCase): self.path = "./examples/ipsec-secgw/build/ipsec-secgw" # add print code in IPSEC app - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c""" + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c""" self.dut.send_expect(sedcmd, "#", 60) # build sample app @@ -158,14 +157,15 @@ class TestInlineIpsec(TestCase): def set_cfg(self, filename, cfg): """ - open file and write cfg, scp it to dut base directory + open file and write cfg, scp it to dut base directory """ for i in cfg: with open(filename, 'w') as f: f.write(cfg) self.dut.session.copy_file_to(filename, self.dut.base_dir) - def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'): + def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, + inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'): """ prepare a packet and send """ @@ -184,6 +184,8 @@ class TestInlineIpsec(TestCase): if do_encrypt == True: print "send encrypt package" + print("before encrypt, the package info is like below: ") + p.show() e = sa_gcm.encrypt(p) else: print "send normal package" @@ -196,42 +198,59 @@ class TestInlineIpsec(TestCase): name='send_encryption_package') sendp(eth_e, iface=intf, count=count) self.tester.destroy_session(session_send) + return payload, p.src, p.dst - return payload - - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'): + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, + verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', + sa_src='172.16.1.5', sa_dst='172.16.2.5'): """ verify Ipsec receive package """ - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % ( + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % ( self.portpci_0, self.portpci_1, jumboframe, config, file_name) self.dut.send_expect(cmd, "IPSEC", 60) session_receive = self.tester.create_session( name='receive_encryption_package') + sa_gcm = r"sa_gcm=SecurityAssociation(ESP,spi=%s,crypt_algo='AES-GCM',crypt_key='\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\xcf\x4f\x3d\xde\xad\xbe\xef',auth_algo='NULL',auth_key=None,tunnel_header=IP(src='172.16.1.5',dst='172.16.2.5'))" % receive_spi session_receive.send_expect("scapy", ">>>", 10) session_receive.send_expect( - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30) - send_package = self.send_encryption_package( - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst) + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", 10) - time.sleep(10) - out = session_receive.send_expect("pkts", "", 30) if do_encrypt: + send_package = self.send_encryption_package( + txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst) + time.sleep(45) + session_receive.send_expect("pkts", "", 30) out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10) else: + session_receive2 = self.tester.create_session(name='receive_encryption_package2') + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30) + send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, + sa_dst) + time.sleep(45) + rev = session_receive2.get_session_before() + print(rev) + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') + res = p.search(rev) + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) + self.tester.destroy_session(session_receive2) + session_receive.send_expect("pkts", "", 30) session_receive.send_expect(sa_gcm, ">>>", 10) - session_receive.send_expect( - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10) + time.sleep(2) + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10) out = session_receive.send_expect("results", ">>>", 10) if verify: - self.verify(send_package in out, + print('received packet content is %s' % out) + print('send pkt src ip is %s, dst ip is %s, payload is %s' % ( + send_package[1], send_package[2], send_package[0])) + self.verify(send_package[0] in out, "Unreceived package or get other package") else: - self.verify(send_package not in out, + self.verify(send_package[0] not in out, "The function is not in effect") session_receive.send_expect("quit()", "#", 10) self.tester.destroy_session(session_receive) @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(1, ETHER_STANDARD_MTU) self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', self.txItf, self.rxItf, paysize) + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Encryption_Jumboframe(self): """ @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU) self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU) + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Encryption_Rss(self): """ @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase): out = self.dut.get_session_output() verifycode = "receive 1 packet in rxqueueid=1" self.verify(verifycode in out, "rxqueueid error") + self.dut.send_expect("^C", "#", 5) def test_IPSec_Decryption(self): """ @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(1, ETHER_STANDARD_MTU) self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, self.txItf, paysize, do_encrypt=True, count=2) + self.dut.send_expect("^C", "#", 5) def test_IPSec_Decryption_Jumboframe(self): """ @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU) self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2) + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Decryption_Rss(self): """ @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase): out = self.dut.get_session_output() verifycode = "receive 1 packet in rxqueueid=1" self.verify(verifycode in out, "rxqueueid error") + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Decryption_wrongkey(self): """ @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase): self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf, self.txItf, paysize, do_encrypt=True, verify=False, count=2) out = self.dut.get_session_output() - verifycode = "IPSEC_ESP: failed crypto op" - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed") + verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op" + l = re.findall(verifycode, out) + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed") + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Encryption_Decryption(self): """ @@ -328,7 +355,6 @@ class TestInlineIpsec(TestCase): session_receive2.send_expect(sa_gcm, ">>>", 60) session_receive2.send_expect( "pkts=sniff(iface='%s',count=2,timeout=30)" % self.txItf, "", 60) - payload = "test for Ipsec Encryption Decryption simultaneously" sa_gcm = SecurityAssociation(ESP, spi=5, crypt_algo='AES-GCM', @@ -336,24 +362,30 @@ class TestInlineIpsec(TestCase): auth_algo='NULL', auth_key=None, tunnel_header=IP(src='172.16.1.5', dst='172.16.2.5')) sa_gcm.crypt_algo.icv_size = 16 - p = IP(src='192.168.105.10', dst='192.168.105.10') p /= payload p = IP(str(p)) - e1 = sa_gcm.encrypt(p) e2 = p eth_e1 = Ether() / e1 eth_e1.src = self.rx_src eth_e1.dst = self.tx_dst + eth_e2 = Ether() / e2 eth_e2.src = self.rx_src eth_e2.dst = self.tx_dst - + session_receive3 = self.tester.create_session('check_forward_encryption_package') + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30) + time.sleep(2) sendp(eth_e1, iface=self.rxItf, count=2) sendp(eth_e2, iface=self.txItf, count=1) time.sleep(30) + rev = session_receive3.get_session_before() + print(rev) + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') + res = p.search(rev) + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) session_receive.send_expect( "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60) out = session_receive.send_expect("results", ">>>", 60) -- 2.19.1 ^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 2019-04-11 9:54 [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 xiao,qimai @ 2019-04-12 8:12 ` Wu, ChangqingX 2019-04-15 21:04 ` Tu, Lijuan 1 sibling, 0 replies; 5+ messages in thread From: Wu, ChangqingX @ 2019-04-12 8:12 UTC (permalink / raw) To: Xiao, QimaiX, dts; +Cc: Xiao, QimaiX Tested-by: Wu, ChangqingX <changqingx.wu@intel.com> -----Original Message----- From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai Sent: Thursday, April 11, 2019 5:55 PM To: dts@dpdk.org Cc: Xiao, QimaiX <qimaix.xiao@intel.com> Subject: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 update tests/TestSuite_inline_ipsec.py to test dpdk_19.05 Signed-off-by: xiao,qimai <qimaix.xiao@intel.com> --- tests/TestSuite_inline_ipsec.py | 96 ++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 32 deletions(-) diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py index 1813c08..9dcb4f4 100644 --- a/tests/TestSuite_inline_ipsec.py +++ b/tests/TestSuite_inline_ipsec.py @@ -28,21 +28,20 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +# -*- coding:utf-8 -*- """ DPDK Test suite. Test inline_ipsec. """ -import utils -import string -import time +import random import re -import threading +import time + +import utils +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation from test_case import TestCase -import getopt -from scapy.all import * ETHER_STANDARD_MTU = 1518 ETHER_JUMBO_FRAME_MTU = 9000 @@ -51,14 +50,14 @@ ETHER_JUMBO_FRAME_MTU = 9000 class TestInlineIpsec(TestCase): """ This suite depend PyCryptodome,it provide authenticated encryption modes(GCM) - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7), - pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623) + my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7), + pycryptopp + (0.6.0.1206569328141510525648634803928199668821045408958), scapy + (2.4.2) """ + def set_up_all(self): """ Run at the start of each test suite. """ - self.verify(self.nic in ["niantic"], "%s NIC not support" % self.nic) self.verify(self.drivername in ["vfio-pci"], "%s drivername not support" % self.drivername) self.dut_ports = self.dut.get_ports(self.nic) self.verify(len(self.dut_ports) >= 2, "Insufficient ports") @@ -86,7 +85,7 @@ class TestInlineIpsec(TestCase): self.path = "./examples/ipsec-secgw/build/ipsec-secgw" # add print code in IPSEC app - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c""" + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c""" self.dut.send_expect(sedcmd, "#", 60) # build sample app @@ -158,14 +157,15 @@ class TestInlineIpsec(TestCase): def set_cfg(self, filename, cfg): """ - open file and write cfg, scp it to dut base directory + open file and write cfg, scp it to dut base directory """ for i in cfg: with open(filename, 'w') as f: f.write(cfg) self.dut.session.copy_file_to(filename, self.dut.base_dir) - def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'): + def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, + inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'): """ prepare a packet and send """ @@ -184,6 +184,8 @@ class TestInlineIpsec(TestCase): if do_encrypt == True: print "send encrypt package" + print("before encrypt, the package info is like below: ") + p.show() e = sa_gcm.encrypt(p) else: print "send normal package" @@ -196,42 +198,59 @@ class TestInlineIpsec(TestCase): name='send_encryption_package') sendp(eth_e, iface=intf, count=count) self.tester.destroy_session(session_send) + return payload, p.src, p.dst - return payload - - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'): + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, + verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', + sa_src='172.16.1.5', sa_dst='172.16.2.5'): """ verify Ipsec receive package """ - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % ( + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s + --config='%s' -f %s" % ( self.portpci_0, self.portpci_1, jumboframe, config, file_name) self.dut.send_expect(cmd, "IPSEC", 60) session_receive = self.tester.create_session( name='receive_encryption_package') + sa_gcm = r"sa_gcm=SecurityAssociation(ESP,spi=%s,crypt_algo='AES-GCM',crypt_key='\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\xcf\x4f\x3d\xde\xad\xbe\xef',auth_algo='NULL',auth_key=None,tunnel_header=IP(src='172.16.1.5',dst='172.16.2.5'))" % receive_spi session_receive.send_expect("scapy", ">>>", 10) session_receive.send_expect( - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30) - send_package = self.send_encryption_package( - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst) + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", + 10) - time.sleep(10) - out = session_receive.send_expect("pkts", "", 30) if do_encrypt: + send_package = self.send_encryption_package( + txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst) + time.sleep(45) + session_receive.send_expect("pkts", "", 30) out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10) else: + session_receive2 = self.tester.create_session(name='receive_encryption_package2') + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30) + send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, + sa_dst) + time.sleep(45) + rev = session_receive2.get_session_before() + print(rev) + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') + res = p.search(rev) + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) + self.tester.destroy_session(session_receive2) + session_receive.send_expect("pkts", "", 30) session_receive.send_expect(sa_gcm, ">>>", 10) - session_receive.send_expect( - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10) + time.sleep(2) + + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", + ">>>", 10) out = session_receive.send_expect("results", ">>>", 10) if verify: - self.verify(send_package in out, + print('received packet content is %s' % out) + print('send pkt src ip is %s, dst ip is %s, payload is %s' % ( + send_package[1], send_package[2], send_package[0])) + self.verify(send_package[0] in out, "Unreceived package or get other package") else: - self.verify(send_package not in out, + self.verify(send_package[0] not in out, "The function is not in effect") session_receive.send_expect("quit()", "#", 10) self.tester.destroy_session(session_receive) @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(1, ETHER_STANDARD_MTU) self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', self.txItf, self.rxItf, paysize) + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Encryption_Jumboframe(self): """ @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU) self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU) + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Encryption_Rss(self): """ @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase): out = self.dut.get_session_output() verifycode = "receive 1 packet in rxqueueid=1" self.verify(verifycode in out, "rxqueueid error") + self.dut.send_expect("^C", "#", 5) def test_IPSec_Decryption(self): """ @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(1, ETHER_STANDARD_MTU) self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, self.txItf, paysize, do_encrypt=True, count=2) + self.dut.send_expect("^C", "#", 5) def test_IPSec_Decryption_Jumboframe(self): """ @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase): paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU) self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2) + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Decryption_Rss(self): """ @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase): out = self.dut.get_session_output() verifycode = "receive 1 packet in rxqueueid=1" self.verify(verifycode in out, "rxqueueid error") + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Decryption_wrongkey(self): """ @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase): self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf, self.txItf, paysize, do_encrypt=True, verify=False, count=2) out = self.dut.get_session_output() - verifycode = "IPSEC_ESP: failed crypto op" - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed") + verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op" + l = re.findall(verifycode, out) + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed") + self.dut.send_expect("^C", "#", 5) def test_Ipsec_Encryption_Decryption(self): """ @@ -328,7 +355,6 @@ class TestInlineIpsec(TestCase): session_receive2.send_expect(sa_gcm, ">>>", 60) session_receive2.send_expect( "pkts=sniff(iface='%s',count=2,timeout=30)" % self.txItf, "", 60) - payload = "test for Ipsec Encryption Decryption simultaneously" sa_gcm = SecurityAssociation(ESP, spi=5, crypt_algo='AES-GCM', @@ -336,24 +362,30 @@ class TestInlineIpsec(TestCase): auth_algo='NULL', auth_key=None, tunnel_header=IP(src='172.16.1.5', dst='172.16.2.5')) sa_gcm.crypt_algo.icv_size = 16 - p = IP(src='192.168.105.10', dst='192.168.105.10') p /= payload p = IP(str(p)) - e1 = sa_gcm.encrypt(p) e2 = p eth_e1 = Ether() / e1 eth_e1.src = self.rx_src eth_e1.dst = self.tx_dst + eth_e2 = Ether() / e2 eth_e2.src = self.rx_src eth_e2.dst = self.tx_dst - + session_receive3 = self.tester.create_session('check_forward_encryption_package') + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30) + time.sleep(2) sendp(eth_e1, iface=self.rxItf, count=2) sendp(eth_e2, iface=self.txItf, count=1) time.sleep(30) + rev = session_receive3.get_session_before() + print(rev) + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') + res = p.search(rev) + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) session_receive.send_expect( "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60) out = session_receive.send_expect("results", ">>>", 60) -- 2.19.1 ^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 2019-04-11 9:54 [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 xiao,qimai 2019-04-12 8:12 ` Wu, ChangqingX @ 2019-04-15 21:04 ` Tu, Lijuan 2019-04-16 2:20 ` Xiao, QimaiX 1 sibling, 1 reply; 5+ messages in thread From: Tu, Lijuan @ 2019-04-15 21:04 UTC (permalink / raw) To: Xiao, QimaiX, dts; +Cc: Xiao, QimaiX I can't catch what's your key changes, and why you changed. I found a lot of useless changes . If you want to fix some code styles, don't mix them with functional change. Could please write a couple of sentence to describe what you changed, and why you changed them, if it's because DPDK changed, please point DPDK commit set. Thanks Thanks. > -----Original Message----- > From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai > Sent: Thursday, April 11, 2019 2:55 AM > To: dts@dpdk.org > Cc: Xiao, QimaiX <qimaix.xiao@intel.com> > Subject: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for > dpdk_19.05 > > update tests/TestSuite_inline_ipsec.py to test dpdk_19.05 > > Signed-off-by: xiao,qimai <qimaix.xiao@intel.com> > --- > tests/TestSuite_inline_ipsec.py | 96 ++++++++++++++++++++++----------- > 1 file changed, 64 insertions(+), 32 deletions(-) > > diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py > index 1813c08..9dcb4f4 100644 > --- a/tests/TestSuite_inline_ipsec.py > +++ b/tests/TestSuite_inline_ipsec.py > @@ -28,21 +28,20 @@ > # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > SUCH DAMAGE. > - > +# -*- coding:utf-8 -*- > > """ > DPDK Test suite. > Test inline_ipsec. > """ > > -import utils > -import string > -import time > +import random > import re > -import threading > +import time > + > +import utils > +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation > from test_case import TestCase > -import getopt > -from scapy.all import * > > ETHER_STANDARD_MTU = 1518 > ETHER_JUMBO_FRAME_MTU = 9000 > @@ -51,14 +50,14 @@ ETHER_JUMBO_FRAME_MTU = 9000 class > TestInlineIpsec(TestCase): > """ > This suite depend PyCryptodome,it provide authenticated encryption > modes(GCM) > - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), > pycryptodomex (3.4.7), > - pycryptopp > (0.6.0.1206569328141510525648634803928199668821045408958), scapy > (2.3.3.dev623) > + my environment:cryptography (1.7.2), pycryptodome (3.4.7), > pycryptodomex (3.4.7), > + pycryptopp > + (0.6.0.1206569328141510525648634803928199668821045408958), scapy > + (2.4.2) > """ > + > def set_up_all(self): > """ > Run at the start of each test suite. > """ > - self.verify(self.nic in ["niantic"], "%s NIC not support" % self.nic) > self.verify(self.drivername in ["vfio-pci"], "%s drivername not support" % > self.drivername) > self.dut_ports = self.dut.get_ports(self.nic) > self.verify(len(self.dut_ports) >= 2, "Insufficient ports") @@ -86,7 +85,7 > @@ class TestInlineIpsec(TestCase): > > self.path = "./examples/ipsec-secgw/build/ipsec-secgw" > # add print code in IPSEC app > - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, > portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, > queueid);' examples/ipsec-secgw/ipsec-secgw.c""" > + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, > portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in > rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec- > secgw.c""" > self.dut.send_expect(sedcmd, "#", 60) > > # build sample app > @@ -158,14 +157,15 @@ class TestInlineIpsec(TestCase): > > def set_cfg(self, filename, cfg): > """ > - open file and write cfg, scp it to dut base directory > + open file and write cfg, scp it to dut base directory > """ > for i in cfg: > with open(filename, 'w') as f: > f.write(cfg) > self.dut.session.copy_file_to(filename, self.dut.base_dir) > > - def send_encryption_package(self, intf, paysize=64, do_encrypt=False, > send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', > sa_dst='172.16.2.5'): > + def send_encryption_package(self, intf, paysize=64, do_encrypt=False, > send_spi=5, count=1, > + inner_dst='192.168.105.10', sa_src='172.16.1.5', > sa_dst='172.16.2.5'): > """ > prepare a packet and send > """ > @@ -184,6 +184,8 @@ class TestInlineIpsec(TestCase): > > if do_encrypt == True: > print "send encrypt package" > + print("before encrypt, the package info is like below: ") > + p.show() > e = sa_gcm.encrypt(p) > else: > print "send normal package" > @@ -196,42 +198,59 @@ class TestInlineIpsec(TestCase): > name='send_encryption_package') > sendp(eth_e, iface=intf, count=count) > self.tester.destroy_session(session_send) > + return payload, p.src, p.dst > > - return payload > - > - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, > jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, > receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', > sa_dst='172.16.2.5'): > + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, > jumboframe=1518, do_encrypt=False, > + verify=True, send_spi=5, receive_spi=1005, count=1, > inner_dst='192.168.105.10', > + sa_src='172.16.1.5', sa_dst='172.16.2.5'): > """ > verify Ipsec receive package > """ > - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 > --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % ( > + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' > + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s > + --config='%s' -f %s" % ( > self.portpci_0, self.portpci_1, jumboframe, config, file_name) > self.dut.send_expect(cmd, "IPSEC", 60) > > session_receive = self.tester.create_session( > name='receive_encryption_package') > + > sa_gcm = r"sa_gcm=SecurityAssociation(ESP,spi=%s,crypt_algo='AES- > GCM',crypt_key='\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\x > cf\x4f\x3d\xde\xad\xbe\xef',auth_algo='NULL',auth_key=None,tunnel_heade > r=IP(src='172.16.1.5',dst='172.16.2.5'))" % receive_spi > > session_receive.send_expect("scapy", ">>>", 10) > session_receive.send_expect( > - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30) > - send_package = self.send_encryption_package( > - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst) > + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", > + 10) > > - time.sleep(10) > - out = session_receive.send_expect("pkts", "", 30) > if do_encrypt: > + send_package = self.send_encryption_package( > + txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, > sa_dst) > + time.sleep(45) > + session_receive.send_expect("pkts", "", 30) > out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10) > else: > + session_receive2 = > self.tester.create_session(name='receive_encryption_package2') > + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30) > + send_package = self.send_encryption_package(txItf, paysize, > do_encrypt, send_spi, count, inner_dst, sa_src, > + sa_dst) > + time.sleep(45) > + rev = session_receive2.get_session_before() > + print(rev) > + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') > + res = p.search(rev) > + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) > + self.tester.destroy_session(session_receive2) > + session_receive.send_expect("pkts", "", 30) > session_receive.send_expect(sa_gcm, ">>>", 10) > - session_receive.send_expect( > - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10) > + time.sleep(2) > + > + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", > + ">>>", 10) > out = session_receive.send_expect("results", ">>>", 10) > > if verify: > - self.verify(send_package in out, > + print('received packet content is %s' % out) > + print('send pkt src ip is %s, dst ip is %s, payload is %s' % ( > + send_package[1], send_package[2], send_package[0])) > + self.verify(send_package[0] in out, > "Unreceived package or get other package") > else: > - self.verify(send_package not in out, > + self.verify(send_package[0] not in out, > "The function is not in effect") > session_receive.send_expect("quit()", "#", 10) > self.tester.destroy_session(session_receive) > @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(1, ETHER_STANDARD_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', > self.txItf, self.rxItf, paysize) > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Encryption_Jumboframe(self): > """ > @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(ETHER_STANDARD_MTU, > ETHER_JUMBO_FRAME_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', > self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU) > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Encryption_Rss(self): > """ > @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase): > out = self.dut.get_session_output() > verifycode = "receive 1 packet in rxqueueid=1" > self.verify(verifycode in out, "rxqueueid error") > + self.dut.send_expect("^C", "#", 5) > > def test_IPSec_Decryption(self): > """ > @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(1, ETHER_STANDARD_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, > self.txItf, paysize, do_encrypt=True, count=2) > + self.dut.send_expect("^C", "#", 5) > > def test_IPSec_Decryption_Jumboframe(self): > """ > @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(ETHER_STANDARD_MTU, > ETHER_JUMBO_FRAME_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, > self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, > do_encrypt=True, count=2) > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Decryption_Rss(self): > """ > @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase): > out = self.dut.get_session_output() > verifycode = "receive 1 packet in rxqueueid=1" > self.verify(verifycode in out, "rxqueueid error") > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Decryption_wrongkey(self): > """ > @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase): > self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf, > self.txItf, paysize, do_encrypt=True, verify=False, count=2) > out = self.dut.get_session_output() > - verifycode = "IPSEC_ESP: failed crypto op" > - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed") > + verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op" > + l = re.findall(verifycode, out) > + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed") > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Encryption_Decryption(self): > """ > @@ -328,7 +355,6 @@ class TestInlineIpsec(TestCase): > session_receive2.send_expect(sa_gcm, ">>>", 60) > session_receive2.send_expect( > "pkts=sniff(iface='%s',count=2,timeout=30)" % self.txItf, "", 60) > - > payload = "test for Ipsec Encryption Decryption simultaneously" > sa_gcm = SecurityAssociation(ESP, spi=5, > crypt_algo='AES-GCM', @@ -336,24 +362,30 @@ class > TestInlineIpsec(TestCase): > auth_algo='NULL', auth_key=None, > tunnel_header=IP(src='172.16.1.5', dst='172.16.2.5')) > sa_gcm.crypt_algo.icv_size = 16 > - > p = IP(src='192.168.105.10', dst='192.168.105.10') > p /= payload > p = IP(str(p)) > - > e1 = sa_gcm.encrypt(p) > e2 = p > > eth_e1 = Ether() / e1 > eth_e1.src = self.rx_src > eth_e1.dst = self.tx_dst > + > eth_e2 = Ether() / e2 > eth_e2.src = self.rx_src > eth_e2.dst = self.tx_dst > - > + session_receive3 = > self.tester.create_session('check_forward_encryption_package') > + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", > 30) > + time.sleep(2) > sendp(eth_e1, iface=self.rxItf, count=2) > sendp(eth_e2, iface=self.txItf, count=1) > time.sleep(30) > + rev = session_receive3.get_session_before() > + print(rev) > + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') > + res = p.search(rev) > + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) > session_receive.send_expect( > "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60) > out = session_receive.send_expect("results", ">>>", 60) > -- > 2.19.1 ^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 2019-04-15 21:04 ` Tu, Lijuan @ 2019-04-16 2:20 ` Xiao, QimaiX 2019-04-16 17:33 ` Tu, Lijuan 0 siblings, 1 reply; 5+ messages in thread From: Xiao, QimaiX @ 2019-04-16 2:20 UTC (permalink / raw) To: Tu, Lijuan, dts DPDK commit point is: da7a540e1dc64495785d4f0d6e6b20b27bf28580 Refer to http://git.dpdk.org/dpdk/commit/?h=v19.05-rc1&id=da7a540e1dc64495785d4f0d6e6b20b27bf28580 This patch mainly change verify message from "IPSEC_ESP: failed crypto op" to "IPSEC_ESP: esp_inbound_post() failed crypto op" in a situation of decrypt with wrong key, and add some steps to verify if packages be encrypted successfully in encryption cases. Thanks. -----Original Message----- From: Tu, Lijuan Sent: Tuesday, April 16, 2019 5:04 AM To: Xiao, QimaiX <qimaix.xiao@intel.com>; dts@dpdk.org Cc: Xiao, QimaiX <qimaix.xiao@intel.com> Subject: RE: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 I can't catch what's your key changes, and why you changed. I found a lot of useless changes . If you want to fix some code styles, don't mix them with functional change. Could please write a couple of sentence to describe what you changed, and why you changed them, if it's because DPDK changed, please point DPDK commit set. Thanks Thanks. > -----Original Message----- > From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai > Sent: Thursday, April 11, 2019 2:55 AM > To: dts@dpdk.org > Cc: Xiao, QimaiX <qimaix.xiao@intel.com> > Subject: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for > dpdk_19.05 > > update tests/TestSuite_inline_ipsec.py to test dpdk_19.05 > > Signed-off-by: xiao,qimai <qimaix.xiao@intel.com> > --- > tests/TestSuite_inline_ipsec.py | 96 > ++++++++++++++++++++++----------- > 1 file changed, 64 insertions(+), 32 deletions(-) > > diff --git a/tests/TestSuite_inline_ipsec.py > b/tests/TestSuite_inline_ipsec.py index 1813c08..9dcb4f4 100644 > --- a/tests/TestSuite_inline_ipsec.py > +++ b/tests/TestSuite_inline_ipsec.py > @@ -28,21 +28,20 @@ > # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE > USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > DAMAGE. > - > +# -*- coding:utf-8 -*- > > """ > DPDK Test suite. > Test inline_ipsec. > """ > > -import utils > -import string > -import time > +import random > import re > -import threading > +import time > + > +import utils > +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation > from test_case import TestCase > -import getopt > -from scapy.all import * > > ETHER_STANDARD_MTU = 1518 > ETHER_JUMBO_FRAME_MTU = 9000 > @@ -51,14 +50,14 @@ ETHER_JUMBO_FRAME_MTU = 9000 class > TestInlineIpsec(TestCase): > """ > This suite depend PyCryptodome,it provide authenticated > encryption > modes(GCM) > - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), > pycryptodomex (3.4.7), > - pycryptopp > (0.6.0.1206569328141510525648634803928199668821045408958), scapy > (2.3.3.dev623) > + my environment:cryptography (1.7.2), pycryptodome (3.4.7), > pycryptodomex (3.4.7), > + pycryptopp > + (0.6.0.1206569328141510525648634803928199668821045408958), scapy > + (2.4.2) > """ > + > def set_up_all(self): > """ > Run at the start of each test suite. > """ > - self.verify(self.nic in ["niantic"], "%s NIC not support" % self.nic) > self.verify(self.drivername in ["vfio-pci"], "%s drivername > not support" % > self.drivername) > self.dut_ports = self.dut.get_ports(self.nic) > self.verify(len(self.dut_ports) >= 2, "Insufficient ports") > @@ -86,7 +85,7 @@ class TestInlineIpsec(TestCase): > > self.path = "./examples/ipsec-secgw/build/ipsec-secgw" > # add print code in IPSEC app > - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, > portid);/i\\printf("[debug]receive %hhu packet in > rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c""" > + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, > portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in > rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec- > secgw.c""" > self.dut.send_expect(sedcmd, "#", 60) > > # build sample app > @@ -158,14 +157,15 @@ class TestInlineIpsec(TestCase): > > def set_cfg(self, filename, cfg): > """ > - open file and write cfg, scp it to dut base directory > + open file and write cfg, scp it to dut base directory > """ > for i in cfg: > with open(filename, 'w') as f: > f.write(cfg) > self.dut.session.copy_file_to(filename, self.dut.base_dir) > > - def send_encryption_package(self, intf, paysize=64, do_encrypt=False, > send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', > sa_dst='172.16.2.5'): > + def send_encryption_package(self, intf, paysize=64, > + do_encrypt=False, > send_spi=5, count=1, > + inner_dst='192.168.105.10', > + sa_src='172.16.1.5', > sa_dst='172.16.2.5'): > """ > prepare a packet and send > """ > @@ -184,6 +184,8 @@ class TestInlineIpsec(TestCase): > > if do_encrypt == True: > print "send encrypt package" > + print("before encrypt, the package info is like below: ") > + p.show() > e = sa_gcm.encrypt(p) > else: > print "send normal package" > @@ -196,42 +198,59 @@ class TestInlineIpsec(TestCase): > name='send_encryption_package') > sendp(eth_e, iface=intf, count=count) > self.tester.destroy_session(session_send) > + return payload, p.src, p.dst > > - return payload > - > - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, > jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, > receive_spi=1005, count=1, inner_dst='192.168.105.10', > sa_src='172.16.1.5', > sa_dst='172.16.2.5'): > + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, > + paysize=32, > jumboframe=1518, do_encrypt=False, > + verify=True, send_spi=5, receive_spi=1005, > + count=1, > inner_dst='192.168.105.10', > + sa_src='172.16.1.5', sa_dst='172.16.2.5'): > """ > verify Ipsec receive package > """ > - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 > --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % ( > + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' > + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s > + --config='%s' -f %s" % ( > self.portpci_0, self.portpci_1, jumboframe, config, file_name) > self.dut.send_expect(cmd, "IPSEC", 60) > > session_receive = self.tester.create_session( > name='receive_encryption_package') > + > sa_gcm = > r"sa_gcm=SecurityAssociation(ESP,spi=%s,crypt_algo='AES- > GCM',crypt_key='\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\x > cf\x4f\x3d\xde\xad\xbe\xef',auth_algo='NULL',auth_key=None,tunnel_head > e r=IP(src='172.16.1.5',dst='172.16.2.5'))" % receive_spi > > session_receive.send_expect("scapy", ">>>", 10) > session_receive.send_expect( > - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30) > - send_package = self.send_encryption_package( > - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst) > + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", > + 10) > > - time.sleep(10) > - out = session_receive.send_expect("pkts", "", 30) > if do_encrypt: > + send_package = self.send_encryption_package( > + txItf, paysize, do_encrypt, send_spi, count, > + inner_dst, sa_src, > sa_dst) > + time.sleep(45) > + session_receive.send_expect("pkts", "", 30) > out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10) > else: > + session_receive2 = > self.tester.create_session(name='receive_encryption_package2') > + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30) > + send_package = self.send_encryption_package(txItf, > + paysize, > do_encrypt, send_spi, count, inner_dst, sa_src, > + sa_dst) > + time.sleep(45) > + rev = session_receive2.get_session_before() > + print(rev) > + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') > + res = p.search(rev) > + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) > + self.tester.destroy_session(session_receive2) > + session_receive.send_expect("pkts", "", 30) > session_receive.send_expect(sa_gcm, ">>>", 10) > - session_receive.send_expect( > - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10) > + time.sleep(2) > + > + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", > + ">>>", 10) > out = session_receive.send_expect("results", ">>>", 10) > > if verify: > - self.verify(send_package in out, > + print('received packet content is %s' % out) > + print('send pkt src ip is %s, dst ip is %s, payload is %s' % ( > + send_package[1], send_package[2], send_package[0])) > + self.verify(send_package[0] in out, > "Unreceived package or get other package") > else: > - self.verify(send_package not in out, > + self.verify(send_package[0] not in out, > "The function is not in effect") > session_receive.send_expect("quit()", "#", 10) > self.tester.destroy_session(session_receive) > @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(1, ETHER_STANDARD_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', > self.txItf, self.rxItf, paysize) > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Encryption_Jumboframe(self): > """ > @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(ETHER_STANDARD_MTU, > ETHER_JUMBO_FRAME_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', > self.txItf, self.rxItf, paysize, > ETHER_JUMBO_FRAME_MTU) > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Encryption_Rss(self): > """ > @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase): > out = self.dut.get_session_output() > verifycode = "receive 1 packet in rxqueueid=1" > self.verify(verifycode in out, "rxqueueid error") > + self.dut.send_expect("^C", "#", 5) > > def test_IPSec_Decryption(self): > """ > @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(1, ETHER_STANDARD_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, > self.txItf, paysize, do_encrypt=True, > count=2) > + self.dut.send_expect("^C", "#", 5) > > def test_IPSec_Decryption_Jumboframe(self): > """ > @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase): > paysize = random.randint(ETHER_STANDARD_MTU, > ETHER_JUMBO_FRAME_MTU) > self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, > self.txItf, paysize, > ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2) > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Decryption_Rss(self): > """ > @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase): > out = self.dut.get_session_output() > verifycode = "receive 1 packet in rxqueueid=1" > self.verify(verifycode in out, "rxqueueid error") > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Decryption_wrongkey(self): > """ > @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase): > self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf, > self.txItf, paysize, do_encrypt=True, verify=False, count=2) > out = self.dut.get_session_output() > - verifycode = "IPSEC_ESP: failed crypto op" > - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed") > + verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op" > + l = re.findall(verifycode, out) > + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed") > + self.dut.send_expect("^C", "#", 5) > > def test_Ipsec_Encryption_Decryption(self): > """ > @@ -328,7 +355,6 @@ class TestInlineIpsec(TestCase): > session_receive2.send_expect(sa_gcm, ">>>", 60) > session_receive2.send_expect( > "pkts=sniff(iface='%s',count=2,timeout=30)" % self.txItf, > "", 60) > - > payload = "test for Ipsec Encryption Decryption simultaneously" > sa_gcm = SecurityAssociation(ESP, spi=5, > crypt_algo='AES-GCM', @@ -336,24 > +362,30 @@ class > TestInlineIpsec(TestCase): > auth_algo='NULL', auth_key=None, > tunnel_header=IP(src='172.16.1.5', dst='172.16.2.5')) > sa_gcm.crypt_algo.icv_size = 16 > - > p = IP(src='192.168.105.10', dst='192.168.105.10') > p /= payload > p = IP(str(p)) > - > e1 = sa_gcm.encrypt(p) > e2 = p > > eth_e1 = Ether() / e1 > eth_e1.src = self.rx_src > eth_e1.dst = self.tx_dst > + > eth_e2 = Ether() / e2 > eth_e2.src = self.rx_src > eth_e2.dst = self.tx_dst > - > + session_receive3 = > self.tester.create_session('check_forward_encryption_package') > + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % > + self.rxItf, "", > 30) > + time.sleep(2) > sendp(eth_e1, iface=self.rxItf, count=2) > sendp(eth_e2, iface=self.txItf, count=1) > time.sleep(30) > + rev = session_receive3.get_session_before() > + print(rev) > + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') > + res = p.search(rev) > + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) > session_receive.send_expect( > "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60) > out = session_receive.send_expect("results", ">>>", 60) > -- > 2.19.1 ^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 2019-04-16 2:20 ` Xiao, QimaiX @ 2019-04-16 17:33 ` Tu, Lijuan 0 siblings, 0 replies; 5+ messages in thread From: Tu, Lijuan @ 2019-04-16 17:33 UTC (permalink / raw) To: Xiao, QimaiX, dts Great to get your clarification, Could you please add some explanation like these in your patch comments. Could you please separate your patch to 2 patches, one is for fixing format, the other is for aligning with DPDK changes. One patch is for one single purpose, this is more simple for review and merge. > -----Original Message----- > From: Xiao, QimaiX > Sent: Monday, April 15, 2019 7:20 PM > To: Tu, Lijuan <lijuan.tu@intel.com>; dts@dpdk.org > Subject: RE: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for > dpdk_19.05 > > DPDK commit point is: da7a540e1dc64495785d4f0d6e6b20b27bf28580 > Refer to http://git.dpdk.org/dpdk/commit/?h=v19.05- > rc1&id=da7a540e1dc64495785d4f0d6e6b20b27bf28580 > This patch mainly change verify message from "IPSEC_ESP: failed crypto op" > to "IPSEC_ESP: esp_inbound_post() failed crypto op" in a situation of decrypt > with wrong key, and add some steps to verify if packages be encrypted > successfully in encryption cases. > Thanks. > > -----Original Message----- > From: Tu, Lijuan > Sent: Tuesday, April 16, 2019 5:04 AM > To: Xiao, QimaiX <qimaix.xiao@intel.com>; dts@dpdk.org > Cc: Xiao, QimaiX <qimaix.xiao@intel.com> > Subject: RE: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for > dpdk_19.05 > > I can't catch what's your key changes, and why you changed. I found a lot of > useless changes . > If you want to fix some code styles, don't mix them with functional change. > > Could please write a couple of sentence to describe what you changed, and > why you changed them, if it's because DPDK changed, please point DPDK > commit set. > Thanks > > Thanks. > > > -----Original Message----- > > From: dts [mailto:dts-bounces@dpdk.org] On Behalf Of xiao,qimai > > Sent: Thursday, April 11, 2019 2:55 AM > > To: dts@dpdk.org > > Cc: Xiao, QimaiX <qimaix.xiao@intel.com> > > Subject: [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for > > dpdk_19.05 > > > > update tests/TestSuite_inline_ipsec.py to test dpdk_19.05 > > > > Signed-off-by: xiao,qimai <qimaix.xiao@intel.com> > > --- > > tests/TestSuite_inline_ipsec.py | 96 > > ++++++++++++++++++++++----------- > > 1 file changed, 64 insertions(+), 32 deletions(-) > > > > diff --git a/tests/TestSuite_inline_ipsec.py > > b/tests/TestSuite_inline_ipsec.py index 1813c08..9dcb4f4 100644 > > --- a/tests/TestSuite_inline_ipsec.py > > +++ b/tests/TestSuite_inline_ipsec.py > > @@ -28,21 +28,20 @@ > > # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > TORT > > # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > THE > > USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > DAMAGE. > > - > > +# -*- coding:utf-8 -*- > > > > """ > > DPDK Test suite. > > Test inline_ipsec. > > """ > > > > -import utils > > -import string > > -import time > > +import random > > import re > > -import threading > > +import time > > + > > +import utils > > +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation > > from test_case import TestCase > > -import getopt > > -from scapy.all import * > > > > ETHER_STANDARD_MTU = 1518 > > ETHER_JUMBO_FRAME_MTU = 9000 > > @@ -51,14 +50,14 @@ ETHER_JUMBO_FRAME_MTU = 9000 class > > TestInlineIpsec(TestCase): > > """ > > This suite depend PyCryptodome,it provide authenticated > > encryption > > modes(GCM) > > - my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), > > pycryptodomex (3.4.7), > > - pycryptopp > > (0.6.0.1206569328141510525648634803928199668821045408958), scapy > > (2.3.3.dev623) > > + my environment:cryptography (1.7.2), pycryptodome (3.4.7), > > pycryptodomex (3.4.7), > > + pycryptopp > > + (0.6.0.1206569328141510525648634803928199668821045408958), scapy > > + (2.4.2) > > """ > > + > > def set_up_all(self): > > """ > > Run at the start of each test suite. > > """ > > - self.verify(self.nic in ["niantic"], "%s NIC not support" % self.nic) > > self.verify(self.drivername in ["vfio-pci"], "%s drivername > > not support" % > > self.drivername) > > self.dut_ports = self.dut.get_ports(self.nic) > > self.verify(len(self.dut_ports) >= 2, "Insufficient ports") > > @@ -86,7 +85,7 @@ class TestInlineIpsec(TestCase): > > > > self.path = "./examples/ipsec-secgw/build/ipsec-secgw" > > # add print code in IPSEC app > > - sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, > > portid);/i\\printf("[debug]receive %hhu packet in > > rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec- > secgw.c""" > > + sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, > > portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in > > rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec- > > secgw.c""" > > self.dut.send_expect(sedcmd, "#", 60) > > > > # build sample app > > @@ -158,14 +157,15 @@ class TestInlineIpsec(TestCase): > > > > def set_cfg(self, filename, cfg): > > """ > > - open file and write cfg, scp it to dut base directory > > + open file and write cfg, scp it to dut base directory > > """ > > for i in cfg: > > with open(filename, 'w') as f: > > f.write(cfg) > > self.dut.session.copy_file_to(filename, self.dut.base_dir) > > > > - def send_encryption_package(self, intf, paysize=64, do_encrypt=False, > > send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', > > sa_dst='172.16.2.5'): > > + def send_encryption_package(self, intf, paysize=64, > > + do_encrypt=False, > > send_spi=5, count=1, > > + inner_dst='192.168.105.10', > > + sa_src='172.16.1.5', > > sa_dst='172.16.2.5'): > > """ > > prepare a packet and send > > """ > > @@ -184,6 +184,8 @@ class TestInlineIpsec(TestCase): > > > > if do_encrypt == True: > > print "send encrypt package" > > + print("before encrypt, the package info is like below: ") > > + p.show() > > e = sa_gcm.encrypt(p) > > else: > > print "send normal package" > > @@ -196,42 +198,59 @@ class TestInlineIpsec(TestCase): > > name='send_encryption_package') > > sendp(eth_e, iface=intf, count=count) > > self.tester.destroy_session(session_send) > > + return payload, p.src, p.dst > > > > - return payload > > - > > - def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, > > jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, > > receive_spi=1005, count=1, inner_dst='192.168.105.10', > > sa_src='172.16.1.5', > > sa_dst='172.16.2.5'): > > + def Ipsec_Encryption(self, config, file_name, txItf, rxItf, > > + paysize=32, > > jumboframe=1518, do_encrypt=False, > > + verify=True, send_spi=5, receive_spi=1005, > > + count=1, > > inner_dst='192.168.105.10', > > + sa_src='172.16.1.5', sa_dst='172.16.2.5'): > > """ > > verify Ipsec receive package > > """ > > - cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level > 8 > > --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % ( > > + cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' > > + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s > > + --config='%s' -f %s" % ( > > self.portpci_0, self.portpci_1, jumboframe, config, file_name) > > self.dut.send_expect(cmd, "IPSEC", 60) > > > > session_receive = self.tester.create_session( > > name='receive_encryption_package') > > + > > sa_gcm = > > r"sa_gcm=SecurityAssociation(ESP,spi=%s,crypt_algo='AES- > > > GCM',crypt_key='\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\x > > > cf\x4f\x3d\xde\xad\xbe\xef',auth_algo='NULL',auth_key=None,tunnel_head > > e r=IP(src='172.16.1.5',dst='172.16.2.5'))" % receive_spi > > > > session_receive.send_expect("scapy", ">>>", 10) > > session_receive.send_expect( > > - "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30) > > - send_package = self.send_encryption_package( > > - txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, > sa_dst) > > + "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", > > + 10) > > > > - time.sleep(10) > > - out = session_receive.send_expect("pkts", "", 30) > > if do_encrypt: > > + send_package = self.send_encryption_package( > > + txItf, paysize, do_encrypt, send_spi, count, > > + inner_dst, sa_src, > > sa_dst) > > + time.sleep(45) > > + session_receive.send_expect("pkts", "", 30) > > out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10) > > else: > > + session_receive2 = > > self.tester.create_session(name='receive_encryption_package2') > > + session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", > 30) > > + send_package = self.send_encryption_package(txItf, > > + paysize, > > do_encrypt, send_spi, count, inner_dst, sa_src, > > + sa_dst) > > + time.sleep(45) > > + rev = session_receive2.get_session_before() > > + print(rev) > > + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') > > + res = p.search(rev) > > + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) > > + self.tester.destroy_session(session_receive2) > > + session_receive.send_expect("pkts", "", 30) > > session_receive.send_expect(sa_gcm, ">>>", 10) > > - session_receive.send_expect( > > - "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10) > > + time.sleep(2) > > + > > + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", > > + ">>>", 10) > > out = session_receive.send_expect("results", ">>>", 10) > > > > if verify: > > - self.verify(send_package in out, > > + print('received packet content is %s' % out) > > + print('send pkt src ip is %s, dst ip is %s, payload is %s' % ( > > + send_package[1], send_package[2], send_package[0])) > > + self.verify(send_package[0] in out, > > "Unreceived package or get other package") > > else: > > - self.verify(send_package not in out, > > + self.verify(send_package[0] not in out, > > "The function is not in effect") > > session_receive.send_expect("quit()", "#", 10) > > self.tester.destroy_session(session_receive) > > @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase): > > paysize = random.randint(1, ETHER_STANDARD_MTU) > > self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', > > self.txItf, self.rxItf, paysize) > > + self.dut.send_expect("^C", "#", 5) > > > > def test_Ipsec_Encryption_Jumboframe(self): > > """ > > @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase): > > paysize = random.randint(ETHER_STANDARD_MTU, > > ETHER_JUMBO_FRAME_MTU) > > self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg', > > self.txItf, self.rxItf, paysize, > > ETHER_JUMBO_FRAME_MTU) > > + self.dut.send_expect("^C", "#", 5) > > > > def test_Ipsec_Encryption_Rss(self): > > """ > > @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase): > > out = self.dut.get_session_output() > > verifycode = "receive 1 packet in rxqueueid=1" > > self.verify(verifycode in out, "rxqueueid error") > > + self.dut.send_expect("^C", "#", 5) > > > > def test_IPSec_Decryption(self): > > """ > > @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase): > > paysize = random.randint(1, ETHER_STANDARD_MTU) > > self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, > > self.txItf, paysize, do_encrypt=True, > > count=2) > > + self.dut.send_expect("^C", "#", 5) > > > > def test_IPSec_Decryption_Jumboframe(self): > > """ > > @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase): > > paysize = random.randint(ETHER_STANDARD_MTU, > > ETHER_JUMBO_FRAME_MTU) > > self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf, > > self.txItf, paysize, > > ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2) > > + self.dut.send_expect("^C", "#", 5) > > > > def test_Ipsec_Decryption_Rss(self): > > """ > > @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase): > > out = self.dut.get_session_output() > > verifycode = "receive 1 packet in rxqueueid=1" > > self.verify(verifycode in out, "rxqueueid error") > > + self.dut.send_expect("^C", "#", 5) > > > > def test_Ipsec_Decryption_wrongkey(self): > > """ > > @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase): > > self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', > self.rxItf, > > self.txItf, paysize, do_encrypt=True, verify=False, > count=2) > > out = self.dut.get_session_output() > > - verifycode = "IPSEC_ESP: failed crypto op" > > - self.verify(verifycode in out, "Ipsec Decryption wrongkey failed") > > + verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op" > > + l = re.findall(verifycode, out) > > + self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed") > > + self.dut.send_expect("^C", "#", 5) > > > > def test_Ipsec_Encryption_Decryption(self): > > """ > > @@ -328,7 +355,6 @@ class TestInlineIpsec(TestCase): > > session_receive2.send_expect(sa_gcm, ">>>", 60) > > session_receive2.send_expect( > > "pkts=sniff(iface='%s',count=2,timeout=30)" % self.txItf, > > "", 60) > > - > > payload = "test for Ipsec Encryption Decryption simultaneously" > > sa_gcm = SecurityAssociation(ESP, spi=5, > > crypt_algo='AES-GCM', @@ -336,24 > > +362,30 @@ class > > TestInlineIpsec(TestCase): > > auth_algo='NULL', auth_key=None, > > tunnel_header=IP(src='172.16.1.5', dst='172.16.2.5')) > > sa_gcm.crypt_algo.icv_size = 16 > > - > > p = IP(src='192.168.105.10', dst='192.168.105.10') > > p /= payload > > p = IP(str(p)) > > - > > e1 = sa_gcm.encrypt(p) > > e2 = p > > > > eth_e1 = Ether() / e1 > > eth_e1.src = self.rx_src > > eth_e1.dst = self.tx_dst > > + > > eth_e2 = Ether() / e2 > > eth_e2.src = self.rx_src > > eth_e2.dst = self.tx_dst > > - > > + session_receive3 = > > self.tester.create_session('check_forward_encryption_package') > > + session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % > > + self.rxItf, "", > > 30) > > + time.sleep(2) > > sendp(eth_e1, iface=self.rxItf, count=2) > > sendp(eth_e2, iface=self.txItf, count=1) > > time.sleep(30) > > + rev = session_receive3.get_session_before() > > + print(rev) > > + p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),') > > + res = p.search(rev) > > + self.verify(res, 'encrypt failed, tcpdump get %s' % rev) > > session_receive.send_expect( > > "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60) > > out = session_receive.send_expect("results", ">>>", 60) > > -- > > 2.19.1 ^ permalink raw reply [flat|nested] 5+ messages in thread
end of thread, other threads:[~2019-04-16 17:33 UTC | newest] Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2019-04-11 9:54 [dts] [PATCH V1] update tests/TestSuite_inline_ipsec.py for dpdk_19.05 xiao,qimai 2019-04-12 8:12 ` Wu, ChangqingX 2019-04-15 21:04 ` Tu, Lijuan 2019-04-16 2:20 ` Xiao, QimaiX 2019-04-16 17:33 ` Tu, Lijuan
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox; as well as URLs for NNTP newsgroup(s).