From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by dpdk.space (Postfix) with ESMTP id 64786A00E6 for ; Tue, 16 Apr 2019 12:37:30 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id D15621B4B2; Tue, 16 Apr 2019 12:37:29 +0200 (CEST) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 945521B4B1 for ; Tue, 16 Apr 2019 12:37:27 +0200 (CEST) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga008.jf.intel.com ([10.7.209.65]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 16 Apr 2019 03:37:26 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.60,357,1549958400"; d="scan'208";a="134767738" Received: from xuyanjie.sh.intel.com ([10.67.111.13]) by orsmga008.jf.intel.com with ESMTP; 16 Apr 2019 03:37:23 -0700 From: xuyanjie To: dts@dpdk.org Cc: xuyanjie Date: Tue, 16 Apr 2019 12:23:43 -0400 Message-Id: <1555431823-169297-1-git-send-email-yanjie.xu@intel.com> X-Mailer: git-send-email 2.7.4 Subject: [dts] [PATCH] add new ipsec lib test scripts X-BeenThere: dts@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: test suite reviews and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dts-bounces@dpdk.org Sender: "dts" Signed-off-by: xuyanjie diff --git a/tests/TestSuite_cryptodev_new_ipsec-gw.py b/tests/TestSuite_cryptodev_new_ipsec-gw.py new file mode 100644 index 0000000..5e348db --- /dev/null +++ b/tests/TestSuite_cryptodev_new_ipsec-gw.py @@ -0,0 +1,741 @@ +# BSD LICENSE +# +# Copyright(c) 2017-2018 Intel Corporation. All rights reserved. +# All rights reserved. 
+# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import hmac
import hashlib
import binascii
import time
import utils
from test_case import TestCase
from packet import Packet, sniff_packets, load_sniff_packets

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESGCM
from cryptography.hazmat.backends import default_backend

# Manually install the CryptoMobile Python library
# before running this test suite.
# Web link : https://github.com/mitshell/CryptoMobile
import CryptoMobile.CM as cm
import pyDes

import TestSuite_cryptodev_common as cc
from packet import save_packets


class TestIPsecGW(TestCase):
    """Functional tests for the ipsec-secgw sample application.

    Every ``test_*`` method launches ipsec-secgw with the suite's default
    options, sends a burst of UDP packets from the tester and checks that
    the packets sniffed on the receiving interface carry the expected
    outer addresses, ESP SPI and ESP payload length.  The expected values
    are read from the per-case configuration returned by
    ``self.get_case_cfg()``.
    """

    def set_up_all(self):
        """Resolve ports and interfaces, build the app and set defaults."""
        self.core_config = "1S/2C/1T"
        self.number_of_ports = 1
        self.dut_ports = self.dut.get_ports(self.nic)
        # dut_ports[0] and dut_ports[1] are both indexed below, so at least
        # two ports are needed regardless of number_of_ports.  Checking it
        # here reports a clear message instead of a bare IndexError.
        required_ports = max(self.number_of_ports, 2)
        self.verify(len(self.dut_ports) >= required_ports,
                    "Not enough ports for " + self.nic)
        self.ports_socket = self.dut.get_numa_id(self.dut_ports[0])

        self.logger.info("core config = " + self.core_config)
        self.logger.info("number of ports = " + str(self.number_of_ports))
        self.logger.info("dut ports = " + str(self.dut_ports))
        self.logger.info("ports_socket = " + str(self.ports_socket))

        # The testbed is expected to have a 4-port NIC, e.g.
        #   03:00.0 03:00.1 03:00.2 03:00.3
        # set up as follows:
        #   - physical links are 03:00.0 <-> 03:00.1 and 03:00.2 <-> 03:00.3
        #   - 03:00.0 and 03:00.2 are bound to the ipsec-secgw app
        #   - test packets are sent from 03:00.3
        #   - packets forwarded by ipsec-secgw are received from 03:00.0
        #   - ports and peers are configured in the dts port.cfg
        self.tx_port = self.tester.get_local_port(self.dut_ports[1])
        self.rx_port = self.tester.get_local_port(self.dut_ports[0])

        self.tx_interface = self.tester.get_interface(self.tx_port)
        self.rx_interface = self.tester.get_interface(self.rx_port)

        self.logger.info("tx interface = " + self.tx_interface)
        self.logger.info("rx interface = " + self.rx_interface)

        self._app_path = "./examples/ipsec-secgw/build/ipsec-secgw"
        if not cc.is_build_skip(self):
            cc.build_dpdk_with_cryptodev(self)
        cc.bind_qat_device(self)

        self._default_ipsec_gw_opts = {
            "config": None,
            "P": "",
            "p": "0x3",
            "f": "local_conf/ipsec_test.cfg",
            "u": "0x1"
        }

        self._pcap_idx = 0
        self.pcap_filename = ''

    def set_up(self):
        """No per-case preparation is needed."""
        pass

    def tear_down(self):
        """Stop anything still running on the DUT after each case."""
        self.dut.kill_all()

    def tear_down_all(self):
        """Clear the DPDK configuration once the suite is finished."""
        cc.clear_dpdk_config(self)

    def _run_ipsec_gw_case(self, case_name):
        """Run one ipsec-secgw case and verify its result.

        :param case_name: case label without the ``test_`` prefix; it is
            logged as ``"Test <case_name>"`` and used as
            ``"test_<case_name>"`` for the saved pcap file name.
        """
        if cc.is_test_skip(self):
            return

        self.logger.info("Test " + case_name)
        self.pcap_filename = "test_" + case_name
        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
        self.logger.debug(ipsec_gw_opt_str)

        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
        self.verify(result, "FAIL")

    # ---- QAT cases -------------------------------------------------------

    def test_tun_qat_aes_128_cbc_ipv4(self):
        self._run_ipsec_gw_case("tun_qat_aes_128_cbc_ipv4")

    def test_tun_qat_aes_256_cbc_ipv4(self):
        self._run_ipsec_gw_case("tun_qat_aes_256_cbc_ipv4")

    def test_tun_qat_aes_gcm_ipv4(self):
        self._run_ipsec_gw_case("tun_qat_aes_gcm_ipv4")

    def test_tun_qat_aes_128_ctr_ipv4(self):
        self._run_ipsec_gw_case("tun_qat_aes_128_ctr_ipv4")

    def test_tun_qat_aes_128_ctr_ipv6(self):
        self._run_ipsec_gw_case("tun_qat_aes_128_ctr_ipv6")

    def test_qat_aes_128_ctr_ipv4_transport(self):
        # The method name differs from its siblings, but the logged label
        # and pcap name follow the usual "trs_..." scheme.
        self._run_ipsec_gw_case("trs_qat_aes_128_ctr_ipv4")

    def test_trs_qat_aes_128_ctr_ipv6(self):
        self._run_ipsec_gw_case("trs_qat_aes_128_ctr_ipv6")

    def test_tun_qat_null_ipv4(self):
        self._run_ipsec_gw_case("tun_qat_null_ipv4")

    def test_trs_qat_aes_128_cbc_ipv4(self):
        self._run_ipsec_gw_case("trs_qat_aes_128_cbc_ipv4")

    def test_trs_qat_aes_256_cbc_ipv4(self):
        self._run_ipsec_gw_case("trs_qat_aes_256_cbc_ipv4")

    def test_trs_qat_aes_gcm_ipv4(self):
        self._run_ipsec_gw_case("trs_qat_aes_gcm_ipv4")

    def test_tun_qat_aes_128_cbc_ipv6(self):
        self._run_ipsec_gw_case("tun_qat_aes_128_cbc_ipv6")

    def test_tun_qat_aes_256_cbc_ipv6(self):
        self._run_ipsec_gw_case("tun_qat_aes_256_cbc_ipv6")

    def test_tun_qat_aes_gcm_ipv6(self):
        self._run_ipsec_gw_case("tun_qat_aes_gcm_ipv6")

    def test_tun_qat_null_ipv6(self):
        self._run_ipsec_gw_case("tun_qat_null_ipv6")

    def test_trs_qat_aes_128_cbc_ipv6(self):
        self._run_ipsec_gw_case("trs_qat_aes_128_cbc_ipv6")

    def test_trs_qat_aes_256_cbc_ipv6(self):
        self._run_ipsec_gw_case("trs_qat_aes_256_cbc_ipv6")

    def test_trs_qat_aes_gcm_ipv6(self):
        self._run_ipsec_gw_case("trs_qat_aes_gcm_ipv6")

    # ---- software crypto cases -------------------------------------------

    def test_tun_sw_aes_128_cbc_ipv4(self):
        self._run_ipsec_gw_case("tun_sw_aes_128_cbc_ipv4")

    def test_tun_sw_aes_256_cbc_ipv4(self):
        self._run_ipsec_gw_case("tun_sw_aes_256_cbc_ipv4")

    def test_tun_sw_aes_gcm_ipv4(self):
        self._run_ipsec_gw_case("tun_sw_aes_gcm_ipv4")

    def test_tun_sw_null_ipv4(self):
        self._run_ipsec_gw_case("tun_sw_null_ipv4")

    def test_trs_sw_aes_128_cbc_ipv4(self):
        self._run_ipsec_gw_case("trs_sw_aes_128_cbc_ipv4")

    def test_trs_sw_aes_256_cbc_ipv4(self):
        self._run_ipsec_gw_case("trs_sw_aes_256_cbc_ipv4")

    def test_trs_sw_aes_gcm_ipv4(self):
        self._run_ipsec_gw_case("trs_sw_aes_gcm_ipv4")

    def test_tun_sw_aes_128_cbc_ipv6(self):
        self._run_ipsec_gw_case("tun_sw_aes_128_cbc_ipv6")

    def test_tun_sw_aes_256_cbc_ipv6(self):
        self._run_ipsec_gw_case("tun_sw_aes_256_cbc_ipv6")

    def test_tun_sw_aes_gcm_ipv6(self):
        self._run_ipsec_gw_case("tun_sw_aes_gcm_ipv6")

    def test_tun_sw_null_ipv6(self):
        self._run_ipsec_gw_case("tun_sw_null_ipv6")

    def test_trs_sw_aes_128_cbc_ipv6(self):
        self._run_ipsec_gw_case("trs_sw_aes_128_cbc_ipv6")

    def test_trs_sw_aes_256_cbc_ipv6(self):
        self._run_ipsec_gw_case("trs_sw_aes_256_cbc_ipv6")

    def test_trs_sw_aes_gcm_ipv6(self):
        self._run_ipsec_gw_case("trs_sw_aes_gcm_ipv6")

    def test_tun_sw_aes_128_ctr_ipv4(self):
        self._run_ipsec_gw_case("tun_sw_aes_128_ctr_ipv4")

    def test_tun_sw_aes_128_ctr_ipv6(self):
        self._run_ipsec_gw_case("tun_sw_aes_128_ctr_ipv6")

    def test_trs_sw_aes_128_ctr_ipv4(self):
        self._run_ipsec_gw_case("trs_sw_aes_128_ctr_ipv4")

    def test_trs_sw_aes_128_ctr_ipv6(self):
        self._run_ipsec_gw_case("trs_sw_aes_128_ctr_ipv6")

    # ---- helpers ---------------------------------------------------------

    def _get_ipsec_gw_opt_str(self, override_ipsec_gw_opts=None):
        """Build the ipsec-secgw option string.

        :param override_ipsec_gw_opts: optional dict of options passed to
            ``cc.get_opt_str`` together with the suite defaults; ``None``
            means no overrides.
        :return: whatever ``cc.get_opt_str`` returns for these options.
        """
        # A fresh dict per call avoids sharing one mutable default object
        # between invocations.
        if override_ipsec_gw_opts is None:
            override_ipsec_gw_opts = {}
        return cc.get_opt_str(self, self._default_ipsec_gw_opts,
                              override_ipsec_gw_opts)

    def _check_forwarded_packet(self, pkt_r, pkt_src_ip, expected):
        """Compare one sniffed packet against the expected values.

        :param pkt_r: sniffed packet object.
        :param pkt_src_ip: layer 3 source address already extracted from
            ``pkt_r``.
        :param expected: dict with the keys ``src_ip``, ``dst_ip``, ``spi``
            and ``length``.
        :return: True when every field matches, False on the first
            mismatch (which is logged as an error).
        """
        if pkt_src_ip != expected["src_ip"]:
            pkt_r.pktgen.pkt.show()
            self.logger.error("SRC IP does not match. Pkt:{0}, Expected:{1}".format(
                pkt_src_ip, expected["src_ip"]))
            return False

        pkt_dst_ip = pkt_r.pktgen.strip_layer3("dst")
        self.logger.debug(pkt_dst_ip)
        if pkt_dst_ip != expected["dst_ip"]:
            pkt_r.pktgen.pkt.show()
            self.logger.error("DST IP does not match. Pkt:{0}, Expected:{1}".format(
                pkt_dst_ip, expected["dst_ip"]))
            return False

        esp_layer = pkt_r.pktgen.pkt["ESP"]
        packet_hex = esp_layer.getfieldval("data")
        if packet_hex is None:
            self.logger.error("NO Payload !")
            return False
        payload_str = binascii.b2a_hex(packet_hex)
        self.logger.debug(payload_str)

        pkt_spi = hex(esp_layer.getfieldval("spi"))
        self.logger.debug(pkt_spi)
        if pkt_spi != expected["spi"]:
            self.logger.error("SPI does not match. Pkt:{0}, Expected:{1}".format(
                pkt_spi, expected["spi"]))
            return False

        # Two hex digits per payload byte; floor division keeps the value
        # an integer.
        pkt_len = len(payload_str) // 2
        self.logger.debug(pkt_len)
        if pkt_len != int(expected["length"]):
            self.logger.error("Packet length does not match. Pkt:{0}, Expected:{1}".format(
                pkt_len, expected["length"]))
            return False

        return True

    def _execute_ipsec_gw_test(self, ipsec_gw_opt_str):
        """Launch ipsec-secgw, send traffic and validate what comes back.

        :param ipsec_gw_opt_str: application option string for ipsec-secgw.
        :return: True when at least one sniffed packet was validated and
            none mismatched the expected values; False otherwise.

        Packets whose layer 3 source address cannot be extracted are
        skipped.  The outcome therefore no longer depends on the order in
        which such packets appear in the capture.
        """
        eal_opt_str = cc.get_eal_opt_str(self)

        cmd_str = cc.get_dpdk_app_cmd_str(self._app_path, eal_opt_str, ipsec_gw_opt_str) + "-l"
        self.logger.info("IPsec-gw cmd: " + cmd_str)
        self.dut.send_expect(cmd_str, "IPSEC:", 30)

        try:
            time.sleep(3)
            inst = sniff_packets(self.rx_interface, timeout=25)

            PACKET_COUNT = 65
            payload = 256 * ['11']

            case_cfgs = self.get_case_cfg()
            dst_ip = case_cfgs["dst_ip"]
            src_ip = case_cfgs["src_ip"]
            expected = {
                "dst_ip": case_cfgs["expected_dst_ip"],
                "src_ip": case_cfgs["expected_src_ip"],
                "spi": case_cfgs["expected_spi"],
                "length": case_cfgs["expected_length"],
            }

            # IPv6 literals always contain ':' while dotted IPv4 never
            # does, which also classifies short IPv6 addresses correctly.
            l3_layer = "ipv6" if ":" in dst_ip else "ipv4"

            pkt = Packet()
            pkt.assign_layers(["ether", l3_layer, "udp", "raw"])
            pkt.config_layer("ether", {"src": "52:00:00:00:00:00", "dst": "52:00:00:00:00:01"})
            pkt.config_layer(l3_layer, {"src": src_ip, "dst": dst_ip})
            pkt.config_layer("udp", {"dst": 0})
            pkt.config_layer("raw", {"payload": payload})
            pkt.send_pkt(tx_port=self.tx_interface, count=PACKET_COUNT)
            pkt.pktgen.pkt.show()

            pkt_rec = load_sniff_packets(inst)

            pcap_filename = "output/{0}.pcap".format(self.pcap_filename)
            self.logger.info("Save pkts to {0}".format(pcap_filename))
            save_packets(pkt_rec, pcap_filename)
            self._pcap_idx = self._pcap_idx + 1

            if len(pkt_rec) == 0:
                self.logger.error("IPsec forwarding failed")

            validated = 0
            mismatch = False
            for pkt_r in pkt_rec:
                try:
                    pkt_src_ip = pkt_r.pktgen.strip_layer3("src")
                except Exception as exc:
                    # Not a packet we can inspect at layer 3; skip it.
                    self.logger.debug(
                        "Skip packet without layer 3: {0}".format(exc))
                    continue
                if not pkt_src_ip:
                    continue
                self.logger.debug(pkt_src_ip)

                if not self._check_forwarded_packet(pkt_r, pkt_src_ip, expected):
                    mismatch = True
                    break
                validated += 1

            return validated > 0 and not mismatch
        finally:
            # Always stop the application, even if validation raised.
            self.dut.kill_all()

# -- 2.7.4 (git patch signature trailer)