Soft Patch Panel
 help / color / mirror / Atom feed
From: x-fn-spp@sl.ntt-tx.co.jp
To: ferruh.yigit@intel.com, ogawa.yasufumi@lab.ntt.co.jp
Cc: spp@dpdk.org
Subject: [spp] [PATCH v2 4/7] spp_pcap: add spp_pcap main function
Date: Fri,  8 Feb 2019 17:44:35 +0900	[thread overview]
Message-ID: <201902080844.x188icwT030823@imss04.silk.ntt-tx.co.jp> (raw)
In-Reply-To: <20190208084438.7952-1-x-fn-spp@sl.ntt-tx.co.jp>

From: Hideyuki Yamashita <yamashita.hideyuki@po.ntt-tx.co.jp>

Add spp_pcap.c and spp_pcap.h which defines main functions for spp_pcap.

Signed-off-by: Hideyuki Yamashita <yamashita.hideyuki@po.ntt-tx.co.jp>
Signed-off-by: Naoki Takada <takada.naoki@lab.ntt.co.jp>
---
 src/pcap/spp_pcap.c | 1091 +++++++++++++++++++++++++++++++++++++++++++
 src/pcap/spp_pcap.h |   35 ++
 2 files changed, 1126 insertions(+)
 create mode 100644 src/pcap/spp_pcap.c
 create mode 100644 src/pcap/spp_pcap.h

