DPDK patches and discussions
 help / color / mirror / Atom feed
From: Reshma Pattan <reshma.pattan@intel.com>
To: dev@dpdk.org
Subject: [dpdk-dev] [PATCH v3 4/5] lib/librte_eal: add tcpdump support in primary process
Date: Wed,  2 Mar 2016 12:16:09 +0000	[thread overview]
Message-ID: <1456920970-2047-5-git-send-email-reshma.pattan@intel.com> (raw)
In-Reply-To: <1456920970-2047-1-git-send-email-reshma.pattan@intel.com>

Added tcpdump functionality to the EAL interrupt thread.

Enhanced the interrupt thread to listen on the tcpdump socket
and to process request messages from the secondary process.

Created a new mempool and rings to carry the packets captured for tcpdump.

Added rte_eth_rxtx_callbacks that capture ingress/egress packets
for tcpdump.

Added functionality to remove the registered rte_eth_rxtx_callbacks
once the secondary process terminates.

Signed-off-by: Reshma Pattan <reshma.pattan@intel.com>
---
 lib/librte_eal/linuxapp/eal/Makefile         |    5 +-
 lib/librte_eal/linuxapp/eal/eal_interrupts.c |  422 +++++++++++++++++++++++++-
 2 files changed, 424 insertions(+), 3 deletions(-)

diff --git a/lib/librte_eal/linuxapp/eal/Makefile b/lib/librte_eal/linuxapp/eal/Makefile
index 6e26250..425152c 100644
--- a/lib/librte_eal/linuxapp/eal/Makefile
+++ b/lib/librte_eal/linuxapp/eal/Makefile
@@ -1,6 +1,6 @@
 #   BSD LICENSE
 #
-#   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+#   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
 #   All rights reserved.
 #
 #   Redistribution and use in source and binary forms, with or without
@@ -47,6 +47,9 @@ CFLAGS += -I$(RTE_SDK)/lib/librte_eal/common/include
 CFLAGS += -I$(RTE_SDK)/lib/librte_ring
 CFLAGS += -I$(RTE_SDK)/lib/librte_mempool
 CFLAGS += -I$(RTE_SDK)/lib/librte_ivshmem
+CFLAGS += -I$(RTE_SDK)/lib/librte_mbuf
+CFLAGS += -I$(RTE_SDK)/lib/librte_ether
+CFLAGS += -I$(RTE_SDK)/lib/librte_net
 CFLAGS += $(WERROR_FLAGS) -O3
 
 # specific to linuxapp exec-env
diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
index 06b26a9..3b82b7b 100644
--- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
+++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,11 @@
 #include <sys/signalfd.h>
 #include <sys/ioctl.h>
 #include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
 #include <assert.h>
+#include <arpa/inet.h>
+#include <sys/stat.h>
 
 #include <rte_common.h>
 #include <rte_interrupts.h>
@@ -65,15 +69,40 @@
 #include <rte_malloc.h>
 #include <rte_errno.h>
 #include <rte_spinlock.h>
+#include <rte_memcpy.h>
+#include <rte_mbuf.h>
+#include <rte_ethdev.h>
+#include <rte_ether.h>
+#include <rte_ip.h>
 
 #include "eal_private.h"
 #include "eal_vfio.h"
 #include "eal_thread.h"
+#include "eal_internal_cfg.h"
 
 #define EAL_INTR_EPOLL_WAIT_FOREVER (-1)
 #define NB_OTHER_INTR               1
+/* printf-style format of the tcpdump socket path; %s is the socket directory */
+#define TCPDUMP_SOCKET_PATH "%s/tcpdump_mp_socket"
+/*
+ * NOTE(review): TCPDUMP_SOCKET_ERR, TCPDUMP_REQ and BURST_SIZE are not
+ * referenced by the code added to this file — confirm whether the
+ * secondary process relies on them or whether they can be dropped.
+ */
+#define TCPDUMP_SOCKET_ERR 0xFF
+#define TCPDUMP_REQ 0x1
+/* number of slots in each primary-to-secondary ring */
+#define RING_SIZE 1024
+#define BURST_SIZE 32
+/* size and per-lcore cache of the mempool used for RX packet copies */
+#define NUM_MBUFS 65536
+#define MBUF_CACHE_SIZE 250
+/* capacity of the rx_cbs[] / tx_cbs[] callback bookkeeping tables */
+#define MAX_CBS 54
+/* size, including the terminating NUL, of the port/queue request tokens */
+#define PORT_QUEUE_SIZE 5
 
 static RTE_DEFINE_PER_LCORE(int, _epfd) = -1; /**< epoll fd per thread */
+/*
+ * Source IPv4 address to capture, compared raw against the IPv4 header's
+ * src_addr (so it is held in network byte order); 0 captures every packet.
+ * Filled in from the request received from the secondary process.
+ */
+static uint32_t src_ip_filter;
+/* listening AF_UNIX socket on which secondary processes send requests */
+static int tcpdump_socket_fd;
+/*
+ * Rings carrying captured RX / TX mbufs from the primary to the secondary.
+ * NOTE(review): these and the pool below are not static — confirm whether
+ * anything outside this file needs the symbols.
+ */
+struct rte_ring *prim_to_sec_rx;
+struct rte_ring *prim_to_sec_tx;
+/* pool from which the RX packet copies are allocated */
+struct rte_mempool *tcpdump_pktmbuf_pool;
+/*
+ * Installed ethdev callbacks, one entry per (port, queue) registration,
+ * kept so the callbacks can be removed again; MAX_CBS entries for RX and TX.
+ */
+static struct rxtx_cbs {
+	uint8_t port;
+	uint16_t queue;
+	struct rte_eth_rxtx_callback *cb;
+} rx_cbs[MAX_CBS], tx_cbs[MAX_CBS];
 
 /**
  * union for pipe fds.
@@ -644,6 +673,306 @@ rte_intr_disable(struct rte_intr_handle *intr_handle)
 	return 0;
 }
 
+/*
+ * Copy one segment of packet m into the segment mi: per-segment metadata
+ * first, then the segment payload. mi->pkt_len is set to this segment's
+ * length only; a caller building a chain sets the head's pkt_len itself.
+ * The payload copy is not bounds-checked: the caller must ensure mi has
+ * at least m->data_len bytes of tailroom.
+ */
+static inline void
+tcpdump_pktmbuf_duplicate(struct rte_mbuf *mi, struct rte_mbuf *m)
+{
+	uint16_t seg_len = m->data_len;
+
+	/* offload and classification metadata */
+	mi->ol_flags = m->ol_flags;
+	mi->packet_type = m->packet_type;
+	mi->tx_offload = m->tx_offload;
+	mi->hash = m->hash;
+	mi->vlan_tci = m->vlan_tci;
+	mi->vlan_tci_outer = m->vlan_tci_outer;
+	mi->port = m->port;
+
+	/* lengths describe this single segment */
+	mi->data_len = seg_len;
+	mi->pkt_len = seg_len;
+
+	rte_memcpy(rte_pktmbuf_mtod(mi, void *), rte_pktmbuf_mtod(m, void *),
+			seg_len);
+
+	__rte_mbuf_sanity_check(mi, 1);
+	__rte_mbuf_sanity_check(m, 0);
+}
+
+/*
+ * Deep-copy packet md, segment by segment, into mbufs allocated from mp.
+ * Returns the head of the new chain, or NULL when an allocation fails or
+ * a source segment is larger than a segment from mp can hold. In both
+ * failure cases every mbuf allocated so far is released.
+ */
+static inline struct rte_mbuf *
+tcpdump_pktmbuf_clone(struct rte_mbuf *md, struct rte_mempool *mp)
+{
+	struct rte_mbuf *mc, *mi, *tail = NULL;
+	uint32_t pktlen = md->pkt_len;
+	uint8_t nseg = 0;
+
+	mc = rte_pktmbuf_alloc(mp);
+	if (unlikely(mc == NULL))
+		return NULL;
+
+	mi = mc;
+	for (;;) {
+		/*
+		 * tcpdump_pktmbuf_duplicate() copies data_len bytes without a
+		 * bounds check and mp has a fixed buffer size (e.g. jumbo
+		 * segments do not fit): refuse instead of overrunning mi.
+		 */
+		if (unlikely(rte_pktmbuf_data_len(md) >
+				rte_pktmbuf_tailroom(mi))) {
+			/* mi is only linked into the chain when it is mc */
+			if (mi != mc)
+				rte_pktmbuf_free(mi);
+			goto fail;
+		}
+
+		tcpdump_pktmbuf_duplicate(mi, md);
+		if (tail != NULL)
+			tail->next = mi;
+		tail = mi;
+		nseg++;
+
+		md = md->next;
+		if (md == NULL)
+			break;
+
+		/* allocate the copy of the next source segment */
+		mi = rte_pktmbuf_alloc(mp);
+		if (unlikely(mi == NULL))
+			goto fail;
+	}
+
+	mc->nb_segs = nseg;
+	mc->pkt_len = pktlen;
+
+	__rte_mbuf_sanity_check(mc, 1);
+	return mc;
+
+fail:
+	/* frees mc together with every segment already linked behind it */
+	rte_pktmbuf_free(mc);
+	return NULL;
+}
+
+/*
+ * Decide whether pkt passes the source-IPv4 filter requested by the
+ * secondary process. Returns 0 when the packet is to be captured and -1
+ * when it is to be skipped. With src_ip_filter == 0 every packet passes;
+ * otherwise only untagged IPv4 frames whose first segment holds a full
+ * Ethernet + IPv4 header and whose source address equals src_ip_filter
+ * (raw, network byte-order comparison) pass.
+ */
+static int
+compare_filter(struct rte_mbuf *pkt)
+{
+	const uint16_t min_len =
+		sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr);
+	const struct ether_hdr *eth_hdr;
+	const struct ipv4_hdr *ip_hdr;
+
+	/* no filter configured: capture without touching packet data */
+	if (!src_ip_filter)
+		return 0;
+
+	/* never read header bytes that are not in the first segment */
+	if (rte_pktmbuf_data_len(pkt) < min_len)
+		return -1;
+
+	/* non-IPv4 frames cannot match an IPv4 source filter */
+	eth_hdr = rte_pktmbuf_mtod(pkt, const struct ether_hdr *);
+	if (eth_hdr->ether_type != rte_cpu_to_be_16(ETHER_TYPE_IPv4))
+		return -1;
+
+	ip_hdr = rte_pktmbuf_mtod_offset(pkt, const struct ipv4_hdr *,
+			sizeof(struct ether_hdr));
+	if (ip_hdr->src_addr != src_ip_filter)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * RX callback: make a private copy of every received packet that passes
+ * compare_filter() and queue the copies on prim_to_sec_rx for the
+ * secondary process. Copies that cannot be made (pool exhausted) or
+ * queued (ring full) are dropped. The original packets are untouched and
+ * all nb_pkts of them are returned to the application.
+ */
+static uint16_t
+tcpdump_rx(uint8_t port __rte_unused, uint16_t qidx __rte_unused,
+	struct rte_mbuf **pkts, uint16_t nb_pkts,
+	uint16_t max_pkts __rte_unused, void *_ __rte_unused)
+{
+	unsigned i;
+	unsigned ring_enq;
+	unsigned filtered_pkts = 0;
+	struct rte_mbuf *dup;
+
+	/* idle polls deliver zero packets; a zero-length VLA is undefined */
+	if (unlikely(nb_pkts == 0))
+		return nb_pkts;
+
+	struct rte_mbuf *dup_bufs[nb_pkts];
+
+	for (i = 0; i < nb_pkts; i++) {
+		if (compare_filter(pkts[i]) != 0)
+			continue;
+		dup = tcpdump_pktmbuf_clone(pkts[i], tcpdump_pktmbuf_pool);
+		/* out of capture mbufs: drop this copy, never queue NULL */
+		if (unlikely(dup == NULL))
+			continue;
+		dup_bufs[filtered_pkts++] = dup;
+	}
+
+	if (filtered_pkts == 0)
+		return nb_pkts;
+
+	ring_enq = rte_ring_enqueue_burst(prim_to_sec_rx, (void *)dup_bufs,
+						filtered_pkts);
+	/* ring full: release the copies that could not be queued */
+	for (; ring_enq < filtered_pkts; ring_enq++)
+		rte_pktmbuf_free(dup_bufs[ring_enq]);
+
+	return nb_pkts;
+}
+
+/*
+ * TX callback: queue every packet that passes compare_filter() on
+ * prim_to_sec_tx for the secondary process. No copy is made; instead the
+ * packet's reference count is raised so the driver's free after
+ * transmission does not return it to its pool while tcpdump still uses it.
+ * NOTE(review): the secondary is expected to drop that extra reference —
+ * confirm on the dequeue side. All nb_pkts packets are passed on for TX.
+ */
+static uint16_t
+tcpdump_tx(uint8_t port __rte_unused, uint16_t qidx __rte_unused,
+		struct rte_mbuf **pkts, uint16_t nb_pkts,
+		void *_ __rte_unused)
+{
+	unsigned i;
+	unsigned ring_enq;
+	unsigned filtered_pkts = 0;
+
+	/* callbacks also run for empty bursts; a zero-length VLA is undefined */
+	if (unlikely(nb_pkts == 0))
+		return nb_pkts;
+
+	struct rte_mbuf *dup_bufs[nb_pkts];
+
+	for (i = 0; i < nb_pkts; i++) {
+		if (compare_filter(pkts[i]) != 0)
+			continue;
+		/*
+		 * Increment the reference count of the mbuf to avoid accidental
+		 * return of it to the pool while tcpdump processing is still on.
+		 */
+		rte_pktmbuf_refcnt_update(pkts[i], 1);
+		dup_bufs[filtered_pkts++] = pkts[i];
+	}
+
+	if (filtered_pkts == 0)
+		return nb_pkts;
+
+	ring_enq = rte_ring_enqueue_burst(prim_to_sec_tx, (void *)dup_bufs,
+						filtered_pkts);
+	/* ring full: give back the extra references taken above */
+	for (; ring_enq < filtered_pkts; ring_enq++)
+		rte_pktmbuf_free(dup_bufs[ring_enq]);
+
+	return nb_pkts;
+}
+
+/*
+ * Create the capture mempool and the two primary-to-secondary rings on
+ * the calling lcore's socket. Every failure is fatal (rte_exit), so the
+ * RX/TX callbacks never run with a NULL pool or ring.
+ */
+static void
+tcpdump_create_mpool_n_rings(void)
+{
+	/* Create the mbuf pool */
+	tcpdump_pktmbuf_pool = rte_pktmbuf_pool_create("tcpdump_pktmbuf_pool",
+			NUM_MBUFS, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+	if (tcpdump_pktmbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "Could not initialize tcpdump_pktmbuf_pool\n");
+
+	/*
+	 * Create rings: single-consumer dequeue (RING_F_SC_DEQ), default
+	 * multi-producer enqueue since the callbacks may run on several lcores.
+	 */
+	prim_to_sec_rx = rte_ring_create("prim_to_sec_rx", RING_SIZE,
+			rte_socket_id(), RING_F_SC_DEQ);
+	if (prim_to_sec_rx == NULL)
+		rte_exit(EXIT_FAILURE, "Could not create prim_to_sec_rx ring: %s\n",
+				rte_strerror(rte_errno));
+
+	prim_to_sec_tx = rte_ring_create("prim_to_sec_tx", RING_SIZE,
+			rte_socket_id(), RING_F_SC_DEQ);
+	if (prim_to_sec_tx == NULL)
+		rte_exit(EXIT_FAILURE, "Could not create prim_to_sec_tx ring: %s\n",
+				rte_strerror(rte_errno));
+}
+
+/*
+ * Install tcpdump_rx on (port, queue) and record the handle in the first
+ * free rx_cbs slot (cb == NULL) so it can be removed later. If every slot
+ * is taken, or the ethdev layer rejects the callback, an error is logged
+ * and nothing is recorded — rx_cbs[] is never written out of bounds.
+ */
+static void
+tcpdump_register_rx_callbacks(int port, int queue)
+{
+	int i;
+
+	for (i = 0; i < MAX_CBS; i++) {
+		if (rx_cbs[i].cb == NULL)
+			break;
+	}
+	if (i == MAX_CBS) {
+		RTE_LOG(ERR, EAL, "tcpdump: no free RX callback slot for "
+				"port %d queue %d\n", port, queue);
+		return;
+	}
+
+	rx_cbs[i].cb = rte_eth_add_rx_callback(port, queue, tcpdump_rx, NULL);
+	if (rx_cbs[i].cb == NULL) {
+		RTE_LOG(ERR, EAL, "tcpdump: failed to add RX callback on "
+				"port %d queue %d\n", port, queue);
+		return;
+	}
+	rx_cbs[i].port = port;
+	rx_cbs[i].queue = queue;
+}
+
+/*
+ * Install tcpdump_tx on (port, queue) and record the handle in the first
+ * free tx_cbs slot (cb == NULL) so it can be removed later. If every slot
+ * is taken, or the ethdev layer rejects the callback, an error is logged
+ * and nothing is recorded — tx_cbs[] is never written out of bounds.
+ */
+static void
+tcpdump_register_tx_callbacks(int port, int queue)
+{
+	int i;
+
+	for (i = 0; i < MAX_CBS; i++) {
+		if (tx_cbs[i].cb == NULL)
+			break;
+	}
+	if (i == MAX_CBS) {
+		RTE_LOG(ERR, EAL, "tcpdump: no free TX callback slot for "
+				"port %d queue %d\n", port, queue);
+		return;
+	}
+
+	tx_cbs[i].cb = rte_eth_add_tx_callback(port, queue, tcpdump_tx, NULL);
+	if (tx_cbs[i].cb == NULL) {
+		RTE_LOG(ERR, EAL, "tcpdump: failed to add TX callback on "
+				"port %d queue %d\n", port, queue);
+		return;
+	}
+	tx_cbs[i].port = port;
+	tx_cbs[i].queue = queue;
+}
+
+/*
+ * Remove every tcpdump RX callback recorded for (port, queue) and free its
+ * rx_cbs slot. Unused slots (cb == NULL) are skipped, so the zeroed table
+ * no longer yields bogus removals for port 0 / queue 0, and a slot is
+ * never removed twice. The ethdev API does not free a removed callback
+ * (it may still be running on a data-plane lcore), so it is not freed here.
+ */
+static void
+tcpdump_remove_rx_callbacks(int port, int queue)
+{
+	int i;
+
+	for (i = 0; i < MAX_CBS; i++) {
+		if (rx_cbs[i].cb == NULL || rx_cbs[i].port != port ||
+				rx_cbs[i].queue != queue)
+			continue;
+		if (rte_eth_remove_rx_callback(port, queue, rx_cbs[i].cb) != 0)
+			RTE_LOG(ERR, EAL, "tcpdump: failed to remove RX callback "
+					"on port %d queue %d\n", port, queue);
+		rx_cbs[i].cb = NULL;
+	}
+}
+
+/*
+ * Remove every tcpdump TX callback recorded for (port, queue) and free its
+ * tx_cbs slot. Unused slots (cb == NULL) are skipped, so the zeroed table
+ * no longer yields bogus removals for port 0 / queue 0, and a slot is
+ * never removed twice. The ethdev API does not free a removed callback
+ * (it may still be running on a data-plane lcore), so it is not freed here.
+ */
+static void
+tcpdump_remove_tx_callbacks(int port, int queue)
+{
+	int i;
+
+	for (i = 0; i < MAX_CBS; i++) {
+		if (tx_cbs[i].cb == NULL || tx_cbs[i].port != port ||
+				tx_cbs[i].queue != queue)
+			continue;
+		if (rte_eth_remove_tx_callback(port, queue, tx_cbs[i].cb) != 0)
+			RTE_LOG(ERR, EAL, "tcpdump: failed to remove TX callback "
+					"on port %d queue %d\n", port, queue);
+		tx_cbs[i].cb = NULL;
+	}
+}
+
+/* Request types carried in the first iovec of a tcpdump request. */
+enum {
+	TCPDUMP_MSG_REMOVE = 1,   /* remove the capture callbacks */
+	TCPDUMP_MSG_REGISTER = 2, /* install the capture callbacks */
+};
+
+/* Size of the "port,queue,..." text field of a request on the wire. */
+#define TCPDUMP_REQ_TEXT_LEN 256
+
+/*
+ * Apply one "port,queue" pair of a request of type msg_type. queue is a
+ * decimal queue id or "*" for every configured RX and TX queue of the
+ * port. Unknown request types and malformed or out-of-range ids are
+ * logged and ignored; the port is validated before rte_eth_devices[] is
+ * indexed with it.
+ */
+static void
+tcpdump_apply_request(int msg_type, const char *port, const char *queue)
+{
+	void (*rx_op)(int, int);
+	void (*tx_op)(int, int);
+	unsigned long port_id, queue_id = 0;
+	uint16_t nb_rxq, nb_txq;
+	int all_queues = (strcmp(queue, "*") == 0);
+	char *end;
+	int i;
+
+	switch (msg_type) {
+	case TCPDUMP_MSG_REGISTER:
+		rx_op = tcpdump_register_rx_callbacks;
+		tx_op = tcpdump_register_tx_callbacks;
+		break;
+	case TCPDUMP_MSG_REMOVE:
+		rx_op = tcpdump_remove_rx_callbacks;
+		tx_op = tcpdump_remove_tx_callbacks;
+		break;
+	default:
+		RTE_LOG(ERR, EAL, "tcpdump: unknown request type %d\n", msg_type);
+		return;
+	}
+
+	errno = 0;
+	port_id = strtoul(port, &end, 10);
+	if (errno != 0 || end == port || *end != '\0' ||
+			port_id >= RTE_MAX_ETHPORTS ||
+			rte_eth_devices[port_id].data == NULL) {
+		RTE_LOG(ERR, EAL, "tcpdump: invalid port '%s'\n", port);
+		return;
+	}
+
+	if (!all_queues) {
+		errno = 0;
+		queue_id = strtoul(queue, &end, 10);
+		if (errno != 0 || end == queue || *end != '\0' ||
+				queue_id > UINT16_MAX) {
+			RTE_LOG(ERR, EAL, "tcpdump: invalid queue '%s'\n", queue);
+			return;
+		}
+	}
+
+	if (all_queues) {
+		nb_rxq = rte_eth_devices[port_id].data->nb_rx_queues;
+		nb_txq = rte_eth_devices[port_id].data->nb_tx_queues;
+		for (i = 0; i < nb_rxq; i++)
+			rx_op((int)port_id, i);
+		for (i = 0; i < nb_txq; i++)
+			tx_op((int)port_id, i);
+	} else {
+		rx_op((int)port_id, (int)queue_id);
+		tx_op((int)port_id, (int)queue_id);
+	}
+}
+
+/*
+ * Receive one request from the secondary process on socket and apply it.
+ * Wire layout (three iovecs): int request type, a TCPDUMP_REQ_TEXT_LEN
+ * byte text field holding "port,queue," pairs, and the uint32_t source-IP
+ * filter, which replaces src_ip_filter only when it arrived in full.
+ * Returns 0 once the request has been parsed, -1 on a read error or a
+ * request too short to carry its type.
+ */
+static int
+tcpdump_receive_request(int socket)
+{
+	struct msghdr reg_cbs_msg;
+	struct iovec msg[3];
+	/* one byte beyond the wire field so the text is always terminated */
+	char buffer[TCPDUMP_REQ_TEXT_LEN + 1];
+	char port[PORT_QUEUE_SIZE], queue[PORT_QUEUE_SIZE];
+	const char *buf;
+	int msg_type = 0;
+	uint32_t ip_filter = 0;
+	ssize_t rval;
+	int buf_offset;
+
+	/* the sscanf() field widths below are PORT_QUEUE_SIZE - 1 */
+	RTE_BUILD_BUG_ON(PORT_QUEUE_SIZE != 5);
+
+	memset(&reg_cbs_msg, 0, sizeof(reg_cbs_msg));
+	memset(buffer, 0, sizeof(buffer));
+	reg_cbs_msg.msg_iov = msg;
+	reg_cbs_msg.msg_iovlen = 3;
+
+	msg[0].iov_base = &msg_type;
+	msg[0].iov_len = sizeof(msg_type);
+
+	msg[1].iov_base = buffer;
+	msg[1].iov_len = TCPDUMP_REQ_TEXT_LEN;
+
+	msg[2].iov_base = &ip_filter;
+	msg[2].iov_len = sizeof(ip_filter);
+
+	rval = recvmsg(socket, &reg_cbs_msg, 0);
+	if (rval < 0) {
+		RTE_LOG(ERR, EAL, "Error reading from file descriptor %d: %s\n",
+				socket,
+				strerror(errno));
+		return -1;
+	} else if (rval == 0) {
+		RTE_LOG(ERR, EAL, "Read nothing from file "
+				"descriptor %d\n", socket);
+		return -1;
+	}
+
+	if ((size_t)rval < sizeof(msg_type)) {
+		RTE_LOG(ERR, EAL, "Truncated tcpdump request on file "
+				"descriptor %d\n", socket);
+		return -1;
+	}
+
+	/* only take the filter when all of its bytes were received */
+	if ((size_t)rval >= sizeof(msg_type) + TCPDUMP_REQ_TEXT_LEN +
+			sizeof(ip_filter))
+		src_ip_filter = ip_filter;
+
+	/* Update port and queue for every "port,queue," pair */
+	buf = buffer;
+	for (;;) {
+		buf_offset = 0;
+		if (sscanf(buf, "%4[^,],%4[^,],%n", port, queue,
+				&buf_offset) != 2)
+			break;
+		tcpdump_apply_request(msg_type, port, queue);
+		/* %n is only stored when the trailing ',' was matched */
+		if (buf_offset == 0)
+			break;
+		buf += buf_offset;
+	}
+
+	return 0;
+}
+
+/*
+ * Accept one pending connection on the tcpdump listening socket, read and
+ * apply a single request from it, then close it. accept() is retried only
+ * when interrupted by a signal; any other failure is logged and the event
+ * is dropped, instead of spinning the interrupt thread forever.
+ */
+static void
+tcpdump_socket_ready(int socket)
+{
+	struct sockaddr_un addr;
+	socklen_t sockaddr_len;
+	struct linger l;
+	int conn_sock;
+
+	do {
+		sockaddr_len = sizeof(addr);
+		/* this is a blocking call */
+		conn_sock = accept(socket, (struct sockaddr *) &addr,
+				&sockaddr_len);
+	} while (conn_sock == -1 && errno == EINTR);
+
+	if (conn_sock == -1) {
+		RTE_LOG(ERR, EAL, "tcpdump: accept on descriptor %d failed: %s\n",
+				socket, strerror(errno));
+		return;
+	}
+
+	/* set socket to linger after close */
+	l.l_onoff = 1;
+	l.l_linger = 60;
+	if (setsockopt(conn_sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) < 0)
+		RTE_LOG(WARNING, EAL, "tcpdump: cannot set SO_LINGER: %s\n",
+				strerror(errno));
+
+	tcpdump_receive_request(conn_sock);
+	close(conn_sock);
+}
+
 static int
 eal_intr_process_interrupts(struct epoll_event *events, int nfds)
 {
@@ -655,6 +984,13 @@ eal_intr_process_interrupts(struct epoll_event *events, int nfds)
 
 	for (n = 0; n < nfds; n++) {
 
+		if (internal_config.process_type == RTE_PROC_PRIMARY) {
+
+			/** tcpdump socket fd */
+			if (events[n].data.fd ==  tcpdump_socket_fd)
+				tcpdump_socket_ready(tcpdump_socket_fd);
+		}
+
 		/**
 		 * if the pipe fd is ready to read, return out to
 		 * rebuild the wait list.
@@ -786,6 +1122,61 @@ eal_intr_handle_interrupts(int pfd, unsigned totalfds)
 	}
 }
 
+/*
+ * Write the tcpdump socket path into buffer (bufsz bytes). The socket
+ * directory is /var/run/tcpdump_socket for root, or $HOME/tcpdump_socket
+ * for other users (falling back to /var/run/tcpdump_socket if HOME is
+ * unset or too long). The directory is created with mode 0700 if missing.
+ * NOTE(review): the secondary process must derive the same path.
+ */
+static void
+tcpdump_get_socket_path(char *buffer, int bufsz)
+{
+	char home_sock_dir[256];
+	const char *dir = "/var/run/tcpdump_socket";
+	const char *home = getenv("HOME");
+	int len;
+
+	if (getuid() != 0 && home != NULL) {
+		len = snprintf(home_sock_dir, sizeof(home_sock_dir),
+				"%s/tcpdump_socket", home);
+		if (len > 0 && (size_t)len < sizeof(home_sock_dir))
+			dir = home_sock_dir;
+	}
+
+	/* octal permission bits: owner-only access */
+	if (mkdir(dir, 0700) < 0 && errno != EEXIST)
+		RTE_LOG(ERR, EAL, "tcpdump: cannot create %s: %s\n",
+				dir, strerror(errno));
+
+	snprintf(buffer, bufsz, TCPDUMP_SOCKET_PATH, dir);
+}
+
+/*
+ * Create, bind and listen on the AF_UNIX SOCK_SEQPACKET socket on which
+ * secondary processes send tcpdump requests. A stale socket file at the
+ * path is unlinked first. On success the descriptor is stored in
+ * tcpdump_socket_fd and 0 is returned; on failure the socket is closed
+ * and -1 is returned.
+ */
+static int
+tcpdump_create_primary_socket(void)
+{
+	int ret, socket_fd;
+	struct sockaddr_un addr;
+
+	/* set up a socket */
+	socket_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+	if (socket_fd < 0) {
+		RTE_LOG(ERR, EAL, "Failed to create socket: %s!\n",
+				strerror(errno));
+		return -1;
+	}
+
+	/* start from a zeroed address so no stack garbage is passed to bind() */
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	tcpdump_get_socket_path(addr.sun_path, sizeof(addr.sun_path));
+
+	/* unlink() before bind() to remove the socket if it already exists */
+	unlink(addr.sun_path);
+
+	ret = bind(socket_fd, (struct sockaddr *) &addr, sizeof(addr));
+	if (ret) {
+		RTE_LOG(ERR, EAL, "Failed to bind socket: %s!\n", strerror(errno));
+		close(socket_fd);
+		return -1;
+	}
+
+	ret = listen(socket_fd, 1);
+	if (ret) {
+		RTE_LOG(ERR, EAL, "Failed to listen: %s!\n", strerror(errno));
+		close(socket_fd);
+		return -1;
+	}
+
+	/* save the socket in local configuration */
+	tcpdump_socket_fd = socket_fd;
+
+	return 0;
+}
+
 /**
  * It builds/rebuilds up the epoll file descriptor with all the
  * file descriptors being waited on. Then handles the interrupts.
@@ -800,9 +1191,9 @@ static __attribute__((noreturn)) void *
 eal_intr_thread_main(__rte_unused void *arg)
 {
 	struct epoll_event ev;
-
 	/* host thread, never break out */
 	for (;;) {
+
 		/* build up the epoll fd with all descriptors we are to
 		 * wait on then pass it to the handle_interrupts function
 		 */
@@ -829,6 +1220,23 @@ eal_intr_thread_main(__rte_unused void *arg)
 		}
 		numfds++;
 
+		/* build up the epoll fd with tcpdump descriptor.
+		 */
+		static struct epoll_event tcpdump_event = {
+			.events = EPOLLIN | EPOLLPRI,
+		};
+
+		if (internal_config.process_type == RTE_PROC_PRIMARY) {
+			tcpdump_event.data.fd = tcpdump_socket_fd;
+			if (epoll_ctl(pfd, EPOLL_CTL_ADD, tcpdump_socket_fd,
+						&tcpdump_event) < 0) {
+				rte_panic("Error adding tcpdump socket fd to %d "
+						"epoll_ctl, %s\n",
+						tcpdump_socket_fd, strerror(errno));
+			}
+			numfds++;
+		}
+
 		rte_spinlock_lock(&intr_lock);
 
 		TAILQ_FOREACH(src, &intr_sources, next) {
@@ -877,6 +1285,16 @@ rte_eal_intr_init(void)
 	if (pipe(intr_pipe.pipefd) < 0)
 		return -1;
 
+	/* if primary, try to open tcpdump socket */
+	if (internal_config.process_type == RTE_PROC_PRIMARY) {
+		if (tcpdump_create_primary_socket() < 0) {
+			RTE_LOG(ERR, EAL, "Failed to set up tcpdump_socket_fd for "
+					"tcpdump in primary\n");
+			return -1;
+		}
+		tcpdump_create_mpool_n_rings();
+	}
+
 	/* create the host thread to wait/handle the interrupt */
 	ret = pthread_create(&intr_thread, NULL,
 			eal_intr_thread_main, NULL);
-- 
1.7.4.1

  parent reply	other threads:[~2016-03-02 12:16 UTC|newest]

Thread overview: 21+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2016-02-12 14:57 [dpdk-dev] [PATCH v2 0/5] add dpdk packet capture support for tcpdump Reshma Pattan
2016-02-12 14:57 ` [dpdk-dev] [PATCH v2 1/5] app/test-pmd: fix nb_rxq and nb_txq checks Reshma Pattan
2016-02-12 14:57 ` [dpdk-dev] [PATCH v2 2/5] drivers/net/pcap: add public api to create pcap device Reshma Pattan
2016-02-17  9:03   ` Pavel Fedin
2016-02-12 14:57 ` [dpdk-dev] [PATCH v2 3/5] app/proc_info: add tcpdump support in secondary process Reshma Pattan
2016-02-12 14:57 ` [dpdk-dev] [PATCH v2 4/5] lib/librte_eal: add tcpdump support in primary process Reshma Pattan
2016-02-17  9:57   ` Pavel Fedin
2016-02-12 14:57 ` [dpdk-dev] [PATCH v2 5/5] doc: update doc for tcpdump feature Reshma Pattan
2016-02-22 10:01   ` Mcnamara, John
2016-02-18 14:08 ` [dpdk-dev] [PATCH v2 0/5] add dpdk packet capture support for tcpdump Pavel Fedin
2016-02-23 13:16   ` Pattan, Reshma
2016-02-24 15:04     ` Pavel Fedin
2016-02-29 16:11       ` Pattan, Reshma
2016-03-02 12:16 ` [dpdk-dev] [PATCH v3 " Reshma Pattan
2016-03-02 12:16   ` [dpdk-dev] [PATCH v3 1/5] app/test-pmd: fix nb_rxq and nb_txq checks Reshma Pattan
2016-03-02 12:16   ` [dpdk-dev] [PATCH v3 2/5] drivers/net/pcap: add public api to create pcap device Reshma Pattan
2016-03-02 12:16   ` [dpdk-dev] [PATCH v3 3/5] app/proc_info: add tcpdump support in secondary process Reshma Pattan
2016-03-02 12:16   ` Reshma Pattan [this message]
2016-03-02 12:16   ` [dpdk-dev] [PATCH v3 5/5] doc: update doc for tcpdump feature Reshma Pattan
2016-03-09  0:33   ` [dpdk-dev] [PATCH v3 0/5] add dpdk packet capture support for tcpdump Thomas Monjalon
2016-03-11 14:18     ` Pattan, Reshma

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1456920970-2047-5-git-send-email-reshma.pattan@intel.com \
    --to=reshma.pattan@intel.com \
    --cc=dev@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).