diff --git a/src/pcap/spp_pcap.c b/src/pcap/spp_pcap.c
new file mode 100644
index 0000000..2716c01
--- /dev/null
+++ b/src/pcap/spp_pcap.c
@@ -0,0 +1,1091 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Nippon Telegraph and Telephone Corporation
+ */
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <getopt.h>
+#include <sys/stat.h>
+
+#include <rte_common.h>
+#include <rte_cycles.h>
+
+#include <lz4frame.h>
+
+#include "shared/common.h"
+#include "spp_proc.h"
+#include "spp_pcap.h"
+#include "command_proc.h"
+#include "command_dec.h"
+#include "spp_port.h"
+
+/* Declare global variables */
+#define RTE_LOGTYPE_SPP_PCAP RTE_LOGTYPE_USER2
+
+#define PCAP_FPATH_STRLEN 128
+#define PCAP_FNAME_STRLEN 64
+#define PCAP_FDATE_STRLEN 16
+/**
+ * The first 4 bytes 0xa1b2c3d4 constitute the magic number which is used to
+ * identify pcap files.
+ */
+#define TCPDUMP_MAGIC 0xa1b2c3d4
+/* constant which indicates major verions of libpcap file */
+#define PCAP_VERSION_MAJOR 2
+#define PCAP_VERSION_MINOR 4
+#define PCAP_SNAPLEN_MAX 65535
+/**
+ * pcap header value which indicates physical layer type.
+ * 1 means LINKTYPE_ETHERNET
+ */
+#define PCAP_LINKTYPE 1
+#define IN_CHUNK_SIZE (16*1024)
+#define DEFAULT_OUTPUT_DIR "/tmp"
+#define DEFAULT_FILE_LIMIT 1073741824 /* 1GiB */
+#define PORT_STR_SIZE 16
+#define RING_SIZE 8192
+/* macro */
+/* Ensure snaplen not to be over the maximum size */
+#define TRANCATE_SNAPLEN(a, b) (((a) < (b))?(a):(b))
+
+/* capture thread type */
+enum worker_thread_type {
+	PCAP_UNUSE,  /* Not used */
+	PCAP_RECEIVE,/* thread type receive */
+	PCAP_WRITE   /* thread type write */
+};
+
+/* compress file generate mode */
+enum comp_file_generate_mode {
+	INIT_MODE,   /**
+		       * initial generation mode which is used
+		       * when capture is started
+		       */
+	UPDATE_MODE, /**
+		       * update generation mode which is used
+		       * when capture file reached max size
+		       */
+	CLOSE_MODE   /* close mode which is used when capture is stopped */
+};
+
+/* capture thread name string  */
+const char *CAPTURE_THREAD_TYPE_STRINGS[] = {
+	"unuse",
+	"receive",
+	"write",
+	/* termination */ "",
+};
+
+/* lz4 preferences */
+static const LZ4F_preferences_t g_kprefs = {
+	{
+		LZ4F_max256KB,
+		LZ4F_blockLinked,
+		LZ4F_noContentChecksum,
+		LZ4F_frame,
+		0,                   /* unknown content size */
+		{ 0, 0},             /* reserved, must be set to 0 */
+	},
+	0,                           /* compression level; 0 == default */
+	0,                           /* autoflush */
+	{ 0, 0, 0, 0},               /* reserved, must be set to 0 */
+};
+
+/* pcap file header */
+struct __attribute__((__packed__)) pcap_header {
+	uint32_t magic_number;  /* magic number */
+	uint16_t version_major; /* major version number */
+	uint16_t version_minor; /* minor version number */
+	int32_t  thiszone;      /* GMT to local correction */
+	uint32_t sigfigs;       /* accuracy of timestamps */
+	uint32_t snaplen;       /* max length of captured packets, in octets */
+	uint32_t network;       /* data link type */
+};
+
+/* pcap packet header */
+struct pcap_packet_header {
+	uint32_t ts_sec;        /* time stamp seconds */
+	uint32_t ts_usec;       /* time stamp micro seconds */
+	uint32_t write_len;     /* write length */
+	uint32_t packet_len;    /* packet length */
+};
+
+/* Option for pcap. */
+struct pcap_option {
+	struct timespec start_time; /* start time */
+	uint64_t file_limit;        /* file size limit */
+	char compress_file_path[PCAP_FPATH_STRLEN]; /* file path */
+	char compress_file_date[PCAP_FDATE_STRLEN]; /* file name date */
+	struct spp_port_info port_cap;  /* capture port */
+	struct rte_ring *cap_ring;      /* RTE ring structure */
+};
+
+/**
+ * pcap management info which stores attributes
+ * (e.g. worker thread type, file number, pointer to writing file etc) per core
+ */
+struct pcap_mng_info {
+	volatile enum worker_thread_type type; /* thread type */
+	enum spp_capture_status status; /* thread status */
+	int thread_no;                 /* thread no */
+	int file_no;                   /* file no */
+	char compress_file_name[PCAP_FNAME_STRLEN]; /* lz4 file name */
+	LZ4F_compressionContext_t ctx; /* lz4 file Ccontext */
+	FILE *compress_fp;             /* lzf file pointer */
+	size_t outbuf_capacity;        /* compress date buffer size */
+	void *outbuff;                 /* compress date buffer */
+	uint64_t file_size;            /* file write size */
+};
+
+/* Logical core ID for main thread */
+static unsigned int g_main_lcore_id = 0xffffffff;
+
+/* Execution parameter of spp_pcap */
+static struct startup_param g_startup_param;
+
+/* Interface management information */
+static struct iface_info g_iface_info;
+
+/* Core management information */
+static struct core_mng_info g_core_info[RTE_MAX_LCORE];
+
+/* Packet capture request information */
+static int g_capture_request;
+
+/* Packet capture status information */
+static int g_capture_status;
+
+/* pcap option */
+static struct pcap_option g_pcap_option;
+
+/* pcap managed info */
+static struct pcap_mng_info g_pcap_info[RTE_MAX_LCORE];
+
+/* Print help message */
+static void
+usage(const char *progname)
+{
+	RTE_LOG(INFO, SPP_PCAP, "Usage: %s [EAL args] --"
+		" --client-id CLIENT_ID"
+		" -s SERVER_IP:SERVER_PORT"
+		" -i INPUT_PORT"
+		" [--output FILE_OUT_PUT_PATH]"
+		" [--limit_file_size LIMIT_FILE_SIZE]\n"
+		" --client-id CLIENT_ID   : My client ID\n"
+		" -s SERVER_IP:SERVER_PORT  : "
+				"Access information to the server\n"
+		" -i                        : capture port(phy,ring)\n"
+		" --output                  : file path(default:/tmp)\n"
+		" --limit_file_size         : "
+				"file limit size(default:1073741824 Byte)\n"
+		, progname);
+}
+
+/**
+ * Convert string of given client id to integer
+ *
+ * If succeeded, client id of integer is assigned to client_id and
+ * return SPP_RET_OK. Or return -SPP_RET_NG if failed.
+ */
+static int
+decode_client_id(const char *client_id_str, int *client_id)
+{
+	int id = 0;
+	char *endptr = NULL;
+
+	id = strtol(client_id_str, &endptr, 0);
+	if (unlikely(client_id_str == endptr) || unlikely(*endptr != '\0'))
+		return SPP_RET_NG;
+
+	if (id >= RTE_MAX_LCORE)
+		return SPP_RET_NG;
+
+	*client_id = id;
+	RTE_LOG(DEBUG, SPP_PCAP, "Set client id = %d\n", *client_id);
+	return SPP_RET_OK;
+}
+
+/* Parse options for server IP and port */
+static int
+parse_server_ip(const char *server_str, char *server_ip, int *server_port)
+{
+	const char delim[2] = ":";
+	unsigned int pos = 0;
+	int port = 0;
+	char *endptr = NULL;
+
+	pos = strcspn(server_str, delim);
+	if (pos >= strlen(server_str))
+		return SPP_RET_NG;
+
+	port = strtol(&server_str[pos+1], &endptr, 0);
+	if (unlikely(&server_str[pos+1] == endptr) || unlikely(*endptr != '\0'))
+		return SPP_RET_NG;
+
+	memcpy(server_ip, server_str, pos);
+	server_ip[pos] = '\0';
+	*server_port = port;
+	RTE_LOG(DEBUG, SPP_PCAP, "Set server IP   = %s\n", server_ip);
+	RTE_LOG(DEBUG, SPP_PCAP, "Set server port = %d\n", *server_port);
+	return SPP_RET_OK;
+}
+
+
+/* Decode options for limit file size */
+static int
+decode_limit_file_size(const char *limit_size_str, uint64_t *limit_size)
+{
+	uint64_t file_limit = 0;
+	char *endptr = NULL;
+
+	file_limit = strtoull(limit_size_str, &endptr, 10);
+	if (unlikely(limit_size_str == endptr) || unlikely(*endptr != '\0'))
+		return SPP_RET_NG;
+
+	*limit_size = file_limit;
+	RTE_LOG(DEBUG, SPP_PCAP, "Set limit file zise = %ld\n", *limit_size);
+	return SPP_RET_OK;
+}
+
+/* Decode options for port */
+static int
+decode_capture_port(const char *port_str, enum port_type *iface_type,
+			int *iface_no)
+{
+	enum port_type type = UNDEF;
+	const char *no_str = NULL;
+	char *endptr = NULL;
+
+	/* Find out which type of interface from resource UID */
+	if (strncmp(port_str, SPP_IFTYPE_NIC_STR ":",
+			strlen(SPP_IFTYPE_NIC_STR)+1) == 0) {
+		/* NIC */
+		type = PHY;
+		no_str = &port_str[strlen(SPP_IFTYPE_NIC_STR)+1];
+	} else if (strncmp(port_str, SPP_IFTYPE_RING_STR ":",
+			strlen(SPP_IFTYPE_RING_STR)+1) == 0) {
+		/* RING */
+		type = RING;
+		no_str = &port_str[strlen(SPP_IFTYPE_RING_STR)+1];
+	} else {
+		/* OTHER */
+		RTE_LOG(ERR, SPP_PCAP, "The interface that does not suppor. "
+					"(port = %s)\n", port_str);
+		return SPP_RET_NG;
+	}
+
+	/* Convert from string to number */
+	int ret_no = strtol(no_str, &endptr, 0);
+	if (unlikely(no_str == endptr) || unlikely(*endptr != '\0')) {
+		/* No IF number */
+		RTE_LOG(ERR, SPP_PCAP, "No interface number. (port = %s)\n",
+								port_str);
+		return SPP_RET_NG;
+	}
+
+	*iface_type = type;
+	*iface_no = ret_no;
+
+	RTE_LOG(DEBUG, SPP_PCAP, "Port = %s => Type = %d No = %d\n",
+					port_str, *iface_type, *iface_no);
+	return SPP_RET_OK;
+}
+
+/* Parse options for client app */
+static int
+parse_args(int argc, char *argv[])
+{
+	int cnt;
+	int proc_flg = 0;
+	int server_flg = 0;
+	int port_flg = 0;
+	int option_index, opt;
+	const int argcopt = argc;
+	char *argvopt[argcopt];
+	const char *progname = argv[0];
+	char port_str[PORT_STR_SIZE];
+	static struct option lgopts[] = {
+			{ "client-id",       required_argument, NULL,
+					SPP_LONGOPT_RETVAL_CLIENT_ID },
+			{ "output",          required_argument, NULL,
+					SPP_LONGOPT_RETVAL_OUTPUT },
+			{ "limit_file_size", required_argument, NULL,
+					SPP_LONGOPT_RETVAL_LIMIT_FILE_SIZE},
+			{ 0 },
+	};
+	/**
+	 * Save argv to argvopt to avoid losing the order of options
+	 * by getopt_long()
+	 */
+	for (cnt = 0; cnt < argcopt; cnt++)
+		argvopt[cnt] = argv[cnt];
+
+	/* Clear startup parameters */
+	memset(&g_startup_param, 0x00, sizeof(g_startup_param));
+
+	/* option parameters init */
+	memset(&g_pcap_option, 0x00, sizeof(g_pcap_option));
+	strcpy(g_pcap_option.compress_file_path, DEFAULT_OUTPUT_DIR);
+	g_pcap_option.file_limit = DEFAULT_FILE_LIMIT;
+
+	/* Check options of application */
+	optind = 0;
+	opterr = 0;
+	while ((opt = getopt_long(argc, argvopt, "i:s:", lgopts,
+			&option_index)) != EOF) {
+		switch (opt) {
+		case SPP_LONGOPT_RETVAL_CLIENT_ID:
+			if (decode_client_id(optarg,
+					&g_startup_param.client_id) !=
+								SPP_RET_OK) {
+				usage(progname);
+				return SPP_RET_NG;
+			}
+			proc_flg = 1;
+			break;
+		case SPP_LONGOPT_RETVAL_OUTPUT:
+			strcpy(g_pcap_option.compress_file_path, optarg);
+			struct stat statBuf;
+			if (g_pcap_option.compress_file_path[0] == '\0' ||
+						stat(optarg, &statBuf) != 0) {
+				usage(progname);
+				return SPP_RET_NG;
+			}
+			break;
+		case SPP_LONGOPT_RETVAL_LIMIT_FILE_SIZE:
+			if (decode_limit_file_size(optarg,
+						&g_pcap_option.file_limit) !=
+						SPP_RET_OK) {
+				usage(progname);
+				return SPP_RET_NG;
+			}
+			break;
+		case 'i':
+			strcpy(port_str, optarg);
+			if (decode_capture_port(optarg,
+					&g_pcap_option.port_cap.iface_type,
+					&g_pcap_option.port_cap.iface_no) !=
+					SPP_RET_OK) {
+				usage(progname);
+				return SPP_RET_NG;
+			}
+			port_flg = 1;
+			break;
+		case 's':
+			if (parse_server_ip(optarg, g_startup_param.server_ip,
+					&g_startup_param.server_port) !=
+								SPP_RET_OK) {
+				usage(progname);
+				return SPP_RET_NG;
+			}
+			server_flg = 1;
+			break;
+		default:
+			usage(progname);
+			return SPP_RET_NG;
+		}
+	}
+
+	/* Check mandatory parameters */
+	if ((proc_flg == 0) || (server_flg == 0) || (port_flg == 0)) {
+		usage(progname);
+		return SPP_RET_NG;
+	}
+
+	RTE_LOG(INFO, SPP_PCAP,
+			"app opts (client_id=%d,server=%s:%d,"
+			"port=%s,output=%s,limit_file_size=%ld)\n",
+			g_startup_param.client_id,
+			g_startup_param.server_ip,
+			g_startup_param.server_port,
+			port_str,
+			g_pcap_option.compress_file_path,
+			g_pcap_option.file_limit);
+	return SPP_RET_OK;
+}
+
+/* Pcap get core status */
+int
+spp_pcap_get_core_status(
+		unsigned int lcore_id,
+		struct spp_iterate_core_params *params)
+{
+	int ret = SPP_RET_NG;
+	char role_type[8];
+	struct pcap_mng_info *info = &g_pcap_info[lcore_id];
+	char name[PCAP_FPATH_STRLEN + PCAP_FDATE_STRLEN];
+	struct spp_port_index rx_ports[1];
+	int rx_num = 0;
+
+	RTE_LOG(DEBUG, SPP_PCAP, "status core[%d]\n", lcore_id);
+	if (info->type == PCAP_RECEIVE) {
+		memset(rx_ports, 0x00, sizeof(rx_ports));
+		rx_ports[0].iface_type = g_pcap_option.port_cap.iface_type;
+		rx_ports[0].iface_no   = g_pcap_option.port_cap.iface_no;
+		rx_num = 1;
+		strcpy(role_type, "receive");
+	}
+	if (info->type == PCAP_WRITE) {
+		memset(name, 0x00, sizeof(name));
+		if (info->compress_fp != NULL)
+			snprintf(name, sizeof(name) - 1, "%s/%s",
+					g_pcap_option.compress_file_path,
+					info->compress_file_name);
+		strcpy(role_type, "write");
+	}
+
+
+	/* Set the information with the function specified by the command. */
+	ret = (*params->element_proc)(
+		params, lcore_id,
+		name, role_type,
+		rx_num, rx_ports, 0, NULL);
+	if (unlikely(ret != 0))
+		return SPP_RET_NG;
+
+	return SPP_RET_OK;
+}
+
+/* write compressed data into file  */
+static int output_pcap_file(FILE *compress_fp, void *srcbuf, size_t write_len)
+{
+	size_t write_size;
+
+	if (write_len == 0)
+		return SPP_RET_OK;
+	write_size = fwrite(srcbuf, write_len, 1, compress_fp);
+	if (write_size != 1) {
+		RTE_LOG(ERR, SPP_PCAP, "file write error len=%lu\n",
+								write_len);
+		return SPP_RET_NG;
+	}
+	return SPP_RET_OK;
+}
+
+/* compress data & write file */
+static int output_lz4_pcap_file(struct pcap_mng_info *info,
+			       void *srcbuf,
+			       int src_len)
+{
+	size_t compress_len;
+
+	compress_len = LZ4F_compressUpdate(info->ctx, info->outbuff,
+				info->outbuf_capacity, srcbuf, src_len, NULL);
+	if (LZ4F_isError(compress_len)) {
+		RTE_LOG(ERR, SPP_PCAP, "Compression failed: error %zd\n",
+							compress_len);
+		return SPP_RET_NG;
+	}
+	RTE_LOG(DEBUG, SPP_PCAP, "src len=%d\n", src_len);
+	if (output_pcap_file(info->compress_fp, info->outbuff,
+						compress_len) != 0)
+		return SPP_RET_NG;
+
+	return SPP_RET_OK;
+}
+
+/**
+ * File compression operation. There are three mode.
+ * Open and update and close.
+ */
+static int file_compression_operation(struct pcap_mng_info *info,
+				   enum comp_file_generate_mode mode)
+{
+	struct pcap_header pcap_h;
+	size_t ctxCreation;
+	size_t headerSize;
+	size_t compress_len;
+	char temp_file[PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN];
+	char save_file[PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN];
+	const char *iface_type_str;
+
+	if (mode == INIT_MODE) { /* initial generation mode */
+		/* write buffer size get */
+		info->outbuf_capacity = LZ4F_compressBound(IN_CHUNK_SIZE,
+								&g_kprefs);
+		/* write buff allocation */
+		info->outbuff = malloc(info->outbuf_capacity);
+
+		/* Initialize pcap file name */
+		info->file_size = 0;
+		info->file_no = 1;
+		if (g_pcap_option.port_cap.iface_type == PHY)
+			iface_type_str = SPP_IFTYPE_NIC_STR;
+		else
+			iface_type_str = SPP_IFTYPE_RING_STR;
+		snprintf(info->compress_file_name,
+					PCAP_FNAME_STRLEN - 1,
+					"spp_pcap.%s.%s%d.%u.%u.pcap.lz4",
+					g_pcap_option.compress_file_date,
+					iface_type_str,
+					g_pcap_option.port_cap.iface_no,
+					info->thread_no,
+					info->file_no);
+	} else if (mode == UPDATE_MODE) { /* update generation mode */
+		/* old compress file close */
+		/* flush whatever remains within internal buffers */
+		compress_len = LZ4F_compressEnd(info->ctx, info->outbuff,
+					info->outbuf_capacity, NULL);
+		if (LZ4F_isError(compress_len)) {
+			RTE_LOG(ERR, SPP_PCAP, "Failed to end compression: "
+					"error %zd\n", compress_len);
+			fclose(info->compress_fp);
+			info->compress_fp = NULL;
+			free(info->outbuff);
+			return SPP_RET_NG;
+		}
+		if (output_pcap_file(info->compress_fp, info->outbuff,
+						compress_len) != SPP_RET_OK) {
+			fclose(info->compress_fp);
+			info->compress_fp = NULL;
+			free(info->outbuff);
+			return SPP_RET_NG;
+		}
+
+		/* flush remained data */
+		fclose(info->compress_fp);
+		info->compress_fp = NULL;
+
+		/* rename temporary file */
+		memset(temp_file, 0,
+			PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN);
+		memset(save_file, 0,
+			PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN);
+		snprintf(temp_file,
+		    (PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN) - 1,
+		    "%s/%s.tmp", g_pcap_option.compress_file_path,
+		    info->compress_file_name);
+		snprintf(save_file,
+		    (PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN) - 1,
+		    "%s/%s", g_pcap_option.compress_file_path,
+		    info->compress_file_name);
+		rename(temp_file, save_file);
+
+		/* Initialize pcap file name */
+		info->file_size = 0;
+		info->file_no++;
+		if (g_pcap_option.port_cap.iface_type == PHY)
+			iface_type_str = SPP_IFTYPE_NIC_STR;
+		else
+			iface_type_str = SPP_IFTYPE_RING_STR;
+		snprintf(info->compress_file_name,
+					PCAP_FNAME_STRLEN - 1,
+					"spp_pcap.%s.%s%d.%u.%u.pcap.lz4",
+					g_pcap_option.compress_file_date,
+					iface_type_str,
+					g_pcap_option.port_cap.iface_no,
+					info->thread_no,
+					info->file_no);
+	} else { /* close mode */
+		/* Close temporary file and rename to persistent */
+		if (info->compress_fp == NULL)
+			return SPP_RET_OK;
+		compress_len = LZ4F_compressEnd(info->ctx, info->outbuff,
+					info->outbuf_capacity, NULL);
+		if (LZ4F_isError(compress_len)) {
+			RTE_LOG(ERR, SPP_PCAP, "Failed to end compression: "
+					"error %zd\n", compress_len);
+		} else {
+			output_pcap_file(info->compress_fp, info->outbuff,
+								compress_len);
+			info->file_size += compress_len;
+		}
+		/* flush remained data */
+		fclose(info->compress_fp);
+
+		/* rename temporary file */
+		memset(temp_file, 0,
+			PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN);
+		memset(save_file, 0,
+			PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN);
+		snprintf(temp_file,
+		    (PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN) - 1,
+		    "%s/%s.tmp", g_pcap_option.compress_file_path,
+		    info->compress_file_name);
+		snprintf(save_file,
+		    (PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN) - 1,
+		    "%s/%s", g_pcap_option.compress_file_path,
+		    info->compress_file_name);
+		rename(temp_file, save_file);
+
+		info->compress_fp = NULL;
+		free(info->outbuff);
+		return SPP_RET_OK;
+	}
+
+	/* file open */
+	memset(temp_file, 0,
+		PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN);
+	snprintf(temp_file,
+		(PCAP_FPATH_STRLEN + PCAP_FNAME_STRLEN) - 1,
+		"%s/%s.tmp", g_pcap_option.compress_file_path,
+		info->compress_file_name);
+	RTE_LOG(INFO, SPP_PCAP, "open compress filename=%s\n", temp_file);
+	info->compress_fp = fopen(temp_file, "wb");
+	if (info->compress_fp == NULL) {
+		RTE_LOG(ERR, SPP_PCAP, "file open error! filename=%s\n",
+						info->compress_file_name);
+		free(info->outbuff);
+		return SPP_RET_NG;
+	}
+
+	/* init lz4 stream */
+	ctxCreation = LZ4F_createCompressionContext(&info->ctx, LZ4F_VERSION);
+	if (LZ4F_isError(ctxCreation)) {
+		RTE_LOG(ERR, SPP_PCAP, "LZ4F_createCompressionContext error "
+						"(%zd)\n", ctxCreation);
+		fclose(info->compress_fp);
+		info->compress_fp = NULL;
+		free(info->outbuff);
+		return SPP_RET_NG;
+	}
+
+	/* write compress frame header */
+	headerSize = LZ4F_compressBegin(info->ctx, info->outbuff,
+					info->outbuf_capacity, &g_kprefs);
+	if (LZ4F_isError(headerSize)) {
+		RTE_LOG(ERR, SPP_PCAP, "Failed to start compression: "
+					"error %zd\n", headerSize);
+		fclose(info->compress_fp);
+		info->compress_fp = NULL;
+		free(info->outbuff);
+		return SPP_RET_NG;
+	}
+	RTE_LOG(DEBUG, SPP_PCAP, "Buffer size is %zd bytes, header size %zd "
+			"bytes\n", info->outbuf_capacity, headerSize);
+	if (output_pcap_file(info->compress_fp, info->outbuff,
+						headerSize) != 0) {
+		fclose(info->compress_fp);
+		info->compress_fp = NULL;
+		free(info->outbuff);
+		return SPP_RET_NG;
+	}
+	info->file_size = headerSize;
+
+	/* init the common pcap header */
+	pcap_h.magic_number = TCPDUMP_MAGIC;
+	pcap_h.version_major = PCAP_VERSION_MAJOR;
+	pcap_h.version_minor = PCAP_VERSION_MINOR;
+	pcap_h.thiszone = 0;
+	pcap_h.sigfigs = 0;
+	pcap_h.snaplen = PCAP_SNAPLEN_MAX;
+	pcap_h.network = PCAP_LINKTYPE;
+
+	/* pcap header write */
+	if (output_lz4_pcap_file(info, &pcap_h, sizeof(struct pcap_header))
+							!= SPP_RET_OK) {
+		RTE_LOG(ERR, SPP_PCAP, "pcap header write  error!\n");
+		fclose(info->compress_fp);
+		info->compress_fp = NULL;
+		free(info->outbuff);
+		return SPP_RET_NG;
+	}
+
+	return SPP_RET_OK;
+}
+
+/* compress packet data */
+static int compress_file_packet(struct pcap_mng_info *info,
+				struct rte_mbuf *cap_pkt)
+{
+	unsigned int write_packet_length;
+	unsigned int packet_length;
+	struct timespec cap_time;
+	struct pcap_packet_header pcap_packet_h;
+	unsigned int remaining_bytes;
+	int bytes_to_write;
+
+	if (info->compress_fp == NULL)
+		return SPP_RET_OK;
+
+	/* capture file rool */
+	if (info->file_size > g_pcap_option.file_limit) {
+		if (file_compression_operation(info, UPDATE_MODE)
+							!= SPP_RET_OK)
+			return SPP_RET_NG;
+	}
+
+	/* cast to packet */
+	packet_length = rte_pktmbuf_pkt_len(cap_pkt);
+
+	/* truncate packet over the maximum length */
+	write_packet_length = TRANCATE_SNAPLEN(PCAP_SNAPLEN_MAX,
+							packet_length);
+
+	/* get time */
+	clock_gettime(CLOCK_REALTIME, &cap_time);
+
+	/* write block header */
+	pcap_packet_h.ts_sec = (int32_t)cap_time.tv_sec;
+	pcap_packet_h.ts_usec = (int32_t)(cap_time.tv_nsec / 1000);
+	pcap_packet_h.write_len = write_packet_length;
+	pcap_packet_h.packet_len = packet_length;
+
+	/* output to lz4_pcap_file */
+	if (output_lz4_pcap_file(info, &pcap_packet_h.ts_sec,
+			sizeof(struct pcap_packet_header)) != SPP_RET_OK) {
+		file_compression_operation(info, CLOSE_MODE);
+		return SPP_RET_NG;
+	}
+	info->file_size += sizeof(struct pcap_packet_header);
+
+	/* write content */
+	remaining_bytes = write_packet_length;
+	while (cap_pkt != NULL && remaining_bytes > 0) {
+		/* write file */
+		bytes_to_write = TRANCATE_SNAPLEN(
+					rte_pktmbuf_data_len(cap_pkt),
+					remaining_bytes);
+
+		/* output to lz4_pcap_file */
+		if (output_lz4_pcap_file(info,
+				rte_pktmbuf_mtod(cap_pkt, void*),
+						bytes_to_write) != 0) {
+			file_compression_operation(info, CLOSE_MODE);
+			return SPP_RET_NG;
+		}
+		cap_pkt = cap_pkt->next;
+		remaining_bytes -= bytes_to_write;
+		info->file_size += bytes_to_write;
+	}
+
+	return SPP_RET_OK;
+}
+
+/* receive thread */
+static int pcap_proc_receive(int lcore_id)
+{
+	struct timespec now_time;
+	struct tm l_time;
+	int buf;
+	int nb_rx = 0;
+	int nb_tx = 0;
+	struct spp_port_info *rx;
+	struct rte_mbuf *bufs[MAX_PKT_BURST];
+	struct pcap_mng_info *info = &g_pcap_info[lcore_id];
+	struct rte_ring *write_ring = g_pcap_option.cap_ring;
+
+	if (g_capture_request == SPP_CAPTURE_IDLE) {
+		if (info->status == SPP_CAPTURE_RUNNING) {
+			RTE_LOG(DEBUG, SPP_PCAP, "recive[%d], run->idle\n",
+								lcore_id);
+			info->status = SPP_CAPTURE_IDLE;
+			g_capture_status = SPP_CAPTURE_IDLE;
+		}
+		return SPP_RET_OK;
+	}
+	if (info->status == SPP_CAPTURE_IDLE) {
+		/* get time */
+		clock_gettime(CLOCK_REALTIME, &now_time);
+		memset(g_pcap_option.compress_file_date, 0, PCAP_FDATE_STRLEN);
+		localtime_r(&now_time.tv_sec, &l_time);
+		strftime(g_pcap_option.compress_file_date, PCAP_FDATE_STRLEN,
+					"%Y%m%d%H%M%S", &l_time);
+		info->status = SPP_CAPTURE_RUNNING;
+		g_capture_status = SPP_CAPTURE_RUNNING;
+		RTE_LOG(DEBUG, SPP_PCAP, "recive[%d], idle->run\n", lcore_id);
+		RTE_LOG(DEBUG, SPP_PCAP, "recive[%d], start time=%s\n",
+			lcore_id, g_pcap_option.compress_file_date);
+	}
+
+	/* Receive packets */
+	rx = &g_pcap_option.port_cap;
+
+	nb_rx = spp_eth_rx_burst(rx->dpdk_port, 0, bufs, MAX_PKT_BURST);
+	if (unlikely(nb_rx == 0))
+		return SPP_RET_OK;
+
+	/* Write ring packets */
+	nb_tx = rte_ring_enqueue_bulk(write_ring, (void *)bufs, nb_rx, NULL);
+
+	/* Discard remained packets to release mbuf */
+	if (unlikely(nb_tx < nb_rx)) {
+		RTE_LOG(ERR, SPP_PCAP, "drop packets(receve) %d\n",
+							(nb_rx - nb_tx));
+		for (buf = nb_tx; buf < nb_rx; buf++)
+			rte_pktmbuf_free(bufs[buf]);
+	}
+
+	return SPP_RET_OK;
+}
+
+/* write thread */
+static int pcap_proc_write(int lcore_id)
+{
+	int ret = SPP_RET_OK;
+	int buf;
+	int nb_rx = 0;
+	struct rte_mbuf *bufs[MAX_PKT_BURST];
+	struct rte_mbuf *mbuf = NULL;
+	struct pcap_mng_info *info = &g_pcap_info[lcore_id];
+	struct rte_ring *read_ring = g_pcap_option.cap_ring;
+
+	if (g_capture_status == SPP_CAPTURE_IDLE) {
+		if (info->status == SPP_CAPTURE_RUNNING) {
+			RTE_LOG(DEBUG, SPP_PCAP, "write[%d] run->idle\n",
+								lcore_id);
+			info->status = SPP_CAPTURE_IDLE;
+			if (file_compression_operation(info, CLOSE_MODE)
+							!= SPP_RET_OK)
+				return SPP_RET_NG;
+		}
+		return SPP_RET_OK;
+	}
+	if (info->status == SPP_CAPTURE_IDLE) {
+		RTE_LOG(DEBUG, SPP_PCAP, "write[%d] idle->run\n", lcore_id);
+		info->status = SPP_CAPTURE_RUNNING;
+		if (file_compression_operation(info, INIT_MODE)
+						!= SPP_RET_OK) {
+			info->status = SPP_CAPTURE_IDLE;
+			return SPP_RET_NG;
+		}
+	}
+
+	/* Read packets */
+	nb_rx =  rte_ring_dequeue_bulk(read_ring, (void *)bufs, MAX_PKT_BURST,
+									NULL);
+	if (unlikely(nb_rx == 0))
+		return SPP_RET_OK;
+
+	for (buf = 0; buf < nb_rx; buf++) {
+		mbuf = bufs[buf];
+		rte_prefetch0(rte_pktmbuf_mtod(mbuf, void *));
+		if (compress_file_packet(&g_pcap_info[lcore_id], mbuf)
+							!= SPP_RET_OK) {
+			RTE_LOG(ERR, SPP_PCAP, "capture file write error: "
+				"%d (%s)\n", errno, strerror(errno));
+			RTE_LOG(ERR, SPP_PCAP, "drop packets(write) %d\n",
+							(nb_rx - buf));
+			ret = SPP_RET_NG;
+			info->status = SPP_CAPTURE_IDLE;
+			file_compression_operation(info, CLOSE_MODE);
+			break;
+		}
+	}
+	/* mbuf free */
+	for (buf = 0; buf < nb_rx; buf++)
+		rte_pktmbuf_free(bufs[buf]);
+	return ret;
+}
+
+/* Main process of slave core */
+static int
+slave_main(void *arg __attribute__ ((unused)))
+{
+	int ret = SPP_RET_OK;
+	unsigned int lcore_id = rte_lcore_id();
+	struct pcap_mng_info *pcap_info = &g_pcap_info[lcore_id];
+
+	if (pcap_info->thread_no == 0) {
+		RTE_LOG(INFO, SPP_PCAP, "Core[%d] Start recive.\n", lcore_id);
+		pcap_info->type = PCAP_RECEIVE;
+	} else {
+		RTE_LOG(INFO, SPP_PCAP, "Core[%d] Start write(%d).\n",
+					lcore_id, pcap_info->thread_no);
+		pcap_info->type = PCAP_WRITE;
+	}
+	set_core_status(lcore_id, SPP_CORE_IDLE);
+
+	while (1) {
+		if (spp_get_core_status(lcore_id) == SPP_CORE_STOP_REQUEST) {
+			if (pcap_info->status == SPP_CAPTURE_IDLE)
+				break;
+			if (pcap_info->type == PCAP_RECEIVE)
+				g_capture_request = SPP_CAPTURE_IDLE;
+		}
+
+		if (pcap_info->type == PCAP_RECEIVE)
+			ret = pcap_proc_receive(lcore_id);
+		else
+			ret = pcap_proc_write(lcore_id);
+		if (unlikely(ret != SPP_RET_OK)) {
+			RTE_LOG(ERR, SPP_PCAP, "Core[%d] Thread Error.\n",
+								lcore_id);
+			break;
+		}
+	}
+
+	set_core_status(lcore_id, SPP_CORE_STOP);
+	RTE_LOG(INFO, SPP_PCAP, "Core[%d] End.\n", lcore_id);
+	return ret;
+}
+
+/**
+ * Main function
+ *
+ * Return SPP_RET_NG explicitly if error is occurred.
+ */
+int
+main(int argc, char *argv[])
+{
+	int ret = SPP_RET_NG;
+#ifdef SPP_DEMONIZE
+	/* Daemonize process */
+	int ret_daemon = daemon(0, 0);
+	if (unlikely(ret_daemon != 0)) {
+		RTE_LOG(ERR, SPP_PCAP, "daemonize is failed. (ret = %d)\n",
+				ret_daemon);
+		return ret_daemon;
+	}
+#endif
+
+	/* Signal handler registration (SIGTERM/SIGINT) */
+	signal(SIGTERM, stop_process);
+	signal(SIGINT,  stop_process);
+
+	while (1) {
+		int ret_eal = rte_eal_init(argc, argv);
+		if (unlikely(ret_eal < 0))
+			break;
+
+		argc -= ret_eal;
+		argv += ret_eal;
+
+		/* Parse spp_pcap specific parameters */
+		int ret_parse = parse_args(argc, argv);
+		if (unlikely(ret_parse != 0))
+			break;
+
+		/* Get lcore id of main thread to set its status after */
+		g_main_lcore_id = rte_lcore_id();
+
+		/* set manage address */
+		if (spp_set_mng_data_addr(&g_startup_param,
+					  &g_iface_info,
+					  g_core_info,
+					  &g_capture_request,
+					  &g_capture_status,
+					  g_main_lcore_id) < 0) {
+			RTE_LOG(ERR, SPP_PCAP,
+				"manage address set is failed.\n");
+			break;
+		}
+
+		int ret_mng = init_mng_data();
+		if (unlikely(ret_mng != 0))
+			break;
+
+		spp_port_ability_init();
+
+		/* Setup connection for accepting commands from controller */
+		int ret_command_init = spp_command_proc_init(
+				g_startup_param.server_ip,
+				g_startup_param.server_port);
+		if (unlikely(ret_command_init != SPP_RET_OK))
+			break;
+
+		/* capture port setup */
+		struct spp_port_info *port_cap = &g_pcap_option.port_cap;
+		struct spp_port_info *port_info = get_iface_info(
+						port_cap->iface_type,
+						port_cap->iface_no);
+		if (port_info == NULL) {
+			RTE_LOG(ERR, SPP_PCAP, "caputre port undefined.\n");
+			break;
+		}
+		if (port_cap->iface_type == PHY) {
+			if (port_info->iface_type != UNDEF)
+				port_cap->dpdk_port = port_info->dpdk_port;
+			else {
+				RTE_LOG(ERR, SPP_PCAP,
+					"caputre port undefined.(phy:%d)\n",
+							port_cap->iface_no);
+				break;
+			}
+		} else {
+			if (port_info->iface_type == UNDEF) {
+				ret = add_ring_pmd(port_info->iface_no);
+				if (ret == SPP_RET_NG) {
+					RTE_LOG(ERR, SPP_PCAP, "caputre port "
+						"undefined.(ring:%d)\n",
+						port_cap->iface_no);
+					break;
+				}
+				port_cap->dpdk_port = ret;
+			} else {
+				RTE_LOG(ERR, SPP_PCAP, "caputre port "
+						"undefined.(ring:%d)\n",
+						port_cap->iface_no);
+				break;
+			}
+		}
+		RTE_LOG(DEBUG, SPP_PCAP,
+				"Recv port type=%d, no=%d, port_id=%d\n",
+				port_cap->iface_type, port_cap->iface_no,
+				port_cap->dpdk_port);
+
+		/* create ring */
+		char ring_name[PORT_STR_SIZE];
+		memset(ring_name, 0x00, PORT_STR_SIZE);
+		snprintf(ring_name, PORT_STR_SIZE,  "cap_ring_%d",
+						g_startup_param.client_id);
+		g_pcap_option.cap_ring = rte_ring_create(ring_name,
+					rte_align32pow2(RING_SIZE),
+					rte_socket_id(), 0);
+		if (g_pcap_option.cap_ring == NULL) {
+			RTE_LOG(ERR, SPP_PCAP, "ring create error(%s).\n",
+						rte_strerror(rte_errno));
+			break;
+		}
+		RTE_LOG(DEBUG, SPP_PCAP, "Ring port name=%s, flags=0x%x\n",
+				g_pcap_option.cap_ring->name,
+				g_pcap_option.cap_ring->flags);
+
+		/* Start worker threads of recive or write */
+		unsigned int lcore_id = 0;
+		unsigned int thread_no = 0;
+		RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+			g_pcap_info[lcore_id].thread_no = thread_no++;
+			rte_eal_remote_launch(slave_main, NULL, lcore_id);
+		}
+
+		/* Set the status of main thread to idle */
+		g_core_info[g_main_lcore_id].status = SPP_CORE_IDLE;
+		int ret_wait = check_core_status_wait(SPP_CORE_IDLE);
+		if (unlikely(ret_wait != 0))
+			break;
+
+		/* Start secondary */
+		set_all_core_status(SPP_CORE_FORWARD);
+		RTE_LOG(INFO, SPP_PCAP, "[Press Ctrl-C to quit ...]\n");
+
+		/* Enter loop for accepting commands */
+		int ret_do = 0;
+		while (likely(g_core_info[g_main_lcore_id].status !=
+				SPP_CORE_STOP_REQUEST)) {
+			/* Receive command */
+			ret_do = spp_command_proc_do();
+			if (unlikely(ret_do != SPP_RET_OK))
+				break;
+
+			/*
+			 * Wait to avoid CPU overloaded.
+			 */
+			usleep(100);
+		}
+
+		if (unlikely(ret_do != SPP_RET_OK)) {
+			set_all_core_status(SPP_CORE_STOP_REQUEST);
+			break;
+		}
+
+		ret = SPP_RET_OK;
+		break;
+	}
+
+	/* Finalize to exit */
+	if (g_main_lcore_id == rte_lcore_id()) {
+		g_core_info[g_main_lcore_id].status = SPP_CORE_STOP;
+		int ret_core_end = check_core_status_wait(SPP_CORE_STOP);
+		if (unlikely(ret_core_end != 0))
+			RTE_LOG(ERR, SPP_PCAP, "Core did not stop.\n");
+
+		/* capture write ring free */
+		if (g_pcap_option.cap_ring != NULL)
+			rte_ring_free(g_pcap_option.cap_ring);
+	}
+
+
+	RTE_LOG(INFO, SPP_PCAP, "spp_pcap exit.\n");
+	return ret;
+}
diff --git a/src/pcap/spp_pcap.h b/src/pcap/spp_pcap.h
new file mode 100644
index 0000000..d282226
--- /dev/null
+++ b/src/pcap/spp_pcap.h
@@ -0,0 +1,35 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Nippon Telegraph and Telephone Corporation
+ */
+
+#ifndef __SPP_PCAP_H__
+#define __SPP_PCAP_H__
+
+#include "spp_proc.h"
+
+/**
+ * @file
+ * SPP_PCAP main
+ *
+ * Main function of spp_pcap.
+ * This provides the function for initializing and starting the threads.
+ *
+ */
+
+/**
+ * Pcap get core status
+ *
+ * @param lcore_id
+ *  The logical core ID for forwarder and merger.
+ * @param params
+ *  The pointer to struct spp_iterate_core_params.@n
+ *  Detailed data of pcap status.
+ *
+ * @retval SPP_RET_OK succeeded.
+ * @retval SPP_RET_NG failed.
+ */
+int spp_pcap_get_core_status(
+		unsigned int lcore_id,
+		struct spp_iterate_core_params *params);
+
+#endif /* __SPP_PCAP_H__ */
-- 
2.17.1

  parent reply	other threads:[~2019-02-08  8:46 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
     [not found] <20190208084438.7952-1-x-fn-spp@sl.ntt-tx.co.jp>
2019-02-08  8:44 ` [spp] [PATCH v2 1/7] spp_pcap: add command main x-fn-spp
2019-02-08  8:44 ` [spp] [PATCH v2 2/7] spp_pcap: add command decode x-fn-spp
2019-02-08  8:44 ` [spp] [PATCH v2 3/7] spp_pcap: add management function x-fn-spp
2019-02-08  8:44 ` x-fn-spp [this message]
2019-02-08  8:44 ` [spp] [PATCH v2 5/7] spp_pcap: add Makefile for spp_pcap x-fn-spp
2019-02-08  8:44 ` [spp] [PATCH v2 6/7] controller: add SppPcap class x-fn-spp
2019-02-08  8:44 ` [spp] [PATCH v2 7/7] controller: add pcap command to SPP controller x-fn-spp

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=201902080844.x188icwT030823@imss04.silk.ntt-tx.co.jp \
    --to=x-fn-spp@sl.ntt-tx.co.jp \
    --cc=ferruh.yigit@intel.com \
    --cc=ogawa.yasufumi@lab.ntt.co.jp \
    --cc=spp@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).