* [dpdk-dev] [PATCH 1/4] eal: add tailq for new distributor component
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
@ 2014-05-20 10:00 ` Bruce Richardson
2014-05-20 10:00 ` [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library Bruce Richardson
` (10 subsequent siblings)
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-20 10:00 UTC
To: dev
Add a new tailq to the EAL for the new distributor library component.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
lib/librte_eal/common/include/rte_tailq_elem.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h b/lib/librte_eal/common/include/rte_tailq_elem.h
index 2de4010..fdd2faf 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
+rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
+
rte_tailq_end(RTE_TAILQ_NUM)
#undef rte_tailq_elem
--
1.9.0
* [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
2014-05-20 10:00 ` [dpdk-dev] [PATCH 1/4] eal: add tailq for new distributor component Bruce Richardson
@ 2014-05-20 10:00 ` Bruce Richardson
2014-05-20 18:18 ` Neil Horman
2014-05-20 10:00 ` [dpdk-dev] [PATCH 3/4] distributor: add distributor library to build Bruce Richardson
` (9 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Bruce Richardson @ 2014-05-20 10:00 UTC
To: dev
This adds the code for a new Intel DPDK library for packet distribution.
The distributor is a component which is designed to pass packets
one-at-a-time to workers, with dynamic load balancing. Using the RSS
field in the mbuf as a tag, the distributor tracks what packet tag is
being processed by what worker and then ensures that no two packets with
the same tag are in-flight simultaneously. Once a tag is not in-flight,
then the next packet with that tag will be sent to the next available
core.
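For illustration, here is a minimal usage sketch (assuming the usual EAL,
mempool and port initialisation is done elsewhere; handle_packet() is a
placeholder for real application work, and worker_id is the worker's
index):

    /* distributor (rx) core */
    struct rte_distributor *d = rte_distributor_create("pkt_dist",
            rte_socket_id(), rte_lcore_count() - 1, NULL);
    struct rte_mbuf *bufs[32];
    for (;;) {
        const uint16_t nb_rx = rte_eth_rx_burst(0, 0, bufs, 32);
        rte_distributor_process(d, bufs, nb_rx);
    }

    /* each worker core: hand back the old packet while asking for a new one */
    struct rte_mbuf *pkt = rte_distributor_get_pkt(d, worker_id, NULL, 0);
    for (;;) {
        handle_packet(pkt);
        pkt = rte_distributor_get_pkt(d, worker_id, pkt, 0);
    }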
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
lib/librte_distributor/Makefile | 50 ++++
lib/librte_distributor/rte_distributor.c | 417 +++++++++++++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 173 +++++++++++++
3 files changed, 640 insertions(+)
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..36699f8
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,50 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..cc8384e
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,417 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/* we will use the bottom four bits of pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits. */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0
+#define RTE_DISTRIB_GET_BUF (1)
+#define RTE_DISTRIB_RETURN_BUF (2)
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+union rte_distributor_buffer {
+ volatile int64_t bufptr64;
+ char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+struct rte_distributor_backlog {
+ unsigned start;
+ unsigned count;
+ int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
+};
+
+struct rte_distributor_returned_pkts {
+ unsigned start;
+ unsigned count;
+ struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+ TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */
+
+ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the distributor. */
+ unsigned num_workers; /**< Number of workers polling */
+
+ uint32_t in_flight_tags[RTE_MAX_LCORE];
+ struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+ union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+
+ struct rte_distributor_returned_pkts returns;
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+/**** APIs called by workers ****/
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt,
+ unsigned reserved __rte_unused)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | \
+ RTE_DISTRIB_GET_BUF;
+ while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+ rte_pause();
+ buf->bufptr64 = req;
+ while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+ rte_pause();
+ /* since bufptr64 is a signed value, this should be an arithmetic shift */
+ int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ uint64_t req = ((uintptr_t)oldpkt << RTE_DISTRIB_FLAG_BITS) | \
+ RTE_DISTRIB_RETURN_BUF;
+ buf->bufptr64 = req;
+ return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* as name suggests, adds a packet to the backlog for a particular worker */
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+ if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
+ return -1;
+
+ bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)] = item;
+ return 0;
+}
+
+/* takes the next packet for a worker off the backlog */
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+ bl->count--;
+ return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
+}
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor *d,
+ unsigned *ret_start, unsigned *ret_count)
+{
+ /* store returns in a circular buffer - code is branch-free */
+ d->returns.mbufs[(*ret_start + *ret_count)
+ & RTE_DISTRIB_RETURNS_MASK] = (void *)oldbuf;
+ *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+ *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+}
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs)
+{
+ unsigned next_idx = 0;
+ unsigned worker = 0;
+ struct rte_mbuf *next_mb = NULL;
+ int64_t next_value = 0;
+ uint32_t new_tag = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ while (next_idx < num_mbufs || next_mb != NULL) {
+
+ int64_t data = d->bufs[worker].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (!next_mb) {
+ next_mb = mbufs[next_idx++];
+ next_value = (((int64_t)(uintptr_t)next_mb) << RTE_DISTRIB_FLAG_BITS);
+ new_tag = (next_mb->pkt.hash.rss | 1);
+
+ uint32_t match = 0;
+ unsigned i;
+ for (i = 0; i < d->num_workers; i++)
+ match |= (!(d->in_flight_tags[i] ^ new_tag) << i);
+
+ if (match) {
+ next_mb = NULL;
+ unsigned worker = __builtin_ctz(match);
+ if (add_to_backlog(&d->backlog[worker], next_value) < 0)
+ next_idx--;
+ }
+ }
+
+ if ((data & RTE_DISTRIB_GET_BUF) &&
+ (d->backlog[worker].count || next_mb)) {
+
+ if (d->backlog[worker].count)
+ d->bufs[worker].bufptr64 =
+ backlog_pop(&d->backlog[worker]);
+
+ else {
+ d->bufs[worker].bufptr64 = next_value;
+ d->in_flight_tags[worker] = new_tag;
+ next_mb = NULL;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+ else if (data & RTE_DISTRIB_RETURN_BUF) {
+ d->in_flight_tags[worker] = 0;
+ d->bufs[worker].bufptr64 = 0;
+ if (unlikely(d->backlog[worker].count != 0)) {
+ /* On return of a packet, we need to move the queued packets
+ * for this core elsewhere.
+ * Easiest solution is to set things up for
+ * a recursive call. That will cause those packets to be queued
+ * up for the next free core, i.e. it will return as soon as a
+ * core becomes free to accept the first packet, as subsequent
+ * ones will be added to the backlog for that core.
+ */
+ struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
+ unsigned i;
+ struct rte_distributor_backlog *bl = &d->backlog[worker];
+
+ for (i = 0; i < bl->count; i++) {
+ unsigned idx = (bl->start + i) & RTE_DISTRIB_BACKLOG_MASK;
+ pkts[i] = (void *)((uintptr_t)
+ (bl->pkts[idx] >> RTE_DISTRIB_FLAG_BITS));
+ }
+ /* recursive call */
+ rte_distributor_process(d, pkts, i);
+ bl->count = bl->start = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ /* store returns in a circular buffer */
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ if (++worker == d->num_workers)
+ worker = 0;
+ }
+ /* to finish, check all workers for backlog and schedule work for them
+ * if they are ready */
+ for (worker = 0; worker < d->num_workers; worker++)
+ if (d->backlog[worker].count &&
+ (d->bufs[worker].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+
+ int64_t oldbuf = d->bufs[worker].bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ d->bufs[worker].bufptr64 =
+ backlog_pop(&d->backlog[worker]);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+ return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs)
+{
+ struct rte_distributor_returned_pkts *returns = &d->returns;
+ unsigned retval = max_mbufs < returns->count ? max_mbufs : returns->count;
+ unsigned i;
+
+ for (i = 0; i < retval; i++)
+ mbufs[i] = returns->mbufs[(returns->start + i) &
+ RTE_DISTRIB_RETURNS_MASK];
+ returns->start += i;
+ returns->count -= i;
+
+ return retval;
+}
+
+/* local function used by the flush function only, to reassign a backlog for
+ * a shutdown core. The process function uses a recursive call for this, but
+ * that is not done in flush, as we need to track the outstanding packets count.
+ */
+static inline int
+move_worker_backlog(struct rte_distributor *d, unsigned worker)
+{
+ struct rte_distributor_backlog *bl = &d->backlog[worker];
+ unsigned i;
+
+ for (i = 0; i < d->num_workers; i++) {
+ if (i == worker)
+ continue;
+ /* check worker is active and then if backlog will fit */
+ if ((d->in_flight_tags[i] != 0 ||
+ (d->bufs[i].bufptr64 & RTE_DISTRIB_GET_BUF)) &&
+ (bl->count + d->backlog[i].count) <= RTE_DISTRIB_BACKLOG_SIZE) {
+ while (bl->count)
+ add_to_backlog(&d->backlog[i], backlog_pop(bl));
+ return 0;
+ }
+ }
+ return -1;
+}
+
+/* flush the distributor, so that there are no outstanding packets in flight or
+ * queued up. */
+int
+rte_distributor_flush(struct rte_distributor *d)
+{
+ unsigned worker, total_outstanding = 0;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (worker = 0; worker < d->num_workers; worker++)
+ total_outstanding += d->backlog[worker].count +
+ !!(d->in_flight_tags[worker]);
+
+ worker = 0;
+ while (flushed < total_outstanding) {
+
+ if (d->in_flight_tags[worker] != 0 || d->backlog[worker].count) {
+ const int64_t data = d->bufs[worker].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed += (d->in_flight_tags[worker] != 0);
+ if (d->backlog[worker].count) {
+ d->bufs[worker].bufptr64 =
+ backlog_pop(&d->backlog[worker]);
+ /* we need to mark something as being in-flight, but it
+ * doesn't matter what as we never check it except
+ * to check for non-zero.
+ */
+ d->in_flight_tags[worker] = 1;
+ } else {
+ d->bufs[worker].bufptr64 = RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[worker] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+ else if (data & RTE_DISTRIB_RETURN_BUF) {
+ if (d->backlog[worker].count == 0 ||
+ move_worker_backlog(d, worker) == 0) {
+ /* only if we move backlog, process this packet */
+ d->bufs[worker].bufptr64 = 0;
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ flushed ++;
+ d->in_flight_tags[worker] = 0;
+ }
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ if (++worker == d->num_workers)
+ worker = 0;
+ }
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns(struct rte_distributor *d)
+{
+ d->returns.start = d->returns.count = 0;
+#ifndef __OPTIMIZE__
+ memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
+#endif
+}
+
+/* creates a distributor instance */
+struct rte_distributor *
+rte_distributor_create(const char *name,
+ unsigned socket_id,
+ unsigned num_workers,
+ struct rte_distributor_extra_args *args __rte_unused)
+{
+ struct rte_distributor *d;
+ struct rte_distributor_list *distributor_list;
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ const struct rte_memzone *mz;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+ RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+ if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+ rte_errno = EINVAL;
+ return NULL;
+ }
+ rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+ mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+ if (mz == NULL) {
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ /* check that we have an initialised tail queue */
+ if ((distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+ rte_distributor_list)) == NULL) {
+ rte_errno = E_RTE_NO_TAILQ;
+ return NULL;
+ }
+
+ d = mz->addr;
+ rte_snprintf(d->name, sizeof(d->name), "%s", name);
+ d->num_workers = num_workers;
+ TAILQ_INSERT_TAIL(distributor_list, d, next);
+
+ return d;
+}
+
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..d684ff9
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,173 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DISTRIBUTOR_H_
+#define _RTE_DISTRIBUTOR_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+struct rte_distributor;
+
+struct rte_distributor_extra_args { }; /**< reserved for future use*/
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ * The name to be given to the distributor instance.
+ * @param socket_id
+ * The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ * The maximum number of workers that will request packets from this
+ * distributor
+ * @param extra_args
+ * Reserved for future use, should be passed in as NULL
+ * @return
+ * The newly created distributor instance
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+ unsigned num_workers, struct rte_distributor_extra_args *extra_args);
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed at the same time.
+ *
+ * NOTE: This function is not thread-safe; it should only be called by one thread at a time
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs to be distributed
+ * @param num_mbufs
+ * The number of mbufs in the mbufs array
+ * @return
+ * The number of mbufs processed.
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs pointer array to be filled in
+ * @param max_mbufs
+ * The size of the mbufs array
+ * @return
+ * The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * @param d
+ * The distributor instance to be used
+ * @return
+ * The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush(struct rte_distributor *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * @param d
+ * The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns(struct rte_distributor *d);
+
+/**
+ * API called by a worker to get a new packet to process. Any previous packet
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param oldpkt
+ * The previous packet, if any, being processed by the worker
+ * @param reserved
+ * Reserved for future use, should be set to zero.
+ *
+ * @return
+ * A new packet to be processed by the worker thread.
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt, unsigned reserved);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param mbuf
+ * The previous packet being processed by the worker
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+ struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
--
1.9.0
* Re: [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library
2014-05-20 10:00 ` [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library Bruce Richardson
@ 2014-05-20 18:18 ` Neil Horman
2014-05-21 10:21 ` Richardson, Bruce
0 siblings, 1 reply; 29+ messages in thread
From: Neil Horman @ 2014-05-20 18:18 UTC
To: Bruce Richardson; +Cc: dev
On Tue, May 20, 2014 at 11:00:55AM +0100, Bruce Richardson wrote:
> This adds the code for a new Intel DPDK library for packet distribution.
> The distributor is a component which is designed to pass packets
> one-at-a-time to workers, with dynamic load balancing. Using the RSS
> field in the mbuf as a tag, the distributor tracks what packet tag is
> being processed by what worker and then ensures that no two packets with
> the same tag are in-flight simultaneously. Once a tag is not in-flight,
> then the next packet with that tag will be sent to the next available
> core.
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
><snip>
> +#define RTE_DISTRIB_GET_BUF (1)
> +#define RTE_DISTRIB_RETURN_BUF (2)
> +
Can you document the meaning of these bits, please? The code makes it somewhat
confusing to differentiate them. As I read the code, GET_BUF is used as a flag
to indicate that rte_distributor_get_pkt needs to wait while a buffer is
filled in by the processing thread, while RETURN_BUF indicates that a worker is
leaving and the buffer needs to be (re)assigned to an alternate worker, is that
correct?
> +#define RTE_DISTRIB_BACKLOG_SIZE 8
> +#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
> +
> +#define RTE_DISTRIB_MAX_RETURNS 128
> +#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
> +
> +union rte_distributor_buffer {
> + volatile int64_t bufptr64;
> + char pad[CACHE_LINE_SIZE*3];
Do you need the pad, if you mark the struct as cache aligned?
> +} __rte_cache_aligned;
>
+
><snip>
> +
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d,
> + unsigned worker_id, struct rte_mbuf *oldpkt,
> + unsigned reserved __rte_unused)
> +{
> + union rte_distributor_buffer *buf = &d->bufs[worker_id];
> + int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | \
> + RTE_DISTRIB_GET_BUF;
> + while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> + rte_pause();
> + buf->bufptr64 = req;
> + while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> + rte_pause();
You may want to document the fact that this is deadlock prone. You clearly
state that only a single thread can run the processing routine, but if a user
selects a single worker thread to perform double duty, the GET_BUF_FLAG will
never get cleared here, and no other queues will get processed.
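For example, something like the following on a single lcore (a hypothetical
misuse, not code from the patch) spins forever in the second wait loop,
because the only thread that could run rte_distributor_process() to clear
the GET_BUF flag is the one that is blocked:

    /* one thread acting as both distributor and worker */
    rte_distributor_process(d, bufs, n);
    pkt = rte_distributor_get_pkt(d, 0, NULL, 0); /* never returns */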
> + /* since bufptr64 is a signed value, this should be an arithmetic shift */
> + int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
> + return (struct rte_mbuf *)((uintptr_t)ret);
> +}
> +
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d,
> + unsigned worker_id, struct rte_mbuf *oldpkt)
> +{
Maybe some optional sanity checking, here and above, to ensure that a packet
returned through get_pkt doesn't also get returned here, mangling the flags
field?
><snip>
> +
> +/* flush the distributor, so that there are no outstanding packets in flight or
> + * queued up. */
> +int
> +rte_distributor_flush(struct rte_distributor *d)
> +{
You need to document that this function can only be called by the same thread
that is running rte_distributor_process, lest you corrupt your queue data.
Alternatively, it might be nicer to modify this function's internals to set a
flag in the distributor status bits to make the process routine do the flush
work when it gets set. That would allow this function to be called by any
other thread, which seems like a more natural interface.
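Roughly like this (a sketch of the alternative only, using hypothetical
names, not code from the patch):

    /* hypothetical: any thread may request a flush... */
    void rte_distributor_request_flush(struct rte_distributor *d)
    {
        d->flush_pending = 1; /* hypothetical status flag */
    }

    /* ...and rte_distributor_process() would honour it on its next pass */
    if (unlikely(d->flush_pending)) {
        do_flush_work(d); /* hypothetical internal helper */
        d->flush_pending = 0;
    }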
><snip>
> +}
> +
> +/* clears the internal returns array in the distributor */
> +void
> +rte_distributor_clear_returns(struct rte_distributor *d)
> +{
This can also only be called by the same thread that runs the process routine,
lest the start and count values get mis-assigned.
> + d->returns.start = d->returns.count = 0;
> +#ifndef __OPTIMIZE__
> + memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
> +#endif
> +}
> +
> +/* creates a distributor instance */
> +struct rte_distributor *
> +rte_distributor_create(const char *name,
> + unsigned socket_id,
> + unsigned num_workers,
> + struct rte_distributor_extra_args *args __rte_unused)
> +{
> + struct rte_distributor *d;
> + struct rte_distributor_list *distributor_list;
> + char mz_name[RTE_MEMZONE_NAMESIZE];
> + const struct rte_memzone *mz;
> +
> + /* compilation-time checks */
> + RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> + RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> +
> + if (name == NULL || num_workers >= RTE_MAX_LCORE) {
> + rte_errno = EINVAL;
> + return NULL;
> + }
> + rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
> + mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
> + if (mz == NULL) {
> + rte_errno = ENOMEM;
> + return NULL;
> + }
> +
> + /* check that we have an initialised tail queue */
> + if ((distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
> + rte_distributor_list)) == NULL) {
> + rte_errno = E_RTE_NO_TAILQ;
> + return NULL;
> + }
> +
> + d = mz->addr;
> + rte_snprintf(d->name, sizeof(d->name), "%s", name);
> + d->num_workers = num_workers;
> + TAILQ_INSERT_TAIL(distributor_list, d, next);
You need locking around this list unless you intend to assert that distributor
creation and destruction must only be performed from a single thread. Also,
where is the API method to tear down a distributor instance?
><snip>
> +#endif
> +
> +#include <rte_mbuf.h>
> +
> +#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
> +
> +struct rte_distributor;
> +
> +struct rte_distributor_extra_args { }; /**< reserved for future use*/
> +
You don't need to reserve a struct name for future use. No one will use it
until you create it.
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d,
> + unsigned worker_id, struct rte_mbuf *oldpkt, unsigned reserved);
> +
Don't need to reserve an extra argument here. You're not ABI safe currently,
and if DPDK becomes ABI safe in the future, we will use a linker script to
provide versions with backward compatibility easily enough.
Neil
* Re: [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library
2014-05-20 18:18 ` Neil Horman
@ 2014-05-21 10:21 ` Richardson, Bruce
2014-05-21 15:23 ` Neil Horman
0 siblings, 1 reply; 29+ messages in thread
From: Richardson, Bruce @ 2014-05-21 10:21 UTC
To: Neil Horman; +Cc: dev
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Tuesday, May 20, 2014 7:19 PM
> To: Richardson, Bruce
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library
>
> On Tue, May 20, 2014 at 11:00:55AM +0100, Bruce Richardson wrote:
> > This adds the code for a new Intel DPDK library for packet distribution.
> > The distributor is a component which is designed to pass packets
> > one-at-a-time to workers, with dynamic load balancing. Using the RSS
> > field in the mbuf as a tag, the distributor tracks what packet tag is
> > being processed by what worker and then ensures that no two packets with
> > the same tag are in-flight simultaneously. Once a tag is not in-flight,
> > then the next packet with that tag will be sent to the next available
> > core.
> >
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> ><snip>
>
> > +#define RTE_DISTRIB_GET_BUF (1)
> > +#define RTE_DISTRIB_RETURN_BUF (2)
> > +
> Can you document the meaning of these bits, please? The code makes it
> somewhat
> confusing to differentiate them. As I read the code, GET_BUF is used as a flag
> to indicate that rte_distributor_get_pkt needs to wait while a buffer is
> filled in by the processing thread, while RETURN_BUF indicates that a worker is
> leaving and the buffer needs to be (re)assigned to an alternate worker, is that
> correct?
Pretty much. I'll add additional comments to the code.
>
> > +#define RTE_DISTRIB_BACKLOG_SIZE 8
> > +#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
> > +
> > +#define RTE_DISTRIB_MAX_RETURNS 128
> > +#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
> > +
> > +union rte_distributor_buffer {
> > + volatile int64_t bufptr64;
> > + char pad[CACHE_LINE_SIZE*3];
> Do you need the pad, if you mark the struct as cache aligned?
Yes, for performance reasons we actually want the structure to take up three cache lines, not just one. For instance, this will guarantee that the hardware's adjacent cache line prefetcher doesn't pull in an additional cache line (belonging to a different worker) when we access the memory.
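Concretely (an illustrative check, not part of the patch, assuming 64-byte
cache lines): each buffer occupies 192 bytes, so the line adjacent to the
one holding bufptr64 is this worker's own padding rather than another
worker's data.

    /* illustrative compile-time check of the three-cache-line sizing */
    RTE_BUILD_BUG_ON(sizeof(union rte_distributor_buffer) !=
            CACHE_LINE_SIZE * 3);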
> > +} __rte_cache_aligned;
> >
> +
> ><snip>
> > +
> > +struct rte_mbuf *
> > +rte_distributor_get_pkt(struct rte_distributor *d,
> > + unsigned worker_id, struct rte_mbuf *oldpkt,
> > + unsigned reserved __rte_unused)
> > +{
> > + union rte_distributor_buffer *buf = &d->bufs[worker_id];
> > + int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) |
> \
> > + RTE_DISTRIB_GET_BUF;
> > + while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> > + rte_pause();
> > + buf->bufptr64 = req;
> > + while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> > + rte_pause();
> You may want to document the fact that this is deadlock prone. You clearly
> state that only a single thread can run the processing routine, but if a user
> selects a single worker thread to perform double duty, the GET_BUF_FLAG will
> never get cleared here, and no other queues will get processed.
Agreed, I'll update the comments.
>
> > + /* since bufptr64 is a signed value, this should be an arithmetic shift */
> > + int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
> > + return (struct rte_mbuf *)((uintptr_t)ret);
> > +}
> > +
> > +int
> > +rte_distributor_return_pkt(struct rte_distributor *d,
> > + unsigned worker_id, struct rte_mbuf *oldpkt)
> > +{
> Maybe some optional sanity checking, here and above, to ensure that a packet
> returned through get_pkt doesn't also get returned here, mangling the flags
> field?
That actually shouldn't be an issue.
When we return a packet using this call, we just set the in_flight_tags value for the worker to zero (and re-assign the backlog, if any), and move on to the next worker. No checking of the returned packet is done. Also, since get_pkt always returns a new packet, the internal logic will still work ok - all that will happen if you return the wrong packet, e.g. by returning the same packet twice rather than returning the latest packet each time, is that the returns array will have the duplicated pointer in it. Whatever gets passed back by the worker gets stored directly there - it's up to the worker to return the correct pointer to the distributor.
>
> ><snip>
> > +
> > +/* flush the distributor, so that there are no outstanding packets in flight or
> > + * queued up. */
> > +int
> > +rte_distributor_flush(struct rte_distributor *d)
> > +{
> You need to document that this function can only be called by the same thread
> that is running rte_distributor_process, lest you corrupt your queue data.
> Alternatively, it might be nicer to modify this function's internals to set a
> flag in the distributor status bits to make the process routine do the flush
> work when it gets set. That would allow this function to be called by any
> other thread, which seems like a more natural interface.
Agreed. At minimum I'll update the comments, and I'll also look into what would be involved in changing the mechanism like you describe. However, given the limited time to the code freeze date, it may not be possible to do here. [I also don't anticipate this function being much used in normal operations anyway - it was written in order to allow me to write proper unit tests to test the process function. We need a flush function for unit testing to ensure that our packet counts are predictable at the end of each test run, and to eliminate any dependency in the tests on the internal buffer sizes of the distributor.]
>
> ><snip>
> > +}
> > +
> > +/* clears the internal returns array in the distributor */
> > +void
> > +rte_distributor_clear_returns(struct rte_distributor *d)
> > +{
> This can also only be called by the same thread that runs the process routine,
> lest the start and count values get mis-assigned.
Agreed. Will update comments.
>
> > + d->returns.start = d->returns.count = 0;
> > +#ifndef __OPTIMIZE__
> > + memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
> > +#endif
> > +}
> > +
> > +/* creates a distributor instance */
> > +struct rte_distributor *
> > +rte_distributor_create(const char *name,
> > + unsigned socket_id,
> > + unsigned num_workers,
> > + struct rte_distributor_extra_args *args __rte_unused)
> > +{
> > + struct rte_distributor *d;
> > + struct rte_distributor_list *distributor_list;
> > + char mz_name[RTE_MEMZONE_NAMESIZE];
> > + const struct rte_memzone *mz;
> > +
> > + /* compilation-time checks */
> > + RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> > + RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> > +
> > + if (name == NULL || num_workers >= RTE_MAX_LCORE) {
> > + rte_errno = EINVAL;
> > + return NULL;
> > + }
> > + rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s",
> name);
> > + mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id,
> NO_FLAGS);
> > + if (mz == NULL) {
> > + rte_errno = ENOMEM;
> > + return NULL;
> > + }
> > +
> > + /* check that we have an initialised tail queue */
> > + if ((distributor_list =
> RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
> > + rte_distributor_list)) == NULL) {
> > + rte_errno = E_RTE_NO_TAILQ;
> > + return NULL;
> > + }
> > +
> > + d = mz->addr;
> > + rte_snprintf(d->name, sizeof(d->name), "%s", name);
> > + d->num_workers = num_workers;
> > + TAILQ_INSERT_TAIL(distributor_list, d, next);
> You need locking around this list unless you intend to assert that distributor
> creation and destruction must only be performed from a single thread. Also,
> where is the API method to tear down a distributor instance?
Ack re locking, will make this match what is used in other structures.
For tearing down, that's not possible until such time as we get a function to free memzones back. Rings and mempools similarly have no free function.
>
> ><snip>
> > +#endif
> > +
> > +#include <rte_mbuf.h>
> > +
> > +#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance
> */
> > +
> > +struct rte_distributor;
> > +
> > +struct rte_distributor_extra_args { }; /**< reserved for future use*/
> > +
> You don't need to reserve a struct name for future use. No one will use it
> until you create it.
>
> > +struct rte_mbuf *
> > +rte_distributor_get_pkt(struct rte_distributor *d,
> > + unsigned worker_id, struct rte_mbuf *oldpkt, unsigned
> reserved);
> > +
> Don't need to reserve an extra argument here. You're not ABI safe currently,
> and if DPDK becomes ABI safe in the future, we will use a linker script to
> provide versions with backward compatibility easily enough.
We may not have ABI compatibility between releases, but on the other hand we try to reduce the amount of code changes that need to be made by our customers who are compiling their code against the libraries - generally linking against static rather than shared libraries. Since we have a reasonable expectation that this field will be needed in a future release, we want to include it now so that when we do need it, no code changes need to be made to upgrade this particular library to a new Intel DPDK version.
* Re: [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library
2014-05-21 10:21 ` Richardson, Bruce
@ 2014-05-21 15:23 ` Neil Horman
0 siblings, 0 replies; 29+ messages in thread
From: Neil Horman @ 2014-05-21 15:23 UTC
To: Richardson, Bruce; +Cc: dev
On Wed, May 21, 2014 at 10:21:26AM +0000, Richardson, Bruce wrote:
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Tuesday, May 20, 2014 7:19 PM
> > To: Richardson, Bruce
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library
> >
> > On Tue, May 20, 2014 at 11:00:55AM +0100, Bruce Richardson wrote:
> > > This adds the code for a new Intel DPDK library for packet distribution.
> > > The distributor is a component which is designed to pass packets
> > > one-at-a-time to workers, with dynamic load balancing. Using the RSS
> > > field in the mbuf as a tag, the distributor tracks what packet tag is
> > > being processed by what worker and then ensures that no two packets with
> > > the same tag are in-flight simultaneously. Once a tag is not in-flight,
> > > then the next packet with that tag will be sent to the next available
> > > core.
> > >
> > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > ><snip>
> >
> > ><snip other comments as I agree with your responses to them all save below>
> > Don't need to reserve an extra argument here. You're not ABI safe currently,
> > and if DPDK becomes ABI safe in the future, we will use a linker script to
> > provide versions with backward compatibility easily enough.
> We may not have ABI compatibility between releases, but on the other hand we try to reduce the amount of code changes that need to be made by our customers who are compiling their code against the libraries - generally linking against static rather than shared libraries. Since we have a reasonable expectation that this field will be needed in a future release, we want to include it now so that when we do need it, no code changes need to be made to upgrade this particular library to a new Intel DPDK version.
I understand why you added the reserved argument, but I still don't think it's a
good idea, especially since you're not ABI safe/stable at the moment. By adding
this argument, you're forcing early users to declare a variable to pass into
your library that they know is unused, and as such likely uninitialized (or at
least initialized to an unknown value). When you do in the future make use of
this unknown value, your internal implementation will have to support being
called by both 'old' applications that just pass in any old value, and 'new'
users who pass in valid data, and the implementation won't have any way to
differentiate between the two. You can certainly document a reserved value that
current users must initialize that variable to, so that you can make that
differentiation, but you have to hope they do that correctly and consistently.
It seems to me it would be better to do something like:
1) Not include the reserved parameter
2) When you do add the extra parameter, rename the function as well, and
3) provide a compatibility function that preserves the old API and passes the
reserved value as the new parameter to the renamed function in (2)
That way old applications will run transparently, and you don't have to hope
they code the reserved values properly (note you can also do this with a macro
if you want to save the call instruction)
Ideally, you would just do this with a version script during linking, so that
you could include 2 versions of the same function name (v1 without the extra
parameter and v2 with the extra parameter), and old applications linked against
v1 would just continue to work, but dpdk isn't there yet :)
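As a concrete sketch of (2) and (3) (hypothetical names, assuming the
extra parameter becomes real in some later release):

    /* renamed function that actually consumes the new parameter */
    struct rte_mbuf *
    rte_distributor_get_pkt_v2(struct rte_distributor *d,
            unsigned worker_id, struct rte_mbuf *oldpkt, unsigned flags);

    /* compatibility macro preserving the old three-argument API */
    #define rte_distributor_get_pkt(d, worker_id, oldpkt) \
            rte_distributor_get_pkt_v2((d), (worker_id), (oldpkt), 0)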
Neil
* [dpdk-dev] [PATCH 3/4] distributor: add distributor library to build
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
2014-05-20 10:00 ` [dpdk-dev] [PATCH 1/4] eal: add tailq for new distributor component Bruce Richardson
2014-05-20 10:00 ` [dpdk-dev] [PATCH 2/4] distributor: new packet distributor library Bruce Richardson
@ 2014-05-20 10:00 ` Bruce Richardson
2014-05-20 10:00 ` [dpdk-dev] [PATCH 4/4] distributor: add unit tests for distributor lib Bruce Richardson
` (8 subsequent siblings)
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-20 10:00 UTC
To: dev
Add new configuration settings to enable/disable the distributor library,
and add a makefile entry to compile it once enabled.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
config/defconfig_i686-default-linuxapp-gcc | 5 +++++
config/defconfig_i686-default-linuxapp-icc | 5 +++++
config/defconfig_x86_64-default-bsdapp-gcc | 6 ++++++
config/defconfig_x86_64-default-linuxapp-gcc | 5 +++++
config/defconfig_x86_64-default-linuxapp-icc | 5 +++++
lib/Makefile | 1 +
mk/rte.app.mk | 4 ++++
7 files changed, 31 insertions(+)
diff --git a/config/defconfig_i686-default-linuxapp-gcc b/config/defconfig_i686-default-linuxapp-gcc
index 14bd3d1..5b4261e 100644
--- a/config/defconfig_i686-default-linuxapp-gcc
+++ b/config/defconfig_i686-default-linuxapp-gcc
@@ -335,3 +335,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_i686-default-linuxapp-icc b/config/defconfig_i686-default-linuxapp-icc
index ec3386e..d1d4aeb 100644
--- a/config/defconfig_i686-default-linuxapp-icc
+++ b/config/defconfig_i686-default-linuxapp-icc
@@ -334,3 +334,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-bsdapp-gcc b/config/defconfig_x86_64-default-bsdapp-gcc
index d960e1d..329920e 100644
--- a/config/defconfig_x86_64-default-bsdapp-gcc
+++ b/config/defconfig_x86_64-default-bsdapp-gcc
@@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y
CONFIG_RTE_TEST_PMD=y
CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
+
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-gcc b/config/defconfig_x86_64-default-linuxapp-gcc
index f11ffbf..772a6b3 100644
--- a/config/defconfig_x86_64-default-linuxapp-gcc
+++ b/config/defconfig_x86_64-default-linuxapp-gcc
@@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-icc b/config/defconfig_x86_64-default-linuxapp-icc
index 4eaca4c..04affc8 100644
--- a/config/defconfig_x86_64-default-linuxapp-icc
+++ b/config/defconfig_x86_64-default-linuxapp-icc
@@ -333,3 +333,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/lib/Makefile b/lib/Makefile
index b92b392..5a0b10f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index a2c60b6..82a160f 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -73,6 +73,10 @@ LDLIBS += -lrte_ivshmem
endif
endif
+ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
+LDLIBS += -lrte_distributor
+endif
+
ifeq ($(CONFIG_RTE_LIBRTE_E1000_PMD),y)
LDLIBS += -lrte_pmd_e1000
endif
--
1.9.0
* [dpdk-dev] [PATCH 4/4] distributor: add unit tests for distributor lib
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (2 preceding siblings ...)
2014-05-20 10:00 ` [dpdk-dev] [PATCH 3/4] distributor: add distributor library to build Bruce Richardson
@ 2014-05-20 10:00 ` Bruce Richardson
2014-05-20 10:38 ` [dpdk-dev] [PATCH 0/4] New library: rte_distributor Neil Horman
` (7 subsequent siblings)
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-20 10:00 UTC
To: dev
Add a set of unit tests and a basic performance test for the
distributor library. These tests cover all the major functionality of
the library on both distributor and worker sides.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
app/test/Makefile | 2 +
app/test/commands.c | 7 +-
app/test/test.h | 2 +
app/test/test_distributor.c | 582 +++++++++++++++++++++++++++++++++++++++
app/test/test_distributor_perf.c | 274 ++++++++++++++++++
5 files changed, 866 insertions(+), 1 deletion(-)
create mode 100644 app/test/test_distributor.c
create mode 100644 app/test/test_distributor_perf.c
diff --git a/app/test/Makefile b/app/test/Makefile
index b49785e..7c2d351 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -93,6 +93,8 @@ SRCS-$(CONFIG_RTE_APP_TEST) += test_power.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_common.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_timer_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_ivshmem.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_devargs.c
ifeq ($(CONFIG_RTE_APP_TEST),y)
diff --git a/app/test/commands.c b/app/test/commands.c
index efa8566..dfdbd37 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -179,6 +179,10 @@ static void cmd_autotest_parsed(void *parsed_result,
ret = test_common();
if (!strcmp(res->autotest, "ivshmem_autotest"))
ret = test_ivshmem();
+ if (!strcmp(res->autotest, "distributor_autotest"))
+ ret = test_distributor();
+ if (!strcmp(res->autotest, "distributor_perf_autotest"))
+ ret = test_distributor_perf();
if (!strcmp(res->autotest, "devargs_autotest"))
ret = test_devargs();
#ifdef RTE_LIBRTE_PMD_RING
@@ -238,7 +242,8 @@ cmdline_parse_token_string_t cmd_autotest_autotest =
#ifdef RTE_LIBRTE_KVARGS
"kvargs_autotest#"
#endif
- "common_autotest");
+ "common_autotest#"
+ "distributor_autotest#distributor_perf_autotest");
cmdline_parse_inst_t cmd_autotest = {
.f = cmd_autotest_parsed, /* function to call */
diff --git a/app/test/test.h b/app/test/test.h
index 1945d29..9b83ade 100644
--- a/app/test/test.h
+++ b/app/test/test.h
@@ -92,6 +92,8 @@ int test_power(void);
int test_common(void);
int test_pmd_ring(void);
int test_ivshmem(void);
+int test_distributor(void);
+int test_distributor_perf(void);
int test_kvargs(void);
int test_devargs(void);
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
new file mode 100644
index 0000000..3c6bef0
--- /dev/null
+++ b/app/test/test_distributor.c
@@ -0,0 +1,582 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_errno.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+static volatile int quit = 0; /**< general quit variable for all threads */
+static volatile int zero_quit = 0; /**< var for when we just want thr0 to quit*/
+static volatile unsigned worker_idx = 0;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* returns the total count of the number of packets handled by the worker
+ * functions given below.
+ */
+static inline unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static inline void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for the sanity test;
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* do basic sanity testing of the distributor. This test tests the following:
+ * - send 32 packets through distributor with the same tag and ensure they
+ * all go to the one worker
+ * - send 32 packets through the distributor with two different tags and
+ * verify that they go equally to two different workers.
+ * - send 32 packets with different tags through the distributor and
+ * just verify we get all packets back.
+ * - send 1024 packets through the distributor, gathering the returned packets
+ * as we go. Then verify that we correctly got all 1024 pointers back again,
+ * not necessarily in the same order (as different flows).
+ */
+static int
+sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Basic distributor sanity tests ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with all zero hashes done.\n");
+ if (worker_stats[0].handled_packets != BURST)
+ return -1;
+
+ /* pick two flows and check they go correctly */
+ if (rte_lcore_count() >= 3) {
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = (i & 1) << 8;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with two hash values done\n");
+
+ if (worker_stats[0].handled_packets != 16 ||
+ worker_stats[1].handled_packets != 16)
+ return -1;
+ }
+
+ /* give a different hash value to each packet, so load gets distributed */
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with non-zero hashes done\n");
+
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ /* sanity test with BIG_BATCH packets to ensure they all arrive back from
+ * the returned packets function */
+ clear_packet_count();
+ struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
+ unsigned num_returned = 0;
+
+ /* flush out any remaining packets */
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BIG_BATCH; i++)
+ many_bufs[i]->pkt.hash.rss = i << 2;
+
+ for (i = 0; i < BIG_BATCH/BURST; i++) {
+ rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned], BIG_BATCH - num_returned);
+ }
+ rte_distributor_flush(d);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned], BIG_BATCH - num_returned);
+
+ if (num_returned != BIG_BATCH) {
+ printf("line %d: Number returned is not the same as number sent\n",
+ __LINE__);
+ return -1;
+ }
+ /* big check - make sure all packets made it back!! */
+ for (i = 0; i < BIG_BATCH; i++) {
+ unsigned j;
+ struct rte_mbuf *src = many_bufs[i];
+ for (j = 0; j < BIG_BATCH; j++)
+ if (return_bufs[j] == src) {
+ break;
+ }
+ if (j == BIG_BATCH) {
+ printf("Error: could not find source packet #%u\n", i);
+ return -1;
+ }
+ }
+ printf("Sanity test of returned packets done\n");
+
+ rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
+
+ printf("\n");
+ return 0;
+}
+
+
+/* to test that the distributor does not lose packets, we use this worker
+ * function which frees mbufs when it gets them. The distributor thread does
+ * the mbuf allocation. If the distributor drops packets we'll eventually run out
+ * of mbufs.
+ */
+static int
+handle_work_with_free_mbufs(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, pkt, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* Perform a sanity test of the distributor with a large number of packets,
+ * where we allocate a new set of mbufs for each burst. The workers then
+ * free the mbufs. This ensures that we don't have any packet leaks in the
+ * library.
+ */
+static int
+sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ struct rte_mbuf *bufs[BURST];
+
+ printf("=== Sanity test with mbuf alloc/free ===\n");
+ clear_packet_count();
+ for (i = 0; i < (1 << ITER_POWER); i += BURST) {
+ unsigned j;
+ while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
+ rte_distributor_process(d, NULL, 0);
+ for (j = 0; j < BURST; j++) {
+ bufs[j]->pkt.hash.rss = (i+j) << 1;
+ bufs[j]->refcnt = 1;
+ }
+
+ rte_distributor_process(d, bufs, BURST);
+ }
+
+ rte_distributor_flush(d);
+ if (total_packet_count() < (1<<ITER_POWER)) {
+ printf("Line %u: Packet count is incorrect, %u, expected %u\n",
+ __LINE__, total_packet_count(), (1<<ITER_POWER));
+ return -1;
+ }
+
+ printf("Sanity test with mbuf alloc/free passed\n\n");
+ return 0;
+}
+
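+/* worker function for the shutdown tests: like handle_work_with_free_mbufs it
+ * counts and frees the packets it receives, except that worker zero also
+ * exits its main loop when zero_quit is set, simulating a worker core being
+ * shut down mid-stream.
+ */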
+static int
+handle_work_for_shutdown_test(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ /* wait for quit signal globally, or for worker zero wait for zero_quit */
+ while (!quit && !(id == 0 && zero_quit)) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+
+ if (id == 0) {
+ /* for worker zero, allow it to restart to pick up last packet
+ * when all workers are shutting down.
+ */
+ while (zero_quit)
+ usleep(100);
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ }
+ rte_distributor_return_pkt(d, id, pkt);
+ }
+ return 0;
+}
+
+
+/* Perform a sanity test of the distributor when a worker shuts down mid-test:
+ * queue up two bursts of packets for worker zero, signal it to quit, and
+ * verify that all packets still get processed, with the backlogged ones
+ * redistributed to the remaining workers.
+ */
+static int
+sanity_test_with_worker_shutdown(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Sanity test of worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full backlog
+ * for the other ones at worker 0.
+ */
+
+ /* get more buffers to queue up, again setting them to the same flow */
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+ rte_distributor_process(d, bufs, BURST);
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST * 2) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST * 2, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Sanity test with worker shutdown passed\n\n");
+ return 0;
+}
+
+/* Test that the flush function is able to move packets between workers when
+ * one worker shuts down.
+ */
+static int
+test_flush_with_worker_shutdown(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Test flush fn with worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full backlog
+ * for the other ones at worker 0.
+ */
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+
+ zero_quit = 0;
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Flush test with worker shutdown passed\n\n");
+ return 0;
+}
+
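+/* check that calling rte_distributor_create with a NULL name fails and
+ * sets rte_errno to EINVAL */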
+static int
+test_error_distributor_create_name(void)
+{
+ struct rte_distributor *d = NULL;
+ char *name = NULL;
+
+ d = rte_distributor_create(name, rte_socket_id(),
+ rte_lcore_count() - 1, NULL);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: distributor_name is NULL, yet create call succeeded\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+
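+/* check that calling rte_distributor_create with too many workers fails and
+ * sets rte_errno to EINVAL */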
+static int
+test_error_distributor_create_numworkers(void)
+{
+ struct rte_distributor *d = NULL;
+ d = rte_distributor_create("test_numworkers", rte_socket_id(),
+ RTE_MAX_LCORE + 10, NULL);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: num_workers >= RTE_MAX_LCORE yet create call succeed\n");
+ return -1;
+ }
+ return 0;
+}
+
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ zero_quit = 0;
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_distributor_flush(d);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor(void)
+{
+ static struct rte_distributor *d = NULL;
+ static struct rte_mempool *p = NULL;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_distributor", rte_socket_id(),
+ rte_lcore_count() - 1, NULL);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ BIG_BATCH * 2 - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (sanity_test(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
+ if (sanity_test_with_mbuf_alloc(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ if (rte_lcore_count() > 2) {
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d, SKIP_MASTER);
+ if (sanity_test_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d, SKIP_MASTER);
+ if (test_flush_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ } else {
+ printf("Not enough cores to run tests for worker shutdown\n");
+ }
+
+ if (test_error_distributor_create_numworkers() == -1 ||
+ test_error_distributor_create_name() == -1) {
+ printf("rte_distributor_create parameter check tests failed");
+ return -1;
+ }
+
+ return 0;
+
+err:
+ quit_workers(d, p);
+ return -1;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
new file mode 100644
index 0000000..29e4013
--- /dev/null
+++ b/app/test/test_distributor_perf.c
@@ -0,0 +1,274 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+static volatile int quit = 0;
+static volatile unsigned worker_idx;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* worker thread used for testing the time to do a round-trip of a cache
+ * line between two cores
+ */
+static void
+flip_bit(volatile uint64_t *arg)
+{
+ uint64_t old_val = 0;
+ while (old_val != 2) {
+ while (!*arg)
+ rte_pause();
+ old_val = *arg;
+ *arg = 0;
+ }
+}
+
+/* test case to time the number of cycles to round-trip a cache line between
+ * two cores.
+ */
+static void
+time_cache_line_switch(void)
+{
+ /* allocate three cache lines of data; we use only the first uint64_t of it */
+ uint64_t data[CACHE_LINE_SIZE*3 / sizeof(uint64_t)];
+
+ unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
+ volatile uint64_t *pdata = &data[0];
+ *pdata = 1;
+ rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
+ while (*pdata)
+ rte_pause();
+
+ const uint64_t start_time = rte_rdtsc();
+ for (i = 0; i < (1 << ITER_POWER); i++) {
+ while (*pdata)
+ rte_pause();
+ *pdata = 1;
+ }
+ const uint64_t end_time = rte_rdtsc();
+
+ while (*pdata)
+ rte_pause();
+ *pdata = 2;
+ rte_eal_wait_lcore(slaveid);
+ printf("==== Cache line switch test ===\n");
+ printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+ end_time-start_time);
+ printf("Ticks per iteration = %"PRIu64"\n\n",
+ (end_time-start_time) >> ITER_POWER);
+}
+
+/* returns the total count of packets handled by the worker
+ * functions given below.
+ */
+static unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* this basic performance test just repeatedly sends in 32 packets at a time
+ * to the distributor, verifies at the end that the workers received them all,
+ * and finally reports how long the processing took per packet.
+ */
+static inline int
+perf_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ uint64_t start, end;
+ struct rte_mbuf *bufs[BURST];
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("Error getting mbufs from pool\n");
+ return -1;
+ }
+ /* ensure we have different hash value for each pkt */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ start = rte_rdtsc();
+ for (i = 0; i < (1<<ITER_POWER); i++)
+ rte_distributor_process(d, bufs, BURST);
+ end = rte_rdtsc();
+
+ do {
+ usleep(100);
+ rte_distributor_process(d, NULL, 0);
+ } while (total_packet_count() < (BURST << ITER_POWER));
+
+ printf("=== Performance test of distributor ===\n");
+ printf("Time per burst: %"PRIu64"\n", (end - start) >> ITER_POWER);
+ printf("Time per packet: %"PRIu64"\n\n",
+ ((end - start) >> ITER_POWER)/BURST);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Total packets: %u (%x)\n", total_packet_count(),
+ total_packet_count());
+ printf("=== Perf test done ===\n\n");
+
+ return 0;
+}
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor_perf(void)
+{
+ static struct rte_distributor *d = NULL;
+ static struct rte_mempool *p = NULL;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ /* first, time how long it takes to round-trip a cache line */
+ time_cache_line_switch();
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_perf", rte_socket_id(),
+ rte_lcore_count() - 1, NULL);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ BIG_BATCH * 2 - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DPT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (perf_test(d, p) < 0)
+ return -1;
+ quit_workers(d, p);
+
+ return 0;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor_perf(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
--
1.9.0
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (3 preceding siblings ...)
2014-05-20 10:00 ` [dpdk-dev] [PATCH 4/4] distributor: add unit tests for distributor lib Bruce Richardson
@ 2014-05-20 10:38 ` Neil Horman
2014-05-20 11:02 ` Richardson, Bruce
2014-05-27 22:32 ` Thomas Monjalon
` (6 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Neil Horman @ 2014-05-20 10:38 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev
On Tue, May 20, 2014 at 11:00:53AM +0100, Bruce Richardson wrote:
> This adds a new library to the Intel DPDK whereby a set of packets can be distributed one-at-a-time to a set of worker cores, with dynamic load balancing being done between those workers. Flows are identified by a tag within the mbuf (currently the RSS hash field, 32-bit value), which is used to ensure that no two packets of the same flow are processed in parallel, thereby preserving ordering.
>
> Bruce Richardson (4):
> eal: add tailq for new distributor component
> distributor: new packet distributor library
> distributor: add distributor library to build
> distributor: add unit tests for distributor lib
>
> app/test/Makefile | 2 +
> app/test/commands.c | 7 +-
> app/test/test.h | 2 +
> app/test/test_distributor.c | 582 +++++++++++++++++++++++++
> app/test/test_distributor_perf.c | 274 ++++++++++++
> config/defconfig_i686-default-linuxapp-gcc | 5 +
> config/defconfig_i686-default-linuxapp-icc | 5 +
> config/defconfig_x86_64-default-bsdapp-gcc | 6 +
> config/defconfig_x86_64-default-linuxapp-gcc | 5 +
> config/defconfig_x86_64-default-linuxapp-icc | 5 +
> lib/Makefile | 1 +
> lib/librte_distributor/Makefile | 50 +++
> lib/librte_distributor/rte_distributor.c | 417 ++++++++++++++++++
> lib/librte_distributor/rte_distributor.h | 173 ++++++++
> lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
> mk/rte.app.mk | 4 +
> 16 files changed, 1539 insertions(+), 1 deletion(-)
> create mode 100644 app/test/test_distributor.c
> create mode 100644 app/test/test_distributor_perf.c
> create mode 100644 lib/librte_distributor/Makefile
> create mode 100644 lib/librte_distributor/rte_distributor.c
> create mode 100644 lib/librte_distributor/rte_distributor.h
>
> --
> 1.9.0
>
>
This sounds an awful lot like the team and bonding drivers. Why implement this
as a separate application accessible api, rather than a stacked PMD? If you do
the latter then existing applications could conceivably change their
configurations to use this technology and gain the benefit of load distribution
without having to alter the application to use a new api.
Neil
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
2014-05-20 10:38 ` [dpdk-dev] [PATCH 0/4] New library: rte_distributor Neil Horman
@ 2014-05-20 11:02 ` Richardson, Bruce
2014-05-20 17:14 ` Neil Horman
0 siblings, 1 reply; 29+ messages in thread
From: Richardson, Bruce @ 2014-05-20 11:02 UTC (permalink / raw)
To: Neil Horman; +Cc: dev
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Tuesday, May 20, 2014 11:39 AM
> To: Richardson, Bruce
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
>
> >
> This sounds an awful lot like the team and bonding drivers. Why implement this
> as a separate application accessible api, rather than a stacked PMD? If you do
> the latter then existing applications could conceivably change their
> configurations to use this technology and gain the benefit of load distribution
> without having to alter the application to use a new api.
>
I'm not sure I see the similarity with the bonded driver, which merges multiple ports into a single logical port, i.e. you pull packets from a single source which actually pulls packets from possibly multiple sources behind the scenes, whereas this takes packets from an unknown source and distributes them among a set of workers a single packet at a time. (While handling single packets is slower than handling packet bursts, it is something that is sometimes needed to support existing code which may not be written to work with packet bursts.)
The load balancing is also more dynamic than that done by existing mechanisms, since no calculation is done on the packets or the packet metadata to assign a packet to a worker - instead, if a particular flow tag is not in-flight with a worker, the next packet with that tag goes to the next available worker. In this way, the library also takes care of ensuring that packets from a single flow are maintained in order, and provides a mechanism to have the packets passed back to the distributor thread when done, for further processing there, e.g. rescheduling a second time, or other actions.
While in certain circumstances an ethdev rx/tx API could be used (and it is something we have thought about and may well add to this library in future), there are certain requirements that cannot be met by just making this a stacked ethdev/PMD:
* not all packets come from an rx_burst call on another PMD, especially where the tags on the packets need to be computed by software
* the rx_burst API call provides no way to pass back packets to the source when finished.
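As a minimal sketch, the worker-side loop this API implies looks roughly like the following (illustrative only, not code from the patches, and assuming the rte_distributor.h API of this v1 set; the trailing 0 is the unused "future use" parameter that the v2 series later removes):

    static volatile int done;         /* hypothetical quit flag */
    static volatile unsigned next_id; /* hypothetical worker-id allocator */

    static int
    worker_fn(void *arg)
    {
        struct rte_distributor *d = arg;
        unsigned id = __sync_fetch_and_add(&next_id, 1);
        struct rte_mbuf *pkt = rte_distributor_get_pkt(d, id, NULL, 0);

        while (!done) {
            /* process pkt here; the distributor guarantees no two packets
             * with the same flow tag are in-flight simultaneously */
            pkt = rte_distributor_get_pkt(d, id, pkt, 0);
        }
        rte_distributor_return_pkt(d, id, pkt);
        return 0;
    }

Each get_pkt() call hands the previous packet back to the distributor and blocks until a new one is assigned, which is what makes the load balancing dynamic: a free worker simply asks for more work.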
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
2014-05-20 11:02 ` Richardson, Bruce
@ 2014-05-20 17:14 ` Neil Horman
2014-05-20 19:32 ` Richardson, Bruce
0 siblings, 1 reply; 29+ messages in thread
From: Neil Horman @ 2014-05-20 17:14 UTC (permalink / raw)
To: Richardson, Bruce; +Cc: dev
On Tue, May 20, 2014 at 11:02:15AM +0000, Richardson, Bruce wrote:
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Tuesday, May 20, 2014 11:39 AM
> > To: Richardson, Bruce
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
> >
> > >
> > This sounds an awful lot like the team and bonding drivers. Why implement this
> > as a separate application accessible api, rather than a stacked PMD? If you do
> > the latter then existing applications could conceivably change their
> > configurations to use this technology and gain the benefit of load distribution
> > without having to alter the application to use a new api.
> >
>
> I'm not sure I see the similarity with the bonded driver, which merges multiple ports into a single logical port, i.e. you pull packets from a single source which actually pulls packets from possibly multiple sources behind the scenes, whereas this takes packets from an unknown source and distributes them among a set of workers a single packet at a time. (While handling single packets is slower than handling packet bursts, it is something that is sometimes needed to support existing code which may not be written to work with packet bursts.)
>
Ah, my bad, I was looking at the API as a way of multiplexing locally generated
data to multiple workers for transmission over multiple network interfaces, not
to demultiplex received data to multiple workers. That makes more sense. Sorry
for the noise. I've got a few more comments inline with the rest of your
patches.
Neil
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
2014-05-20 17:14 ` Neil Horman
@ 2014-05-20 19:32 ` Richardson, Bruce
0 siblings, 0 replies; 29+ messages in thread
From: Richardson, Bruce @ 2014-05-20 19:32 UTC (permalink / raw)
To: Neil Horman; +Cc: dev
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Tuesday, May 20, 2014 6:14 PM
> To: Richardson, Bruce
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
>
> On Tue, May 20, 2014 at 11:02:15AM +0000, Richardson, Bruce wrote:
> > > -----Original Message-----
> > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > Sent: Tuesday, May 20, 2014 11:39 AM
> > > To: Richardson, Bruce
> > > Cc: dev@dpdk.org
> > > Subject: Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
> > >
> > > This sounds an awful lot like the team and bonding drivers. Why implement
> > > this as a separate application accessible api, rather than a stacked PMD?
> > > If you do the latter then existing applications could conceivably change
> > > their configurations to use this technology and gain the benefit of load
> > > distribution without having to alter the application to use a new api.
> > >
> > I'm not sure I see the similarity with the bonded driver, which merges
> > multiple ports into a single logical port, i.e. you pull packets from a
> > single source which actually pulls packets from possibly multiple sources
> > behind the scenes, whereas this takes packets from an unknown source and
> > distributes them among a set of workers a single packet at a time. (While
> > handling single packets is slower than handling packet bursts, it is
> > something that is sometimes needed to support existing code which may not
> > be written to work with packet bursts.)
> >
> Ah, my bad, I was looking at the API as a way of multiplexing locally
> generated data to multiple workers for transmission over multiple network
> interfaces, not to demultiplex received data to multiple workers. That makes
> more sense. Sorry for the noise. I've got a few more comments inline with
> the rest of your patches.
> Neil
No problem, thanks for the feedback, I'll work through it and submit a v2 patch as soon as I can.
/Bruce
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (4 preceding siblings ...)
2014-05-20 10:38 ` [dpdk-dev] [PATCH 0/4] New library: rte_distributor Neil Horman
@ 2014-05-27 22:32 ` Thomas Monjalon
2014-05-28 8:48 ` Richardson, Bruce
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 0/5] " Bruce Richardson
` (5 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Thomas Monjalon @ 2014-05-27 22:32 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev
Hi Bruce,
As for rte_acl, I have some formatting comments.
2014-05-20 11:00, Bruce Richardson:
> This adds a new library to the Intel DPDK whereby a set of packets can be
> distributed one-at-a-time to a set of worker cores, with dynamic load
> balancing being done between those workers. Flows are identified by a tag
> within the mbuf (currently the RSS hash field, 32-bit value), which is used
> to ensure that no two packets of the same flow are processed in parallel,
> thereby preserving ordering.
>
> app/test/Makefile | 2 +
> app/test/commands.c | 7 +-
> app/test/test.h | 2 +
> app/test/test_distributor.c | 582 +++++++++++++++++++++++++
> app/test/test_distributor_perf.c | 274 ++++++++++++
> config/defconfig_i686-default-linuxapp-gcc | 5 +
> config/defconfig_i686-default-linuxapp-icc | 5 +
> config/defconfig_x86_64-default-bsdapp-gcc | 6 +
> config/defconfig_x86_64-default-linuxapp-gcc | 5 +
> config/defconfig_x86_64-default-linuxapp-icc | 5 +
> lib/Makefile | 1 +
> lib/librte_distributor/Makefile | 50 +++
> lib/librte_distributor/rte_distributor.c | 417 ++++++++++++++++++
> lib/librte_distributor/rte_distributor.h | 173 ++++++++
> lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
> mk/rte.app.mk | 4 +
> 16 files changed, 1539 insertions(+), 1 deletion(-)
As you are introducing a new library, you need to update
doxygen configuration and start page:
doc/doxy-api.conf
doc/doxy-api-index.md
I've run checkpatch.pl from kernel.org on these distributor patches
and it reports some code style issues.
Could you have a look at it please?
Thanks
--
Thomas
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
2014-05-27 22:32 ` Thomas Monjalon
@ 2014-05-28 8:48 ` Richardson, Bruce
0 siblings, 0 replies; 29+ messages in thread
From: Richardson, Bruce @ 2014-05-28 8:48 UTC (permalink / raw)
To: Thomas Monjalon; +Cc: dev
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas.monjalon@6wind.com]
> Sent: Tuesday, May 27, 2014 11:33 PM
> To: Richardson, Bruce
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
>
> Hi Bruce,
>
> As for rte_acl, I have some formatting comments.
>
> 2014-05-20 11:00, Bruce Richardson:
> > This adds a new library to the Intel DPDK whereby a set of packets can be
> > distributed one-at-a-time to a set of worker cores, with dynamic load
> > balancing being done between those workers. Flows are identified by a tag
> > within the mbuf (currently the RSS hash field, 32-bit value), which is used
> > to ensure that no two packets of the same flow are processed in parallel,
> > thereby preserving ordering.
> >
> > app/test/Makefile | 2 +
> > app/test/commands.c | 7 +-
> > app/test/test.h | 2 +
> > app/test/test_distributor.c | 582 +++++++++++++++++++++++++
> > app/test/test_distributor_perf.c | 274 ++++++++++++
> > config/defconfig_i686-default-linuxapp-gcc | 5 +
> > config/defconfig_i686-default-linuxapp-icc | 5 +
> > config/defconfig_x86_64-default-bsdapp-gcc | 6 +
> > config/defconfig_x86_64-default-linuxapp-gcc | 5 +
> > config/defconfig_x86_64-default-linuxapp-icc | 5 +
> > lib/Makefile | 1 +
> > lib/librte_distributor/Makefile | 50 +++
> > lib/librte_distributor/rte_distributor.c | 417 ++++++++++++++++++
> > lib/librte_distributor/rte_distributor.h | 173 ++++++++
> > lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
> > mk/rte.app.mk | 4 +
> > 16 files changed, 1539 insertions(+), 1 deletion(-)
>
> As you are introducing a new library, you need to update
> doxygen configuration and start page:
> doc/doxy-api.conf
> doc/doxy-api-index.md
Didn't know to update those, I'll add it in to the v2 patch set.
>
> I've run checkpatch.pl from kernel.org on these distributor patches
> and it reports some code style issues.
> Could you have a look at it please?
Yep. I've downloaded and run that script myself in preparation for a V2 patch set (due really soon), so hopefully all should be well second time round.
^ permalink raw reply [flat|nested] 29+ messages in thread
* [dpdk-dev] [PATCH v2 0/5] New library: rte_distributor
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (5 preceding siblings ...)
2014-05-27 22:32 ` Thomas Monjalon
@ 2014-05-29 10:12 ` Bruce Richardson
2014-06-05 1:58 ` Cao, Waterman
2014-06-12 13:57 ` Thomas Monjalon
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 1/5] eal: add tailq for new distributor component Bruce Richardson
` (4 subsequent siblings)
11 siblings, 2 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev
This adds a new library to the Intel DPDK whereby a set of packets can be distributed one-at-a-time to a set of worker cores, with dynamic load balancing being done between those workers. Flows are identified by a tag within the mbuf (currently the RSS hash field, 32-bit value), which is used to ensure that no two packets of the same flow are processed in parallel, thereby preserving ordering.
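As a rough sketch, the distributor-core usage looks like the following (illustrative only; "running", BURST and get_burst() are placeholders, not part of this patch set):

    struct rte_mbuf *bufs[BURST], *done_bufs[BURST];
    unsigned n, n_done;

    while (running) {
        /* hand a burst of tagged mbufs out to the workers */
        n = get_burst(bufs, BURST); /* placeholder packet source */
        rte_distributor_process(d, bufs, n);
        /* collect any packets the workers have handed back */
        n_done = rte_distributor_returned_pkts(d, done_bufs, BURST);
        /* ... post-process or transmit done_bufs[0..n_done-1] here ... */
    }
    /* push out anything still in-flight or queued in backlogs */
    rte_distributor_flush(d);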
Major changes in V2 set:
* Updates to take account of Neil's comments on original patch set
* Updates to fix issues highlighted by checkpatch.pl
* Additional handling in library for special case when process() is called with zero mbufs
Bruce Richardson (5):
eal: add tailq for new distributor component
distributor: new packet distributor library
distributor: add distributor library to build
distributor: add unit tests for distributor lib
docs: add distributor lib to API docs
app/test/Makefile | 2 +
app/test/commands.c | 7 +-
app/test/test.h | 2 +
app/test/test_distributor.c | 595 +++++++++++++++++++++++++
app/test/test_distributor_perf.c | 275 ++++++++++++
config/common_bsdapp | 6 +
config/common_linuxapp | 5 +
doc/doxy-api-index.md | 1 +
doc/doxy-api.conf | 1 +
lib/Makefile | 1 +
lib/librte_distributor/Makefile | 50 +++
lib/librte_distributor/rte_distributor.c | 487 ++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 186 ++++++++
lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
mk/rte.app.mk | 4 +
15 files changed, 1623 insertions(+), 1 deletion(-)
create mode 100644 app/test/test_distributor.c
create mode 100644 app/test/test_distributor_perf.c
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 0/5] New library: rte_distributor
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 0/5] " Bruce Richardson
@ 2014-06-05 1:58 ` Cao, Waterman
2014-06-12 13:57 ` Thomas Monjalon
1 sibling, 0 replies; 29+ messages in thread
From: Cao, Waterman @ 2014-06-05 1:58 UTC (permalink / raw)
To: Richardson, Bruce, dev
Tested-by: Waterman Cao <waterman.cao@intel.com>
This patch set is composed of 5 files, including the cover letter.
All files have been tested by Intel.
We performed functional and performance tests on the new Distributor patch.
Total Cases    Passed    Failed
10             10        0
Please see the example below:
cd /root/dpdk/app/test
make
./test -n 4 -c 0xC
./distributor_autotest
Test Environment:
Fedora 20 x86_64, Linux Kernel 3.13.6-200, GCC 4.8.2
Intel Xeon CPU E5-2680 v2 @ 2.80GHz,Intel Niantic 82599
Based off commit 41d9a8250dc8d35bcd4499ba8e59809be943e712.
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 0/5] New library: rte_distributor
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 0/5] " Bruce Richardson
2014-06-05 1:58 ` Cao, Waterman
@ 2014-06-12 13:57 ` Thomas Monjalon
1 sibling, 0 replies; 29+ messages in thread
From: Thomas Monjalon @ 2014-06-12 13:57 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev
> This adds a new library to the Intel DPDK whereby a set of packets can be
> distributed one-at-a-time to a set of worker cores, with dynamic load
> balancing being done between those workers. Flows are identified by a tag
> within the mbuf (currently the RSS hash field, 32-bit value), which is used
> to ensure that no two packets of the same flow are processed in parallel,
> thereby preserving ordering.
>
> Major changes in V2 set:
> * Updates to take account of Neil's comments on original patch set
> * Updates to fix issues highlighted by checkpatch.pl
> * Additional handling in library for special case when process() is called
> with zero mbufs
>
> Bruce Richardson (5):
> eal: add tailq for new distributor component
> distributor: new packet distributor library
> distributor: add distributor library to build
> distributor: add unit tests for distributor lib
> docs: add distributor lib to API docs
lib Acked-by: Neil Horman <nhorman@tuxdriver.com>
Tested-by: Waterman Cao <waterman.cao@intel.com>
I've added a doxygen comment @file in order to generate the according entry.
Makefiles and configs are merged with the main patch (no need to split here).
Applied for version 1.7.0.
Thanks
--
Thomas
^ permalink raw reply [flat|nested] 29+ messages in thread
* [dpdk-dev] [PATCH v2 1/5] eal: add tailq for new distributor component
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (6 preceding siblings ...)
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 0/5] " Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library Bruce Richardson
` (3 subsequent siblings)
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev
add new tailq to the EAL for new distributor library component.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
lib/librte_eal/common/include/rte_tailq_elem.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h b/lib/librte_eal/common/include/rte_tailq_elem.h
index 2de4010..fdd2faf 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
+rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
+
rte_tailq_end(RTE_TAILQ_NUM)
#undef rte_tailq_elem
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
* [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (7 preceding siblings ...)
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 1/5] eal: add tailq for new distributor component Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 13:48 ` Neil Horman
2014-06-03 18:04 ` [dpdk-dev] [PATCH v3 " Bruce Richardson
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 3/5] distributor: add distributor library to build Bruce Richardson
` (2 subsequent siblings)
11 siblings, 2 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev
This adds the code for a new Intel DPDK library for packet distribution.
The distributor is a component which is designed to pass packets
one-at-a-time to workers, with dynamic load balancing. Using the RSS
field in the mbuf as a tag, the distributor tracks what packet tag is
being processed by what worker and then ensures that no two packets with
the same tag are in-flight simultaneously. Once a tag is not in-flight,
then the next packet with that tag will be sent to the next available
core.
Changes in V2 patch:
* added support for a partial distributor flush when process() API
called without any new mbufs
* Removed unused "future use" parameters from functions
* Improved comments to be clearer about thread safety
* Add locks around the tailq add in create() API fn
* Stylistic improvements for issues flagged by checkpatch
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
lib/librte_distributor/Makefile | 50 ++++
lib/librte_distributor/rte_distributor.c | 487 +++++++++++++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 186 ++++++++++++
3 files changed, 723 insertions(+)
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..36699f8
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,50 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..35b7da3
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,487 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/* we will use the bottom four bits of the pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic right shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits. */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0 /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1) /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
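+/* For example, a worker requesting a new mbuf while handing back the mbuf at
+ * address p writes (((int64_t)(uintptr_t)p) << RTE_DISTRIB_FLAG_BITS) |
+ * RTE_DISTRIB_GET_BUF; the pointer is later recovered by an arithmetic right
+ * shift of RTE_DISTRIB_FLAG_BITS. */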
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64 bits of the memory are actually used, though.
+ */
+union rte_distributor_buffer {
+ volatile int64_t bufptr64;
+ char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+struct rte_distributor_backlog {
+ unsigned start;
+ unsigned count;
+ int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
+};
+
+struct rte_distributor_returned_pkts {
+ unsigned start;
+ unsigned count;
+ struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+ TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */
+
+ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */
+ unsigned num_workers; /**< Number of workers polling */
+
+ uint32_t in_flight_tags[RTE_MAX_LCORE];
+ struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+ union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+
+ struct rte_distributor_returned_pkts returns;
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+/**** APIs called by workers ****/
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
+ | RTE_DISTRIB_GET_BUF;
+ while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+ rte_pause();
+ buf->bufptr64 = req;
+ while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+ rte_pause();
+ /* since bufptr64 is signed, this should be an arithmetic shift */
+ int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
+ | RTE_DISTRIB_RETURN_BUF;
+ buf->bufptr64 = req;
+ return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* as the name suggests, adds a packet to the backlog for a particular worker */
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+ if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
+ return -1;
+
+ bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
+ = item;
+ return 0;
+}
+
+/* takes the next packet for a worker off the backlog */
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+ bl->count--;
+ return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
+}
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor *d,
+ unsigned *ret_start, unsigned *ret_count)
+{
+ /* store returns in a circular buffer - code is branch-free */
+ d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+ = (void *)oldbuf;
+ *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+ *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+}
+
+static inline void
+handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
+{
+ d->in_flight_tags[wkr] = 0;
+ d->bufs[wkr].bufptr64 = 0;
+ if (unlikely(d->backlog[wkr].count != 0)) {
+ /* On return of a packet, we need to move the
+ * queued packets for this core elsewhere.
+ * Easiest solution is to set things up for
+ * a recursive call. That will cause those
+ * packets to be queued up for the next free
+ * core, i.e. it will return as soon as a
+ * core becomes free to accept the first
+ * packet, as subsequent ones will be added to
+ * the backlog for that core.
+ */
+ struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
+ unsigned i;
+ struct rte_distributor_backlog *bl = &d->backlog[wkr];
+
+ for (i = 0; i < bl->count; i++) {
+ unsigned idx = (bl->start + i) &
+ RTE_DISTRIB_BACKLOG_MASK;
+ pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
+ RTE_DISTRIB_FLAG_BITS));
+ }
+ /* recursive call */
+ rte_distributor_process(d, pkts, i);
+ bl->count = bl->start = 0;
+ }
+}
+
+/* this function is called when the process() fn is called without any new
+ * packets. It goes through all the workers and clears any returned packets
+ * to do a partial flush.
+ */
+static int
+process_returns(struct rte_distributor *d)
+{
+ unsigned wkr;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (wkr = 0; wkr < d->num_workers; wkr++) {
+
+ const int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed++;
+ if (d->backlog[wkr].count)
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+ else {
+ d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[wkr] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ handle_worker_shutdown(d, wkr);
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs)
+{
+ unsigned next_idx = 0;
+ unsigned wkr = 0;
+ struct rte_mbuf *next_mb = NULL;
+ int64_t next_value = 0;
+ uint32_t new_tag = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ if (unlikely(num_mbufs == 0))
+ return process_returns(d);
+
+ while (next_idx < num_mbufs || next_mb != NULL) {
+
+ int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (!next_mb) {
+ next_mb = mbufs[next_idx++];
+ next_value = (((int64_t)(uintptr_t)next_mb)
+ << RTE_DISTRIB_FLAG_BITS);
+ new_tag = (next_mb->pkt.hash.rss | 1);
+
+ uint32_t match = 0;
+ unsigned i;
+ for (i = 0; i < d->num_workers; i++)
+ match |= (!(d->in_flight_tags[i] ^ new_tag)
+ << i);
+
+ if (match) {
+ next_mb = NULL;
+ unsigned worker = __builtin_ctz(match);
+ if (add_to_backlog(&d->backlog[worker],
+ next_value) < 0)
+ next_idx--;
+ }
+ }
+
+ if ((data & RTE_DISTRIB_GET_BUF) &&
+ (d->backlog[wkr].count || next_mb)) {
+
+ if (d->backlog[wkr].count)
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+
+ else {
+ d->bufs[wkr].bufptr64 = next_value;
+ d->in_flight_tags[wkr] = new_tag;
+ next_mb = NULL;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ handle_worker_shutdown(d, wkr);
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ /* store returns in a circular buffer */
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ if (++wkr == d->num_workers)
+ wkr = 0;
+ }
+ /* to finish, check all workers for backlog and schedule work for them
+ * if they are ready */
+ for (wkr = 0; wkr < d->num_workers; wkr++)
+ if (d->backlog[wkr].count &&
+ (d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+
+ int64_t oldbuf = d->bufs[wkr].bufptr64 >>
+ RTE_DISTRIB_FLAG_BITS;
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+ return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs)
+{
+ struct rte_distributor_returned_pkts *returns = &d->returns;
+ unsigned retval = (max_mbufs < returns->count) ?
+ max_mbufs : returns->count;
+ unsigned i;
+
+ for (i = 0; i < retval; i++) {
+ unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
+ mbufs[i] = returns->mbufs[idx];
+ }
+ returns->start += i;
+ returns->count -= i;
+
+ return retval;
+}
+
+/* local function used by the flush function only, to reassign a backlog for
+ * a shutdown core. The process function uses a recursive call for this, but
+ * that is not done in flush, as we need to track the outstanding packets count.
+ */
+static inline int
+move_backlog(struct rte_distributor *d, unsigned worker)
+{
+ struct rte_distributor_backlog *bl = &d->backlog[worker];
+ unsigned i;
+
+ for (i = 0; i < d->num_workers; i++) {
+ if (i == worker)
+ continue;
+ /* check worker is active and then if backlog will fit */
+ if ((d->in_flight_tags[i] != 0 ||
+ (d->bufs[i].bufptr64 & RTE_DISTRIB_GET_BUF)) &&
+ (bl->count + d->backlog[i].count)
+ <= RTE_DISTRIB_BACKLOG_SIZE) {
+ while (bl->count)
+ add_to_backlog(&d->backlog[i], backlog_pop(bl));
+ return 0;
+ }
+ }
+ return -1;
+}
+
+/* flush the distributor, so that there are no outstanding packets in flight or
+ * queued up. */
+int
+rte_distributor_flush(struct rte_distributor *d)
+{
+ unsigned wkr, total_outstanding = 0;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (wkr = 0; wkr < d->num_workers; wkr++)
+ total_outstanding += d->backlog[wkr].count +
+ !!(d->in_flight_tags[wkr]);
+
+ wkr = 0;
+ while (flushed < total_outstanding) {
+
+ if (d->in_flight_tags[wkr] != 0 || d->backlog[wkr].count) {
+ const int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed += (d->in_flight_tags[wkr] != 0);
+ if (d->backlog[wkr].count) {
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+ /* we need to mark something as being
+ * in-flight, but it doesn't matter what
+ * as we never check it except
+ * to check for non-zero.
+ */
+ d->in_flight_tags[wkr] = 1;
+ } else {
+ d->bufs[wkr].bufptr64 =
+ RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[wkr] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ if (d->backlog[wkr].count == 0 ||
+ move_backlog(d, wkr) == 0) {
+ /* only if we move backlog,
+ * process this packet */
+ d->bufs[wkr].bufptr64 = 0;
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ flushed++;
+ d->in_flight_tags[wkr] = 0;
+ }
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ if (++wkr == d->num_workers)
+ wkr = 0;
+ }
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns(struct rte_distributor *d)
+{
+ d->returns.start = d->returns.count = 0;
+#ifndef __OPTIMIZE__
+ memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
+#endif
+}
+
+/* creates a distributor instance */
+struct rte_distributor *
+rte_distributor_create(const char *name,
+ unsigned socket_id,
+ unsigned num_workers)
+{
+ struct rte_distributor *d;
+ struct rte_distributor_list *distributor_list;
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ const struct rte_memzone *mz;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+ RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+ if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+ rte_errno = EINVAL;
+ return NULL;
+ }
+
+ /* check that we have an initialised tail queue */
+ distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+ rte_distributor_list);
+ if (distributor_list == NULL) {
+ rte_errno = E_RTE_NO_TAILQ;
+ return NULL;
+ }
+
+ rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+ mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+ if (mz == NULL) {
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ d = mz->addr;
+ rte_snprintf(d->name, sizeof(d->name), "%s", name);
+ d->num_workers = num_workers;
+
+ rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+ TAILQ_INSERT_TAIL(distributor_list, d, next);
+ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+ return d;
+}
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..d8e953f
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,186 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DISTRIBUTE_H_
+#define _RTE_DISTRIBUTE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+struct rte_distributor;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ * The name to be given to the distributor instance.
+ * @param socket_id
+ * The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ * The maximum number of workers that will request packets from this
+ * distributor
+ * @return
+ * The newly created distributor instance
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+ unsigned num_workers);
+
+/* *** APIS to be called on the distributor lcore *** */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed at the same time.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs to be distributed
+ * @param num_mbufs
+ * The number of mbufs in the mbufs array
+ * @return
+ * The number of mbufs processed.
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs pointer array to be filled in
+ * @param max_mbufs
+ * The size of the mbufs array
+ * @return
+ * The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * @param d
+ * The distributor instance to be used
+ * @return
+ * The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush(struct rte_distributor *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * @param d
+ * The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns(struct rte_distributor *d);
+
+/* *** APIS to be called on the worker lcores *** */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get a new packet to process. Any previous packet
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param oldpkt
+ * The previous packet, if any, being processed by the worker
+ *
+ * @return
+ * A new packet to be processed by the worker thread.
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param mbuf
+ * The previous packet being processed by the worker
+ * @return
+ * 0 on success.
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+ struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library Bruce Richardson
@ 2014-05-29 13:48 ` Neil Horman
2014-06-02 21:40 ` Richardson, Bruce
2014-06-03 18:04 ` [dpdk-dev] [PATCH v3 " Bruce Richardson
1 sibling, 1 reply; 29+ messages in thread
From: Neil Horman @ 2014-05-29 13:48 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev
> +
> +/* flush the distributor, so that there are no outstanding packets in flight or
> + * queued up. */
It's not clear to me that this is a distributor-only function. You modified the
comments to indicate that lcores can't perform double duty as both a worker and
a distributor, which is fine, but it implies that there is a clear distinction
between functions that are 'worker' functions and 'distributor' functions.
While it's for the most part clear-ish (workers call rte_distributor_get_pkt
and rte_distributor_return_pkt, distributors call
rte_distributor_create/process), this is in a grey area. The analogy I'm
thinking of here is kernel workqueues. There's a specific workqueue thread that
processes the workqueue, but any process can sync or flush the workqueue,
leading me to think this function can be called by a worker lcore.
> +int
> +rte_distributor_flush(struct rte_distributor *d)
> +{
> + unsigned wkr, total_outstanding = 0;
> + unsigned flushed = 0;
> + unsigned ret_start = d->returns.start,
> + ret_count = d->returns.count;
> +
> + for (wkr = 0; wkr < d->num_workers; wkr++)
> + total_outstanding += d->backlog[wkr].count +
> + !!(d->in_flight_tags[wkr]);
> +
> + wkr = 0;
> + while (flushed < total_outstanding) {
> +
> + if (d->in_flight_tags[wkr] != 0 || d->backlog[wkr].count) {
> + const int64_t data = d->bufs[wkr].bufptr64;
> + uintptr_t oldbuf = 0;
> +
> + if (data & RTE_DISTRIB_GET_BUF) {
> + flushed += (d->in_flight_tags[wkr] != 0);
> + if (d->backlog[wkr].count) {
> + d->bufs[wkr].bufptr64 =
> + backlog_pop(&d->backlog[wkr]);
> + /* we need to mark something as being
> + * in-flight, but it doesn't matter what
> + * as we never check it except
> + * to check for non-zero.
> + */
> + d->in_flight_tags[wkr] = 1;
> + } else {
> + d->bufs[wkr].bufptr64 =
> + RTE_DISTRIB_GET_BUF;
> + d->in_flight_tags[wkr] = 0;
> + }
> + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> + } else if (data & RTE_DISTRIB_RETURN_BUF) {
> + if (d->backlog[wkr].count == 0 ||
> + move_backlog(d, wkr) == 0) {
> + /* only if we move backlog,
> + * process this packet */
> + d->bufs[wkr].bufptr64 = 0;
> + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> + flushed++;
> + d->in_flight_tags[wkr] = 0;
> + }
> + }
> +
> + store_return(oldbuf, d, &ret_start, &ret_count);
> + }
> +
I know the comments for move_backlog say you use that function here rather than
what you do in distributor_process because you're tracking the flush count here.
That said, if you instead recomputed the total_outstanding count on each loop
iteration, and tested it for 0, I think you could just reduce the flush
operation to a looping call to rte_distributor_process. It would save you
having to maintain the flush code and the move_backlog code separately, which
would be a nice savings.
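Roughly what I have in mind, as a sketch only (the helper name is illustrative,
and it would have to live inside rte_distributor.c since it touches internals):

/* sketch: recount what is still outstanding, then loop over process() */
static inline unsigned
total_outstanding(const struct rte_distributor *d)
{
	unsigned wkr, count = 0;

	for (wkr = 0; wkr < d->num_workers; wkr++)
		count += d->backlog[wkr].count + !!(d->in_flight_tags[wkr]);
	return count;
}

int
rte_distributor_flush(struct rte_distributor *d)
{
	const unsigned flushed = total_outstanding(d);

	/* process() called with no new mbufs already does a partial flush */
	while (total_outstanding(d) > 0)
		rte_distributor_process(d, NULL, 0);

	return flushed;
}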
> + if (++wkr == d->num_workers)
> + wkr = 0;
Nit: wkr = ++wkr % d->num_workers avoids the additional branch in your loop
Regards
Neil
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
2014-05-29 13:48 ` Neil Horman
@ 2014-06-02 21:40 ` Richardson, Bruce
2014-06-03 11:01 ` Neil Horman
0 siblings, 1 reply; 29+ messages in thread
From: Richardson, Bruce @ 2014-06-02 21:40 UTC (permalink / raw)
To: Neil Horman; +Cc: dev
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Thursday, May 29, 2014 6:48 AM
> To: Richardson, Bruce
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
>
> > +
> > +/* flush the distributor, so that there are no outstanding packets in flight or
> > + * queued up. */
> It's not clear to me that this is a distributor-only function. You modified the
> comments to indicate that lcores can't perform double duty as both a worker and
> a distributor, which is fine, but it implies that there is a clear distinction
> between functions that are 'worker' functions and 'distributor' functions.
> While it's for the most part clear-ish (workers call rte_distributor_get_pkt
> and rte_distributor_return_pkt, distributors call
> rte_distributor_create/process), this is in a grey area. The analogy I'm
> thinking of here is kernel workqueues. There's a specific workqueue thread that
> processes the workqueue, but any process can sync or flush the workqueue,
> leading me to think this function can be called by a worker lcore.
I can update comments here further, but I was hoping the way things were right now was clear enough. In the header and C files, I have the functions explicitly split up into distributor and worker function sets, with a big block of text in the header at the start of each section explaining the threading use of the functions that follow.
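For reference, the intended split looks roughly like this (a sketch only; receive_packets() and handle_packet() are stand-ins for application code, not part of the library):

/* distributor lcore: feeds packets in via the process() API */
static void
distributor_lcore(struct rte_distributor *d)
{
	struct rte_mbuf *bufs[32];

	for (;;) {
		unsigned n = receive_packets(bufs, 32); /* placeholder RX */
		rte_distributor_process(d, bufs, n);
	}
}

/* worker lcore: returns the previous packet and requests a new one */
static void
worker_lcore(struct rte_distributor *d, unsigned worker_id)
{
	struct rte_mbuf *pkt = NULL;

	for (;;) {
		pkt = rte_distributor_get_pkt(d, worker_id, pkt);
		handle_packet(pkt); /* placeholder for the real work */
	}
}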
>
> > +int
> > +rte_distributor_flush(struct rte_distributor *d)
> > +{
> > + unsigned wkr, total_outstanding = 0;
> > + unsigned flushed = 0;
> > + unsigned ret_start = d->returns.start,
> > + ret_count = d->returns.count;
> > +
> > + for (wkr = 0; wkr < d->num_workers; wkr++)
> > + total_outstanding += d->backlog[wkr].count +
> > + !!(d->in_flight_tags[wkr]);
> > +
> > + wkr = 0;
> > + while (flushed < total_outstanding) {
> > +
> > + if (d->in_flight_tags[wkr] != 0 || d->backlog[wkr].count) {
> > + const int64_t data = d->bufs[wkr].bufptr64;
> > + uintptr_t oldbuf = 0;
> > +
> > + if (data & RTE_DISTRIB_GET_BUF) {
> > + flushed += (d->in_flight_tags[wkr] != 0);
> > + if (d->backlog[wkr].count) {
> > + d->bufs[wkr].bufptr64 =
> > + backlog_pop(&d->backlog[wkr]);
> > + /* we need to mark something as being
> > + * in-flight, but it doesn't matter what
> > + * as we never check it except
> > + * to check for non-zero.
> > + */
> > + d->in_flight_tags[wkr] = 1;
> > + } else {
> > + d->bufs[wkr].bufptr64 =
> > + RTE_DISTRIB_GET_BUF;
> > + d->in_flight_tags[wkr] = 0;
> > + }
> > + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > + } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > + if (d->backlog[wkr].count == 0 ||
> > + move_backlog(d, wkr) == 0) {
> > + /* only if we move backlog,
> > + * process this packet */
> > + d->bufs[wkr].bufptr64 = 0;
> > + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > + flushed++;
> > + d->in_flight_tags[wkr] = 0;
> > + }
> > + }
> > +
> > + store_return(oldbuf, d, &ret_start, &ret_count);
> > + }
> > +
> I know the comments for move_backlog say you use that function here rather
> than
> what you do in distributor_process because you're tracking the flush count here.
> That said, if you instead recomputed the total_outstanding count on each loop
> iteration, and tested it for 0, I think you could just reduce the flush
> operation to a looping call to rte_distributor_process. It would save you
> having to maintain the flush code and the move_backlog code separately, which
> would be a nice savings.
Yes, agreed, I should have spotted that myself. I'll look to rework this as soon as I can.
>
> > + if (++wkr == d->num_workers)
> > + wkr = 0;
> Nit: wkr = ++wkr % d->num_workers avoids the additional branch in your loop
>
a) The branch should be easily predictable as the number of workers doesn't change. So long as the branch doesn't mis-predict, there should be little or no perf penalty to having it.
b) The compare plus update can also be done branchless using a "cmov" instruction if we want branch-free code.
c) The modulus operator is very slow and takes far more cycles than a branch would. If we could limit the number of workers to a power of two, then an & operation would work nicely, but that is too big a restriction to have.
So, in short, I think I'll keep this snippet as-is. :-)
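For reference, the variants weighed up above look like this (a sketch only; variant (c) would need the power-of-two restriction I mentioned):

/* (a) compare-and-reset: one branch, well-predicted since
 * num_workers never changes at runtime */
if (++wkr == d->num_workers)
	wkr = 0;

/* (b) branchless form that the compiler can emit as a cmov */
wkr++;
wkr = (wkr == d->num_workers) ? 0 : wkr;

/* (c) mask instead of modulus: cheap, but only correct when
 * num_workers is a power of two */
wkr = (wkr + 1) & (d->num_workers - 1);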
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
2014-06-02 21:40 ` Richardson, Bruce
@ 2014-06-03 11:01 ` Neil Horman
2014-06-03 14:33 ` Richardson, Bruce
0 siblings, 1 reply; 29+ messages in thread
From: Neil Horman @ 2014-06-03 11:01 UTC (permalink / raw)
To: Richardson, Bruce; +Cc: dev
On Mon, Jun 02, 2014 at 09:40:04PM +0000, Richardson, Bruce wrote:
>
>
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Thursday, May 29, 2014 6:48 AM
> > To: Richardson, Bruce
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
> >
> > > +
> > > +/* flush the distributor, so that there are no outstanding packets in flight or
> > > + * queued up. */
> > It's not clear to me that this is a distributor-only function. You modified the
> > comments to indicate that lcores can't perform double duty as both a worker and
> > a distributor, which is fine, but it implies that there is a clear distinction
> > between functions that are 'worker' functions and 'distributor' functions.
> > While it's for the most part clear-ish (workers call rte_distributor_get_pkt
> > and rte_distributor_return_pkt, distributors call
> > rte_distributor_create/process), this is in a grey area. The analogy I'm
> > thinking of here is kernel workqueues. There's a specific workqueue thread that
> > processes the workqueue, but any process can sync or flush the workqueue,
> > leading me to think this function can be called by a worker lcore.
>
> I can update comments here further, but I was hoping the way things were right now was clear enough. In the header and C files, I have the functions explicitly split up into distributor and worker function sets, with a big block of text in the header at the start of each section explaining the threading use of the functions that follow.
>
Very well, we can let actual use be the determinant here. We can leave it as-is, and
if reports of lockups come in, we can change it; otherwise no harm done.
> >
> > > +int
> > > +rte_distributor_flush(struct rte_distributor *d)
> > > +{
> > > + unsigned wkr, total_outstanding = 0;
> > > + unsigned flushed = 0;
> > > + unsigned ret_start = d->returns.start,
> > > + ret_count = d->returns.count;
> > > +
> > > + for (wkr = 0; wkr < d->num_workers; wkr++)
> > > + total_outstanding += d->backlog[wkr].count +
> > > + !!(d->in_flight_tags[wkr]);
> > > +
> > > + wkr = 0;
> > > + while (flushed < total_outstanding) {
> > > +
> > > + if (d->in_flight_tags[wkr] != 0 || d->backlog[wkr].count) {
> > > + const int64_t data = d->bufs[wkr].bufptr64;
> > > + uintptr_t oldbuf = 0;
> > > +
> > > + if (data & RTE_DISTRIB_GET_BUF) {
> > > + flushed += (d->in_flight_tags[wkr] != 0);
> > > + if (d->backlog[wkr].count) {
> > > + d->bufs[wkr].bufptr64 =
> > > + backlog_pop(&d->backlog[wkr]);
> > > + /* we need to mark something as being
> > > + * in-flight, but it doesn't matter what
> > > + * as we never check it except
> > > + * to check for non-zero.
> > > + */
> > > + d->in_flight_tags[wkr] = 1;
> > > + } else {
> > > + d->bufs[wkr].bufptr64 =
> > > + RTE_DISTRIB_GET_BUF;
> > > + d->in_flight_tags[wkr] = 0;
> > > + }
> > > + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > > + } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > > + if (d->backlog[wkr].count == 0 ||
> > > + move_backlog(d, wkr) == 0) {
> > > + /* only if we move backlog,
> > > + * process this packet */
> > > + d->bufs[wkr].bufptr64 = 0;
> > > + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > > + flushed++;
> > > + d->in_flight_tags[wkr] = 0;
> > > + }
> > > + }
> > > +
> > > + store_return(oldbuf, d, &ret_start, &ret_count);
> > > + }
> > > +
> > I know the comments for move_backlog say you use that function here rather
> > than
> > what you do in distributor_process because you're tracking the flush count here.
> > That said, if you instead recomputed the total_outstanding count on each loop
> > iteration, and tested it for 0, I think you could just reduce the flush
> > operation to a looping call to rte_distributor_process. It would save you
> > having to maintain the flush code and the move_backlog code separately, which
> > would be a nice savings.
>
> Yes, agreed, I should have spotted that myself. I'll look to rework this as soon as I can.
>
Ok, thanks.
> >
> > > + if (++wkr == d->num_workers)
> > > + wkr = 0;
> > Nit: wkr = ++wkr % d->num_workers avoids the additional branch in your loop
> >
> a) The branch should be easily predictable as the number of workers doesn't change. So long as the branch doesn't mis-predict, there should be little or no perf penalty to having it.
> b) The compare plus update can also be done branchless using a "cmov" instruction if we want branch-free code.
> c) The modulus operator is very slow and takes far more cycles than a branch would. If we could limit the number of workers to a power of two, then an & operation would work nicely, but that is too big a restriction to have.
> So, in short, I think I'll keep this snippet as-is. :-)
>
Yup, you're right, confirmed it with perf. Thanks!
neil
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
2014-06-03 11:01 ` Neil Horman
@ 2014-06-03 14:33 ` Richardson, Bruce
2014-06-03 14:51 ` Neil Horman
0 siblings, 1 reply; 29+ messages in thread
From: Richardson, Bruce @ 2014-06-03 14:33 UTC (permalink / raw)
To: Neil Horman; +Cc: dev
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Tuesday, June 03, 2014 4:01 AM
> To: Richardson, Bruce
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
>
> On Mon, Jun 02, 2014 at 09:40:04PM +0000, Richardson, Bruce wrote:
> >
> >
> > > -----Original Message-----
> > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > Sent: Thursday, May 29, 2014 6:48 AM
> > > To: Richardson, Bruce
> > > Cc: dev@dpdk.org
> > > > Subject: Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
> > >
> > > > +
> > > > +/* flush the distributor, so that there are no outstanding packets in flight or
> > > > + * queued up. */
> > > It's not clear to me that this is a distributor-only function. You modified the
> > > comments to indicate that lcores can't perform double duty as both a worker and
> > > a distributor, which is fine, but it implies that there is a clear distinction
> > > between functions that are 'worker' functions and 'distributor' functions.
> > > While it's for the most part clear-ish (workers call rte_distributor_get_pkt
> > > and rte_distributor_return_pkt, distributors call
> > > rte_distributor_create/process), this is in a grey area. The analogy I'm
> > > thinking of here is kernel workqueues. There's a specific workqueue thread that
> > > processes the workqueue, but any process can sync or flush the workqueue,
> > > leading me to think this function can be called by a worker lcore.
> >
> > I can update comments here further, but I was hoping the way things were
> right now was clear enough. In the header and C files, I have the functions
> explicitly split up into distributor and worker function sets, with a big block of
> text in the header at the start of each section explaining the threading use of the
> functions that follow.
> >
> Very well, we can let actual use be the determinant here. We can leave it as-is, and
> if reports of lockups come in, we can change it; otherwise no harm done.
>
Since I'm not a big fan of the "let's wait for the lock-ups" approach, I'll add a single-line note to each function's doxygen comment that should make its way into the official API docs. :-)
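For example, on the flush function (final wording to be decided):

/**
 * Flush the distributor component, so that there are no in-flight or
 * backlogged packets awaiting processing
 *
 * This should only be called on the same lcore as rte_distributor_process()
 */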
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
2014-06-03 14:33 ` Richardson, Bruce
@ 2014-06-03 14:51 ` Neil Horman
0 siblings, 0 replies; 29+ messages in thread
From: Neil Horman @ 2014-06-03 14:51 UTC (permalink / raw)
To: Richardson, Bruce; +Cc: dev
On Tue, Jun 03, 2014 at 02:33:16PM +0000, Richardson, Bruce wrote:
>
>
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Tuesday, June 03, 2014 4:01 AM
> > To: Richardson, Bruce
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
> >
> > On Mon, Jun 02, 2014 at 09:40:04PM +0000, Richardson, Bruce wrote:
> > >
> > >
> > > > -----Original Message-----
> > > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > > Sent: Thursday, May 29, 2014 6:48 AM
> > > > To: Richardson, Bruce
> > > > Cc: dev@dpdk.org
> > > > > Subject: Re: [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library
> > > >
> > > > > +
> > > > > +/* flush the distributor, so that there are no outstanding packets in flight or
> > > > > + * queued up. */
> > > > It's not clear to me that this is a distributor-only function. You modified the
> > > > comments to indicate that lcores can't perform double duty as both a worker and
> > > > a distributor, which is fine, but it implies that there is a clear distinction
> > > > between functions that are 'worker' functions and 'distributor' functions.
> > > > While it's for the most part clear-ish (workers call rte_distributor_get_pkt
> > > > and rte_distributor_return_pkt, distributors call
> > > > rte_distributor_create/process), this is in a grey area. The analogy I'm
> > > > thinking of here is kernel workqueues. There's a specific workqueue thread that
> > > > processes the workqueue, but any process can sync or flush the workqueue,
> > > > leading me to think this function can be called by a worker lcore.
> > >
> > > I can update comments here further, but I was hoping the way things were
> > right now was clear enough. In the header and C files, I have the functions
> > explicitly split up into distributor and worker function sets, with a big block of
> > text in the header at the start of each section explaining the threading use of the
> > functions that follow.
> > >
> > Very well, we can let actual use be the determinant here. We can leave it as-is, and
> > if reports of lockups come in, we can change it; otherwise no harm done.
> >
> Since I'm not a big fan of the "let's wait for the lock-ups" approach, I'll add a single-line note to each function's doxygen comment that should make its way into the official API docs. :-)
>
If you're planning on collapsing the flush routine into an iterative call to
distributor_process anyway, then, sure, I'd appreciate it.
Thanks!
Neil
^ permalink raw reply [flat|nested] 29+ messages in thread
* [dpdk-dev] [PATCH v3 2/5] distributor: new packet distributor library
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library Bruce Richardson
2014-05-29 13:48 ` Neil Horman
@ 2014-06-03 18:04 ` Bruce Richardson
2014-06-03 18:38 ` Neil Horman
1 sibling, 1 reply; 29+ messages in thread
From: Bruce Richardson @ 2014-06-03 18:04 UTC (permalink / raw)
To: dev
This adds the code for a new Intel DPDK library for packet distribution.
The distributor is a component which is designed to pass packets
one-at-a-time to workers, with dynamic load balancing. Using the RSS
field in the mbuf as a tag, the distributor tracks what packet tag is
being processed by what worker and then ensures that no two packets with
the same tag are in-flight simultaneously. Once a tag is not in-flight,
then the next packet with that tag will be sent to the next available
core.
Changes in V2 patch:
* added support for a partial distributor flush when process() API
called without any new mbufs
* Removed unused "future use" parameters from functions
* Improved comments to be clearer about thread safety
* Add locks around the tailq add in create() API fn
* Stylistic improvements for issues flagged by checkpatch
Changes in V3 patch:
* Flush function rewritten as calls to process API
* Additional doxygen comments on thread-safety of APIs
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
lib/librte_distributor/Makefile | 50 ++++
lib/librte_distributor/rte_distributor.c | 425 +++++++++++++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 194 ++++++++++++++
3 files changed, 669 insertions(+)
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..36699f8
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,50 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..5eee442
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,425 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/* we will use the bottom four bits of pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits. */
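+/* i.e. encode: (((int64_t)(uintptr_t)mbuf) << RTE_DISTRIB_FLAG_BITS) | flag
+ * decode: (struct rte_mbuf *)(uintptr_t)(bufptr64 >> RTE_DISTRIB_FLAG_BITS) */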
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0 /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1) /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64-bits of the memory is actually used though.
+ */
+union rte_distributor_buffer {
+ volatile int64_t bufptr64;
+ char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+struct rte_distributor_backlog {
+ unsigned start;
+ unsigned count;
+ int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
+};
+
+struct rte_distributor_returned_pkts {
+ unsigned start;
+ unsigned count;
+ struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+ TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */
+
+ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */
+ unsigned num_workers; /**< Number of workers polling */
+
+ uint32_t in_flight_tags[RTE_MAX_LCORE];
+ struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+ union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+
+ struct rte_distributor_returned_pkts returns;
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+/**** APIs called by workers ****/
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
+ | RTE_DISTRIB_GET_BUF;
+ while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+ rte_pause();
+ buf->bufptr64 = req;
+ while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+ rte_pause();
+ /* since bufptr64 is signed, this should be an arithmetic shift */
+ int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
+ | RTE_DISTRIB_RETURN_BUF;
+ buf->bufptr64 = req;
+ return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* as name suggests, adds a packet to the backlog for a particular worker */
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+ if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
+ return -1;
+
+ bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
+ = item;
+ return 0;
+}
+
+/* takes the next packet for a worker off the backlog */
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+ bl->count--;
+ return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
+}
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor *d,
+ unsigned *ret_start, unsigned *ret_count)
+{
+ /* store returns in a circular buffer - code is branch-free */
+ d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+ = (void *)oldbuf;
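+ /* the masked adds below advance the indices only for a real (non-NULL)
+ * return; once count saturates at the mask value, start advances instead,
+ * so the oldest stored return is overwritten */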
+ *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+ *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+}
+
+static inline void
+handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
+{
+ d->in_flight_tags[wkr] = 0;
+ d->bufs[wkr].bufptr64 = 0;
+ if (unlikely(d->backlog[wkr].count != 0)) {
+ /* On return of a packet, we need to move the
+ * queued packets for this core elsewhere.
+ * Easiest solution is to set things up for
+ * a recursive call. That will cause those
+ * packets to be queued up for the next free
+ * core, i.e. it will return as soon as a
+ * core becomes free to accept the first
+ * packet, as subsequent ones will be added to
+ * the backlog for that core.
+ */
+ struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
+ unsigned i;
+ struct rte_distributor_backlog *bl = &d->backlog[wkr];
+
+ for (i = 0; i < bl->count; i++) {
+ unsigned idx = (bl->start + i) &
+ RTE_DISTRIB_BACKLOG_MASK;
+ pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
+ RTE_DISTRIB_FLAG_BITS));
+ }
+ /* recursive call */
+ rte_distributor_process(d, pkts, i);
+ bl->count = bl->start = 0;
+ }
+}
+
+/* this function is called when process() fn is called without any new
+ * packets. It goes through all the workers and clears any returned packets
+ * to do a partial flush.
+ */
+static int
+process_returns(struct rte_distributor *d)
+{
+ unsigned wkr;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (wkr = 0; wkr < d->num_workers; wkr++) {
+
+ const int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed++;
+ if (d->backlog[wkr].count)
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+ else {
+ d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[wkr] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ handle_worker_shutdown(d, wkr);
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs)
+{
+ unsigned next_idx = 0;
+ unsigned wkr = 0;
+ struct rte_mbuf *next_mb = NULL;
+ int64_t next_value = 0;
+ uint32_t new_tag = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ if (unlikely(num_mbufs == 0))
+ return process_returns(d);
+
+ while (next_idx < num_mbufs || next_mb != NULL) {
+
+ int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (!next_mb) {
+ next_mb = mbufs[next_idx++];
+ next_value = (((int64_t)(uintptr_t)next_mb)
+ << RTE_DISTRIB_FLAG_BITS);
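+ /* OR the RSS hash with 1 so the tag is never zero, since a zero
+ * entry in in_flight_tags[] means "no packet in flight" */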
+ new_tag = (next_mb->pkt.hash.rss | 1);
+
+ uint32_t match = 0;
+ unsigned i;
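+ /* set bit i of match if worker i already has a packet with
+ * this same tag in flight */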
+ for (i = 0; i < d->num_workers; i++)
+ match |= (!(d->in_flight_tags[i] ^ new_tag)
+ << i);
+
+ if (match) {
+ next_mb = NULL;
+ unsigned worker = __builtin_ctz(match);
+ if (add_to_backlog(&d->backlog[worker],
+ next_value) < 0)
+ next_idx--;
+ }
+ }
+
+ if ((data & RTE_DISTRIB_GET_BUF) &&
+ (d->backlog[wkr].count || next_mb)) {
+
+ if (d->backlog[wkr].count)
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+
+ else {
+ d->bufs[wkr].bufptr64 = next_value;
+ d->in_flight_tags[wkr] = new_tag;
+ next_mb = NULL;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ handle_worker_shutdown(d, wkr);
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ /* store returns in a circular buffer */
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ if (++wkr == d->num_workers)
+ wkr = 0;
+ }
+ /* to finish, check all workers for backlog and schedule work for them
+ * if they are ready */
+ for (wkr = 0; wkr < d->num_workers; wkr++)
+ if (d->backlog[wkr].count &&
+ (d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+
+ int64_t oldbuf = d->bufs[wkr].bufptr64 >>
+ RTE_DISTRIB_FLAG_BITS;
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+ return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs)
+{
+ struct rte_distributor_returned_pkts *returns = &d->returns;
+ unsigned retval = (max_mbufs < returns->count) ?
+ max_mbufs : returns->count;
+ unsigned i;
+
+ for (i = 0; i < retval; i++) {
+ unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
+ mbufs[i] = returns->mbufs[idx];
+ }
+ returns->start += i;
+ returns->count -= i;
+
+ return retval;
+}
+
+/* return the number of packets in-flight in a distributor, i.e. packets
+ * being worked on or queued up in a backlog. */
+static inline unsigned
+total_outstanding(const struct rte_distributor *d)
+{
+ unsigned wkr, total_outstanding = 0;
+
+ for (wkr = 0; wkr < d->num_workers; wkr++)
+ total_outstanding += d->backlog[wkr].count +
+ !!(d->in_flight_tags[wkr]);
+ return total_outstanding;
+}
+
+/* flush the distributor, so that there are no outstanding packets in flight or
+ * queued up. */
+int
+rte_distributor_flush(struct rte_distributor *d)
+{
+ const unsigned flushed = total_outstanding(d);
+
+ while (total_outstanding(d) > 0)
+ rte_distributor_process(d, NULL, 0);
+
+ return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns(struct rte_distributor *d)
+{
+ d->returns.start = d->returns.count = 0;
+#ifndef __OPTIMIZE__
+ memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
+#endif
+}
+
+/* creates a distributor instance */
+struct rte_distributor *
+rte_distributor_create(const char *name,
+ unsigned socket_id,
+ unsigned num_workers)
+{
+ struct rte_distributor *d;
+ struct rte_distributor_list *distributor_list;
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ const struct rte_memzone *mz;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+ RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+ if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+ rte_errno = EINVAL;
+ return NULL;
+ }
+
+ /* check that we have an initialised tail queue */
+ distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+ rte_distributor_list);
+ if (distributor_list == NULL) {
+ rte_errno = E_RTE_NO_TAILQ;
+ return NULL;
+ }
+
+ rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+ mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+ if (mz == NULL) {
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ d = mz->addr;
+ rte_snprintf(d->name, sizeof(d->name), "%s", name);
+ d->num_workers = num_workers;
+
+ rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+ TAILQ_INSERT_TAIL(distributor_list, d, next);
+ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+ return d;
+}
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..bc70f0f
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,194 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DISTRIBUTE_H_
+#define _RTE_DISTRIBUTE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+struct rte_distributor;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ * The name to be given to the distributor instance.
+ * @param socket_id
+ * The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ * The maximum number of workers that will request packets from this
+ * distributor
+ * @return
+ * The newly created distributor instance
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+ unsigned num_workers);
+
+/* *** APIS to be called on the distributor lcore *** */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed at the same time.
+ *
+ * This is not multi-thread safe and should only be called on a single lcore.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs to be distributed
+ * @param num_mbufs
+ * The number of mbufs in the mbufs array
+ * @return
+ * The number of mbufs processed.
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs pointer array to be filled in
+ * @param max_mbufs
+ * The size of the mbufs array
+ * @return
+ * The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ * The distributor instance to be used
+ * @return
+ * The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush(struct rte_distributor *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ * The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns(struct rte_distributor *d);
+
+/* *** APIS to be called on the worker lcores *** */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get a new packet to process. Any previous packet
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param oldpkt
+ * The previous packet, if any, being processed by the worker
+ *
+ * @return
+ * A new packet to be processed by the worker thread.
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param mbuf
+ * The previous packet being processed by the worker
+ * @return
+ * 0 on success.
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+ struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [dpdk-dev] [PATCH v3 2/5] distributor: new packet distributor library
2014-06-03 18:04 ` [dpdk-dev] [PATCH v3 " Bruce Richardson
@ 2014-06-03 18:38 ` Neil Horman
0 siblings, 0 replies; 29+ messages in thread
From: Neil Horman @ 2014-06-03 18:38 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev
On Tue, Jun 03, 2014 at 07:04:12PM +0100, Bruce Richardson wrote:
> This adds the code for a new Intel DPDK library for packet distribution.
> The distributor is a component which is designed to pass packets
> one-at-a-time to workers, with dynamic load balancing. Using the RSS
> field in the mbuf as a tag, the distributor tracks what packet tag is
> being processed by what worker and then ensures that no two packets with
> the same tag are in-flight simultaneously. Once a tag is not in-flight,
> then the next packet with that tag will be sent to the next available
> core.
>
> Changes in V2 patch:
> * added support for a partial distributor flush when process() API
> called without any new mbufs
> * Removed unused "future use" parameters from functions
> * Improved comments to be clearer about thread safety
> * Add locks around the tailq add in create() API fn
> * Stylistic improvements for issues flagged by checkpatch
>
> Changes in V3 patch:
> * Flush function rewritten as calls to process API
> * Additional doxygen comments on thread-safety of APIs
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> ---
> lib/librte_distributor/Makefile | 50 ++++
> lib/librte_distributor/rte_distributor.c | 425 +++++++++++++++++++++++++++++++
> lib/librte_distributor/rte_distributor.h | 194 ++++++++++++++
> 3 files changed, 669 insertions(+)
> create mode 100644 lib/librte_distributor/Makefile
> create mode 100644 lib/librte_distributor/rte_distributor.c
> create mode 100644 lib/librte_distributor/rte_distributor.h
>
> diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
> new file mode 100644
> index 0000000..36699f8
> --- /dev/null
> +++ b/lib/librte_distributor/Makefile
> @@ -0,0 +1,50 @@
> +# BSD LICENSE
> +#
> +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> +# All rights reserved.
> +#
> +# Redistribution and use in source and binary forms, with or without
> +# modification, are permitted provided that the following conditions
> +# are met:
> +#
> +# * Redistributions of source code must retain the above copyright
> +# notice, this list of conditions and the following disclaimer.
> +# * Redistributions in binary form must reproduce the above copyright
> +# notice, this list of conditions and the following disclaimer in
> +# the documentation and/or other materials provided with the
> +# distribution.
> +# * Neither the name of Intel Corporation nor the names of its
> +# contributors may be used to endorse or promote products derived
> +# from this software without specific prior written permission.
> +#
> +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_distributor.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
> +
> +# install this header file
> +SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
> +
> +# this lib needs eal
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> new file mode 100644
> index 0000000..5eee442
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -0,0 +1,425 @@
> +/*-
> + * BSD LICENSE
> + *
> + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + * All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + * * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + * * Redistributions in binary form must reproduce the above copyright
> + * notice, this list of conditions and the following disclaimer in
> + * the documentation and/or other materials provided with the
> + * distribution.
> + * * Neither the name of Intel Corporation nor the names of its
> + * contributors may be used to endorse or promote products derived
> + * from this software without specific prior written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdio.h>
> +#include <sys/queue.h>
> +#include <string.h>
> +#include <rte_mbuf.h>
> +#include <rte_memzone.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_tailq.h>
> +#include <rte_eal_memconfig.h>
> +#include "rte_distributor.h"
> +
> +#define NO_FLAGS 0
> +#define RTE_DISTRIB_PREFIX "DT_"
> +
> +/* we will use the bottom four bits of the pointer for flags, shifting out
> + * the top four bits to make room (since a 64-bit pointer actually only uses
> + * 48 bits). An arithmetic-right-shift will then appropriately restore the
> + * original pointer value with proper sign extension into the top bits. */
> +#define RTE_DISTRIB_FLAG_BITS 4
> +#define RTE_DISTRIB_FLAGS_MASK (0x0F)
> +#define RTE_DISTRIB_NO_BUF 0 /**< empty flags: no buffer requested */
> +#define RTE_DISTRIB_GET_BUF (1) /**< worker requests a buffer, returns old */
> +#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
> +
> +#define RTE_DISTRIB_BACKLOG_SIZE 8
> +#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
> +
> +#define RTE_DISTRIB_MAX_RETURNS 128
> +#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
> +
> +/**
> + * Buffer structure used to pass the pointer data between cores. This is cache
> + * line aligned, but to improve performance and prevent adjacent cache-line
> + * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
> + * the next cache line to worker 0, we pad this out to three cache lines.
> + * Only 64 bits of the memory are actually used, though.
> + */
> +union rte_distributor_buffer {
> + volatile int64_t bufptr64;
> + char pad[CACHE_LINE_SIZE*3];
> +} __rte_cache_aligned;
> +
> +struct rte_distributor_backlog {
> + unsigned start;
> + unsigned count;
> + int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
> +};
> +
> +struct rte_distributor_returned_pkts {
> + unsigned start;
> + unsigned count;
> + struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
> +};
> +
> +struct rte_distributor {
> + TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */
> +
> + char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the distributor instance. */
> + unsigned num_workers; /**< Number of workers polling */
> +
> + uint32_t in_flight_tags[RTE_MAX_LCORE];
> + struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> +
> + union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> +
> + struct rte_distributor_returned_pkts returns;
> +};
> +
> +TAILQ_HEAD(rte_distributor_list, rte_distributor);
> +
> +/**** APIs called by workers ****/
> +
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d,
> + unsigned worker_id, struct rte_mbuf *oldpkt)
> +{
> + union rte_distributor_buffer *buf = &d->bufs[worker_id];
> + int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
> + | RTE_DISTRIB_GET_BUF;
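> + /* spin until any previous request or return from this worker has
> + * been consumed by the distributor */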
> + while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> + rte_pause();
> + buf->bufptr64 = req;
> + while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> + rte_pause();
> + /* since bufptr64 is signed, this should be an arithmetic shift */
> + int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
> + return (struct rte_mbuf *)((uintptr_t)ret);
> +}
> +
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d,
> + unsigned worker_id, struct rte_mbuf *oldpkt)
> +{
> + union rte_distributor_buffer *buf = &d->bufs[worker_id];
> + uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
> + | RTE_DISTRIB_RETURN_BUF;
> + buf->bufptr64 = req;
> + return 0;
> +}
> +
> +/**** APIs called on distributor core ***/
> +
> +/* As the name suggests, adds a packet to the backlog for a particular worker */
> +static int
> +add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
> +{
> + if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
> + return -1;
> +
> + bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
> + = item;
> + return 0;
> +}
> +
> +/* takes the next packet for a worker off the backlog */
> +static int64_t
> +backlog_pop(struct rte_distributor_backlog *bl)
> +{
> + bl->count--;
> + return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
> +}
> +
> +/* stores a packet returned from a worker inside the returns array */
> +static inline void
> +store_return(uintptr_t oldbuf, struct rte_distributor *d,
> + unsigned *ret_start, unsigned *ret_count)
> +{
> + /* store returns in a circular buffer - code is branch-free */
> + d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
> + = (void *)oldbuf;
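> + /* branch-free update: a NULL oldbuf moves neither index; once the ring
> + * is full (count == MASK), start advances instead and the oldest entry
> + * is dropped */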
> + *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
> + *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
> +}
> +
> +static inline void
> +handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
> +{
> + d->in_flight_tags[wkr] = 0;
> + d->bufs[wkr].bufptr64 = 0;
> + if (unlikely(d->backlog[wkr].count != 0)) {
> + /* On return of a packet, we need to move the
> + * queued packets for this core elsewhere.
> + * Easiest solution is to set things up for
> + * a recursive call. That will cause those
> + * packets to be queued up for the next free
> + * core, i.e. it will return as soon as a
> + * core becomes free to accept the first
> + * packet, as subsequent ones will be added to
> + * the backlog for that core.
> + */
> + struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
> + unsigned i;
> + struct rte_distributor_backlog *bl = &d->backlog[wkr];
> +
> + for (i = 0; i < bl->count; i++) {
> + unsigned idx = (bl->start + i) &
> + RTE_DISTRIB_BACKLOG_MASK;
> + pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
> + RTE_DISTRIB_FLAG_BITS));
> + }
> + /* recursive call */
> + rte_distributor_process(d, pkts, i);
> + bl->count = bl->start = 0;
> + }
> +}
> +
> +/* This function is called when the process() function is called without any
> + * new packets. It goes through all the workers, collecting any returned
> + * packets and refilling buffers from the backlog, to do a partial flush.
> + */
> +static int
> +process_returns(struct rte_distributor *d)
> +{
> + unsigned wkr;
> + unsigned flushed = 0;
> + unsigned ret_start = d->returns.start,
> + ret_count = d->returns.count;
> +
> + for (wkr = 0; wkr < d->num_workers; wkr++) {
> +
> + const int64_t data = d->bufs[wkr].bufptr64;
> + uintptr_t oldbuf = 0;
> +
> + if (data & RTE_DISTRIB_GET_BUF) {
> + flushed++;
> + if (d->backlog[wkr].count)
> + d->bufs[wkr].bufptr64 =
> + backlog_pop(&d->backlog[wkr]);
> + else {
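> + /* no backlog: leave the GET_BUF flag set so the
> + * worker keeps waiting for a packet */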
> + d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
> + d->in_flight_tags[wkr] = 0;
> + }
> + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> + } else if (data & RTE_DISTRIB_RETURN_BUF) {
> + handle_worker_shutdown(d, wkr);
> + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> + }
> +
> + store_return(oldbuf, d, &ret_start, &ret_count);
> + }
> +
> + d->returns.start = ret_start;
> + d->returns.count = ret_count;
> +
> + return flushed;
> +}
> +
> +/* process a set of packets to distribute them to workers */
> +int
> +rte_distributor_process(struct rte_distributor *d,
> + struct rte_mbuf **mbufs, unsigned num_mbufs)
> +{
> + unsigned next_idx = 0;
> + unsigned wkr = 0;
> + struct rte_mbuf *next_mb = NULL;
> + int64_t next_value = 0;
> + uint32_t new_tag = 0;
> + unsigned ret_start = d->returns.start,
> + ret_count = d->returns.count;
> +
> + if (unlikely(num_mbufs == 0))
> + return process_returns(d);
> +
> + while (next_idx < num_mbufs || next_mb != NULL) {
> +
> + int64_t data = d->bufs[wkr].bufptr64;
> + uintptr_t oldbuf = 0;
> +
> + if (!next_mb) {
> + next_mb = mbufs[next_idx++];
> + next_value = (((int64_t)(uintptr_t)next_mb)
> + << RTE_DISTRIB_FLAG_BITS);
> + new_tag = (next_mb->pkt.hash.rss | 1);
> +
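> + /* the | 1 above keeps tags non-zero (a zero in_flight_tags
> + * entry means "no packet"); now build a bitmask of workers
> + * already handling this tag */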
> + uint32_t match = 0;
> + unsigned i;
> + for (i = 0; i < d->num_workers; i++)
> + match |= (!(d->in_flight_tags[i] ^ new_tag)
> + << i);
> +
> + if (match) {
> + next_mb = NULL;
> + unsigned worker = __builtin_ctz(match);
> + if (add_to_backlog(&d->backlog[worker],
> + next_value) < 0)
> + next_idx--;
> + }
> + }
> +
> + if ((data & RTE_DISTRIB_GET_BUF) &&
> + (d->backlog[wkr].count || next_mb)) {
> +
> + if (d->backlog[wkr].count)
> + d->bufs[wkr].bufptr64 =
> + backlog_pop(&d->backlog[wkr]);
> +
> + else {
> + d->bufs[wkr].bufptr64 = next_value;
> + d->in_flight_tags[wkr] = new_tag;
> + next_mb = NULL;
> + }
> + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> + } else if (data & RTE_DISTRIB_RETURN_BUF) {
> + handle_worker_shutdown(d, wkr);
> + oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> + }
> +
> + /* store returns in a circular buffer */
> + store_return(oldbuf, d, &ret_start, &ret_count);
> +
> + if (++wkr == d->num_workers)
> + wkr = 0;
> + }
> + /* to finish, check all workers for backlog and schedule work for them
> + * if they are ready */
> + for (wkr = 0; wkr < d->num_workers; wkr++)
> + if (d->backlog[wkr].count &&
> + (d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
> +
> + int64_t oldbuf = d->bufs[wkr].bufptr64 >>
> + RTE_DISTRIB_FLAG_BITS;
> + store_return(oldbuf, d, &ret_start, &ret_count);
> +
> + d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
> + }
> +
> + d->returns.start = ret_start;
> + d->returns.count = ret_count;
> + return num_mbufs;
> +}
> +
> +/* return to the caller, packets returned from workers */
> +int
> +rte_distributor_returned_pkts(struct rte_distributor *d,
> + struct rte_mbuf **mbufs, unsigned max_mbufs)
> +{
> + struct rte_distributor_returned_pkts *returns = &d->returns;
> + unsigned retval = (max_mbufs < returns->count) ?
> + max_mbufs : returns->count;
> + unsigned i;
> +
> + for (i = 0; i < retval; i++) {
> + unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
> + mbufs[i] = returns->mbufs[idx];
> + }
> + returns->start += i;
> + returns->count -= i;
> +
> + return retval;
> +}
> +
> +/* return the number of packets in-flight in a distributor, i.e. packets
> + * being worked on or queued up in a backlog. */
> +static inline unsigned
> +total_outstanding(const struct rte_distributor *d)
> +{
> + unsigned wkr, total_outstanding = 0;
> +
> + for (wkr = 0; wkr < d->num_workers; wkr++)
> + total_outstanding += d->backlog[wkr].count +
> + !!(d->in_flight_tags[wkr]);
> + return total_outstanding;
> +}
> +
> +/* flush the distributor, so that there are no outstanding packets in flight or
> + * queued up. */
> +int
> +rte_distributor_flush(struct rte_distributor *d)
> +{
> + const unsigned flushed = total_outstanding(d);
> +
> + while (total_outstanding(d) > 0)
> + rte_distributor_process(d, NULL, 0);
> +
> + return flushed;
> +}
> +
> +/* clears the internal returns array in the distributor */
> +void
> +rte_distributor_clear_returns(struct rte_distributor *d)
> +{
> + d->returns.start = d->returns.count = 0;
> +#ifndef __OPTIMIZE__
> + memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
> +#endif
> +}
> +
> +/* creates a distributor instance */
> +struct rte_distributor *
> +rte_distributor_create(const char *name,
> + unsigned socket_id,
> + unsigned num_workers)
> +{
> + struct rte_distributor *d;
> + struct rte_distributor_list *distributor_list;
> + char mz_name[RTE_MEMZONE_NAMESIZE];
> + const struct rte_memzone *mz;
> +
> + /* compilation-time checks */
> + RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> + RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> +
> + if (name == NULL || num_workers >= RTE_MAX_LCORE) {
> + rte_errno = EINVAL;
> + return NULL;
> + }
> +
> + /* check that we have an initialised tail queue */
> + distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
> + rte_distributor_list);
> + if (distributor_list == NULL) {
> + rte_errno = E_RTE_NO_TAILQ;
> + return NULL;
> + }
> +
> + rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
> + mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
> + if (mz == NULL) {
> + rte_errno = ENOMEM;
> + return NULL;
> + }
> +
> + d = mz->addr;
> + rte_snprintf(d->name, sizeof(d->name), "%s", name);
> + d->num_workers = num_workers;
> +
> + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> + TAILQ_INSERT_TAIL(distributor_list, d, next);
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> + return d;
> +}
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> new file mode 100644
> index 0000000..bc70f0f
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -0,0 +1,194 @@
> +/*-
> + * BSD LICENSE
> + *
> + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + * All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + * * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + * * Redistributions in binary form must reproduce the above copyright
> + * notice, this list of conditions and the following disclaimer in
> + * the documentation and/or other materials provided with the
> + * distribution.
> + * * Neither the name of Intel Corporation nor the names of its
> + * contributors may be used to endorse or promote products derived
> + * from this software without specific prior written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#ifndef _RTE_DISTRIBUTOR_H_
> +#define _RTE_DISTRIBUTOR_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_mbuf.h>
> +
> +#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
> +
> +struct rte_distributor;
> +
> +/**
> + * Function to create a new distributor instance
> + *
> + * Reserves the memory needed for the distributor operation and
> + * initializes the distributor to work with the configured number of workers.
> + *
> + * @param name
> + * The name to be given to the distributor instance.
> + * @param socket_id
> + * The NUMA node on which the memory is to be allocated
> + * @param num_workers
> + * The maximum number of workers that will request packets from this
> + * distributor
> + * @return
> + *   The newly created distributor instance, or NULL on error, in which
> + *   case rte_errno is set
> + */
> +struct rte_distributor *
> +rte_distributor_create(const char *name, unsigned socket_id,
> + unsigned num_workers);
> +
> +/* *** APIs to be called on the distributor lcore *** */
> +/*
> + * The following APIs are the public APIs which are designed for use on a
> + * single lcore which acts as the distributor lcore for a given distributor
> + * instance. These functions cannot be called on multiple cores simultaneously
> + * without using locking to protect access to the internals of the distributor.
> + *
> + * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
> + * for the same distributor instance, otherwise deadlock will result.
> + */
> +
> +/**
> + * Process a set of packets by distributing them among workers that request
> + * packets. The distributor will ensure that no two packets that have the
> + * same flow id, or tag, in the mbuf will be processed at the same time.
> + *
> + * This is not multi-thread safe and should only be called on a single lcore.
> + *
> + * @param d
> + * The distributor instance to be used
> + * @param mbufs
> + * The mbufs to be distributed
> + * @param num_mbufs
> + * The number of mbufs in the mbufs array
> + * @return
> + * The number of mbufs processed.
> + */
> +int
> +rte_distributor_process(struct rte_distributor *d,
> + struct rte_mbuf **mbufs, unsigned num_mbufs);
> +
> +/**
> + * Get a set of mbufs that have been returned to the distributor by workers
> + *
> + * This should only be called on the same lcore as rte_distributor_process()
> + *
> + * @param d
> + * The distributor instance to be used
> + * @param mbufs
> + * The mbufs pointer array to be filled in
> + * @param max_mbufs
> + * The size of the mbufs array
> + * @return
> + * The number of mbufs returned in the mbufs array.
> + */
> +int
> +rte_distributor_returned_pkts(struct rte_distributor *d,
> + struct rte_mbuf **mbufs, unsigned max_mbufs);
> +
> +/**
> + * Flush the distributor component, so that there are no in-flight or
> + * backlogged packets awaiting processing
> + *
> + * This should only be called on the same lcore as rte_distributor_process()
> + *
> + * @param d
> + * The distributor instance to be used
> + * @return
> + * The number of queued/in-flight packets that were completed by this call.
> + */
> +int
> +rte_distributor_flush(struct rte_distributor *d);
> +
> +/**
> + * Clears the array of returned packets used as the source for the
> + * rte_distributor_returned_pkts() API call.
> + *
> + * This should only be called on the same lcore as rte_distributor_process()
> + *
> + * @param d
> + * The distributor instance to be used
> + */
> +void
> +rte_distributor_clear_returns(struct rte_distributor *d);
> +
> +/* *** APIs to be called on the worker lcores *** */
> +/*
> + * The following APIs are the public APIs which are designed for use on
> + * multiple lcores which act as workers for a distributor. Each lcore should use
> + * a unique worker id when requesting packets.
> + *
> + * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
> + * for the same distributor instance, otherwise deadlock will result.
> + */
> +
> +/**
> + * API called by a worker to get a new packet to process. Any previous packet
> + * given to the worker is assumed to have completed processing, and may
> + * optionally be returned to the distributor via the oldpkt parameter.
> + *
> + * @param d
> + * The distributor instance to be used
> + * @param worker_id
> + *   The worker instance number to use - must be less than num_workers passed
> + * at distributor creation time.
> + * @param oldpkt
> + * The previous packet, if any, being processed by the worker
> + *
> + * @return
> + * A new packet to be processed by the worker thread.
> + */
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d,
> + unsigned worker_id, struct rte_mbuf *oldpkt);
> +
> +/**
> + * API called by a worker to return a completed packet without requesting a
> + * new packet, for example, because a worker thread is shutting down.
> + *
> + * @param d
> + * The distributor instance to be used
> + * @param worker_id
> + *   The worker instance number to use - must be less than num_workers passed
> + * at distributor creation time.
> + * @param mbuf
> + *   The previous packet being processed by the worker
> + * @return
> + *   0 on success.
> + */
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
> + struct rte_mbuf *mbuf);
> +
> +/******************************************/
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> --
> 1.9.3
>
>
Acked-by: Neil Horman <nhorman@tuxdriver.com>
^ permalink raw reply [flat|nested] 29+ messages in thread
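A note on the pointer-packing scheme used in rte_distributor.c above: the mbuf
pointer is shifted up by RTE_DISTRIB_FLAG_BITS, the request flags occupy the
freed low bits, and an arithmetic right shift on the signed 64-bit value
restores the pointer, sign extension included. Below is a minimal standalone
sketch of that round trip; the constants mirror the patch, while the main()
harness and the object being pointed at are purely illustrative.

#include <assert.h>
#include <stdint.h>

#define FLAG_BITS  4    /* mirrors RTE_DISTRIB_FLAG_BITS */
#define FLAGS_MASK 0x0F /* mirrors RTE_DISTRIB_FLAGS_MASK */
#define GET_BUF    1    /* mirrors RTE_DISTRIB_GET_BUF */

int main(void)
{
	int obj;
	int *p = &obj;

	/* pack: shift the pointer up, then OR a flag into the low bits.
	 * A 64-bit pointer only uses 48 bits, so the four bits shifted
	 * out of the top carry no information. */
	int64_t buf = (((int64_t)(uintptr_t)p) << FLAG_BITS) | GET_BUF;

	assert((buf & FLAGS_MASK) == GET_BUF);

	/* unpack: an arithmetic right shift of the signed value restores
	 * the original pointer, with sign extension into the top bits */
	int *q = (int *)(uintptr_t)(buf >> FLAG_BITS);
	assert(q == p);

	return 0;
}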
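Read together, the distributor-side and worker-side APIs above imply a call
pattern along the following lines. This is only a usage sketch assembled from
the header comments: the quit flag, the worker_idx counter used to hand out
worker ids, and the get_packets() source are illustrative assumptions, not
part of the patch.

#include <rte_distributor.h>
#include <rte_mbuf.h>

static volatile int quit;            /* illustrative shutdown flag */
static volatile unsigned worker_idx; /* hands out ids 0..num_workers-1 */

/* hypothetical packet source, e.g. reading from an RX queue */
extern unsigned get_packets(struct rte_mbuf **bufs, unsigned max);

/* worker lcore: hand back the previous packet, receive the next one */
static int
worker_main(void *arg)
{
	struct rte_distributor *d = arg;
	const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
	struct rte_mbuf *pkt = rte_distributor_get_pkt(d, id, NULL);

	while (!quit) {
		/* ... do the real work on pkt here ... */
		pkt = rte_distributor_get_pkt(d, id, pkt);
	}
	rte_distributor_return_pkt(d, id, pkt);
	return 0;
}

/* distributor lcore: feed bursts in, drain completed packets */
static void
distributor_main(struct rte_distributor *d)
{
	struct rte_mbuf *bufs[32], *done[32];

	while (!quit) {
		unsigned n = get_packets(bufs, 32);
		rte_distributor_process(d, bufs, n);

		int nb = rte_distributor_returned_pkts(d, done, 32);
		while (nb > 0)
			rte_pktmbuf_free(done[--nb]); /* or transmit */
	}
	rte_distributor_flush(d); /* complete anything still in flight */
}

The distributor instance itself would come from rte_distributor_create(), with
num_workers matching the number of worker lcores launched, and worker_main()
started on each worker lcore (for example via rte_eal_remote_launch()).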
* [dpdk-dev] [PATCH v2 3/5] distributor: add distributor library to build
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (8 preceding siblings ...)
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 2/5] distributor: new packet distributor library Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 4/5] distributor: add unit tests for distributor lib Bruce Richardson
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 5/5] docs: add distributor lib to API docs Bruce Richardson
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev
add new configuration settings to enable/disable the distributor library
and add a makefile entry to compile it once enabled.
Changes in V2:
* Patch updated to use new common config files
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
config/common_bsdapp | 6 ++++++
config/common_linuxapp | 5 +++++
lib/Makefile | 1 +
mk/rte.app.mk | 4 ++++
4 files changed, 16 insertions(+)
diff --git a/config/common_bsdapp b/config/common_bsdapp
index 2cc7b80..2af6191 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y
CONFIG_RTE_TEST_PMD=y
CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
+
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 62619c6..1663289 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/lib/Makefile b/lib/Makefile
index b92b392..5a0b10f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index a836577..ec5fbd8 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -61,6 +61,10 @@ ifeq ($(NO_AUTOLIBS),)
LDLIBS += --whole-archive
+ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
+LDLIBS += -lrte_distributor
+endif
+
ifeq ($(CONFIG_RTE_LIBRTE_KNI),y)
ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
LDLIBS += -lrte_kni
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
* [dpdk-dev] [PATCH v2 4/5] distributor: add unit tests for distributor lib
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (9 preceding siblings ...)
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 3/5] distributor: add distributor library to build Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 5/5] docs: add distributor lib to API docs Bruce Richardson
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev
Add a set of unit tests and a basic performance test for the
distributor library. These tests cover all the major functionality of
the library on both distributor and worker sides.
Changes in V2:
* Updated tests to work with APIs in distributor patch V2
* Stylistic changes to clear checkpatch.pl errors
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
app/test/Makefile | 2 +
app/test/commands.c | 7 +-
app/test/test.h | 2 +
app/test/test_distributor.c | 595 +++++++++++++++++++++++++++++++++++++++
app/test/test_distributor_perf.c | 275 ++++++++++++++++++
5 files changed, 880 insertions(+), 1 deletion(-)
create mode 100644 app/test/test_distributor.c
create mode 100644 app/test/test_distributor_perf.c
diff --git a/app/test/Makefile b/app/test/Makefile
index b49785e..7c2d351 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -93,6 +93,8 @@ SRCS-$(CONFIG_RTE_APP_TEST) += test_power.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_common.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_timer_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_ivshmem.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_devargs.c
ifeq ($(CONFIG_RTE_APP_TEST),y)
diff --git a/app/test/commands.c b/app/test/commands.c
index efa8566..dfdbd37 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -179,6 +179,10 @@ static void cmd_autotest_parsed(void *parsed_result,
ret = test_common();
if (!strcmp(res->autotest, "ivshmem_autotest"))
ret = test_ivshmem();
+ if (!strcmp(res->autotest, "distributor_autotest"))
+ ret = test_distributor();
+ if (!strcmp(res->autotest, "distributor_perf_autotest"))
+ ret = test_distributor_perf();
if (!strcmp(res->autotest, "devargs_autotest"))
ret = test_devargs();
#ifdef RTE_LIBRTE_PMD_RING
@@ -238,7 +242,8 @@ cmdline_parse_token_string_t cmd_autotest_autotest =
#ifdef RTE_LIBRTE_KVARGS
"kvargs_autotest#"
#endif
- "common_autotest");
+ "common_autotest#"
+ "distributor_autotest#distributor_perf_autotest");
cmdline_parse_inst_t cmd_autotest = {
.f = cmd_autotest_parsed, /* function to call */
diff --git a/app/test/test.h b/app/test/test.h
index 1945d29..9b83ade 100644
--- a/app/test/test.h
+++ b/app/test/test.h
@@ -92,6 +92,8 @@ int test_power(void);
int test_common(void);
int test_pmd_ring(void);
int test_ivshmem(void);
+int test_distributor(void);
+int test_distributor_perf(void);
int test_kvargs(void);
int test_devargs(void);
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
new file mode 100644
index 0000000..e7dc1fb
--- /dev/null
+++ b/app/test/test_distributor.c
@@ -0,0 +1,595 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_errno.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+/* statics - all zero-initialized by default */
+static volatile int quit; /**< general quit variable for all threads */
+static volatile int zero_quit; /**< var for when we just want thr0 to quit */
+static volatile unsigned worker_idx;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* returns the total count of packets handled by the worker
+ * functions given below.
+ */
+static inline unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static inline void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for the sanity tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* do basic sanity testing of the distributor. This test covers the following:
+ * - send 32 packets through the distributor with the same tag and ensure they
+ *   all go to the one worker
+ * - send 32 packets through the distributor with two different tags and
+ *   verify that they go equally to two different workers.
+ * - send 32 packets with different tags through the distributor and
+ *   just verify we get all packets back.
+ * - send 1024 packets through the distributor, gathering the returned packets
+ * as we go. Then verify that we correctly got all 1024 pointers back again,
+ * not necessarily in the same order (as different flows).
+ */
+static int
+sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Basic distributor sanity tests ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with all zero hashes done.\n");
+ if (worker_stats[0].handled_packets != BURST)
+ return -1;
+
+ /* pick two flows and check they go correctly */
+ if (rte_lcore_count() >= 3) {
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = (i & 1) << 8;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with two hash values done\n");
+
+ if (worker_stats[0].handled_packets != 16 ||
+ worker_stats[1].handled_packets != 16)
+ return -1;
+ }
+
+ /* give a different hash value to each packet,
+ * so load gets distributed */
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with non-zero hashes done\n");
+
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ /* sanity test with BIG_BATCH packets to ensure they all arrived back
+ * from the returned packets function */
+ clear_packet_count();
+ struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
+ unsigned num_returned = 0;
+
+ /* flush out any remaining packets */
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BIG_BATCH; i++)
+ many_bufs[i]->pkt.hash.rss = i << 2;
+
+ for (i = 0; i < BIG_BATCH/BURST; i++) {
+ rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned],
+ BIG_BATCH - num_returned);
+ }
+ rte_distributor_flush(d);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned], BIG_BATCH - num_returned);
+
+ if (num_returned != BIG_BATCH) {
+ printf("line %d: Number returned is not the same as "
+ "number sent\n", __LINE__);
+ return -1;
+ }
+ /* big check - make sure all packets made it back!! */
+ for (i = 0; i < BIG_BATCH; i++) {
+ unsigned j;
+ struct rte_mbuf *src = many_bufs[i];
+ for (j = 0; j < BIG_BATCH; j++)
+ if (return_bufs[j] == src)
+ break;
+
+ if (j == BIG_BATCH) {
+ printf("Error: could not find source packet #%u\n", i);
+ return -1;
+ }
+ }
+ printf("Sanity test of returned packets done\n");
+
+ rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
+
+ printf("\n");
+ return 0;
+}
+
+
+/* to test that the distributor does not lose packets, we use this worker
+ * function which frees mbufs when it gets them. The distributor thread does
+ * the mbuf allocation. If the distributor drops packets, we'll eventually
+ * run out of mbufs.
+ */
+static int
+handle_work_with_free_mbufs(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, pkt);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* Perform a sanity test of the distributor with a large number of packets,
+ * where we allocate a new set of mbufs for each burst. The workers then
+ * free the mbufs. This ensures that we don't have any packet leaks in the
+ * library.
+ */
+static int
+sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ struct rte_mbuf *bufs[BURST];
+
+ printf("=== Sanity test with mbuf alloc/free ===\n");
+ clear_packet_count();
+ for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
+ unsigned j;
+ while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
+ rte_distributor_process(d, NULL, 0);
+ for (j = 0; j < BURST; j++) {
+ bufs[j]->pkt.hash.rss = (i+j) << 1;
+ bufs[j]->refcnt = 1;
+ }
+
+ rte_distributor_process(d, bufs, BURST);
+ }
+
+ rte_distributor_flush(d);
+ if (total_packet_count() < (1<<ITER_POWER)) {
+ printf("Line %u: Packet count is incorrect, %u, expected %u\n",
+ __LINE__, total_packet_count(),
+ (1<<ITER_POWER));
+ return -1;
+ }
+
+ printf("Sanity test with mbuf alloc/free passed\n\n");
+ return 0;
+}
+
+static int
+handle_work_for_shutdown_test(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ /* wait for the quit signal globally, or, for worker zero, wait
+ * for zero_quit */
+ while (!quit && !(id == 0 && zero_quit)) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+
+ if (id == 0) {
+ /* for worker zero, allow it to restart to pick up last packet
+ * when all workers are shutting down.
+ */
+ while (zero_quit)
+ usleep(100);
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ }
+ rte_distributor_return_pkt(d, id, pkt);
+ }
+ return 0;
+}
+
+
+/* Perform a sanity test of the distributor, sending a burst of packets that
+ * all share one tag so that they queue up for a single worker, then shutting
+ * that worker down mid-test to check that its backlog is redistributed to
+ * the remaining workers without losing any packets.
+ */
+static int
+sanity_test_with_worker_shutdown(struct rte_distributor *d,
+ struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Sanity test of worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full
+ * backlog for the other ones at worker 0.
+ */
+
+ /* get more buffers to queue up, again setting them to the same flow */
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+ rte_distributor_process(d, bufs, BURST);
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST * 2) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST * 2, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Sanity test with worker shutdown passed\n\n");
+ return 0;
+}
+
+/* Test that the flush function is able to move packets between workers when
+ * one worker shuts down.
+ */
+static int
+test_flush_with_worker_shutdown(struct rte_distributor *d,
+ struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Test flush fn with worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full
+ * backlog for the other ones at worker 0.
+ */
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+
+ zero_quit = 0;
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Flush test with worker shutdown passed\n\n");
+ return 0;
+}
+
+static int
+test_error_distributor_create_name(void)
+{
+ struct rte_distributor *d = NULL;
+ char *name = NULL;
+
+ d = rte_distributor_create(name, rte_socket_id(),
+ rte_lcore_count() - 1);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: No error on create() with NULL name param\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static int
+test_error_distributor_create_numworkers(void)
+{
+ struct rte_distributor *d = NULL;
+ d = rte_distributor_create("test_numworkers", rte_socket_id(),
+ RTE_MAX_LCORE + 10);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: No error on create() with num_workers > MAX\n");
+ return -1;
+ }
+ return 0;
+}
+
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ zero_quit = 0;
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_distributor_flush(d);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor(void)
+{
+ static struct rte_distributor *d;
+ static struct rte_mempool *p;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_distributor", rte_socket_id(),
+ rte_lcore_count() - 1);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (sanity_test(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
+ if (sanity_test_with_mbuf_alloc(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ if (rte_lcore_count() > 2) {
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
+ SKIP_MASTER);
+ if (sanity_test_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
+ SKIP_MASTER);
+ if (test_flush_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ } else {
+ printf("Not enough cores to run tests for worker shutdown\n");
+ }
+
+ if (test_error_distributor_create_numworkers() == -1 ||
+ test_error_distributor_create_name() == -1) {
+ printf("rte_distributor_create parameter check tests failed");
+ return -1;
+ }
+
+ return 0;
+
+err:
+ quit_workers(d, p);
+ return -1;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
new file mode 100644
index 0000000..1031baa
--- /dev/null
+++ b/app/test/test_distributor_perf.c
@@ -0,0 +1,275 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+/* static vars - zero initialized by default */
+static volatile int quit;
+static volatile unsigned worker_idx;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* worker thread used for timing the round trip of a cache line between
+ * two cores
+ */
+static void
+flip_bit(volatile uint64_t *arg)
+{
+ uint64_t old_val = 0;
+ while (old_val != 2) {
+ while (!*arg)
+ rte_pause();
+ old_val = *arg;
+ *arg = 0;
+ }
+}
+
+/* test case to time the number of cycles needed to round-trip a cache line
+ * between two cores.
+ */
+static void
+time_cache_line_switch(void)
+{
+ /* allocate three cache lines for data; we use only the first 64-bit word */
+ uint64_t data[CACHE_LINE_SIZE*3 / sizeof(uint64_t)];
+
+ unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
+ volatile uint64_t *pdata = &data[0];
+ *pdata = 1;
+ rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
+ while (*pdata)
+ rte_pause();
+
+ const uint64_t start_time = rte_rdtsc();
+ for (i = 0; i < (1 << ITER_POWER); i++) {
+ while (*pdata)
+ rte_pause();
+ *pdata = 1;
+ }
+ const uint64_t end_time = rte_rdtsc();
+
+ while (*pdata)
+ rte_pause();
+ *pdata = 2;
+ rte_eal_wait_lcore(slaveid);
+ printf("==== Cache line switch test ===\n");
+ printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+ end_time-start_time);
+ printf("Ticks per iteration = %"PRIu64"\n\n",
+ (end_time-start_time) >> ITER_POWER);
+}
+
+/* returns the total count of packets handled by the worker
+ * functions given below.
+ */
+static unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* this basic performance test repeatedly sends in 32 packets at a time to
+ * the distributor, verifies at the end that the worker threads received them
+ * all, and reports how long the processing took per packet.
+ */
+static inline int
+perf_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ uint64_t start, end;
+ struct rte_mbuf *bufs[BURST];
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("Error getting mbufs from pool\n");
+ return -1;
+ }
+ /* ensure we have a different hash value for each pkt */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ start = rte_rdtsc();
+ for (i = 0; i < (1<<ITER_POWER); i++)
+ rte_distributor_process(d, bufs, BURST);
+ end = rte_rdtsc();
+
+ do {
+ usleep(100);
+ rte_distributor_process(d, NULL, 0);
+ } while (total_packet_count() < (BURST << ITER_POWER));
+
+ printf("=== Performance test of distributor ===\n");
+ printf("Time per burst: %"PRIu64"\n", (end - start) >> ITER_POWER);
+ printf("Time per packet: %"PRIu64"\n\n",
+ ((end - start) >> ITER_POWER)/BURST);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Total packets: %u (%x)\n", total_packet_count(),
+ total_packet_count());
+ printf("=== Perf test done ===\n\n");
+
+ return 0;
+}
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor_perf(void)
+{
+ static struct rte_distributor *d;
+ static struct rte_mempool *p;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ /* first, time how long it takes to round-trip a cache line */
+ time_cache_line_switch();
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_perf", rte_socket_id(),
+ rte_lcore_count() - 1);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DPT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (perf_test(d, p) < 0)
+ return -1;
+ quit_workers(d, p);
+
+ return 0;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor_perf(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
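One idiom worth calling out from the perf test above: it always runs a
power-of-two number of iterations (2^ITER_POWER), so converting a cycle total
into a per-iteration cost is a right shift rather than a division. A minimal
sketch of the same pattern, assuming only the rte_rdtsc() counter the test
already uses; the function pointer is a stand-in for whatever operation is
being timed.

#include <inttypes.h>
#include <stdio.h>
#include <rte_cycles.h>

#define ITER_POWER 20 /* log2 of the iteration count, as in the test */

static void
time_op(void (*op)(void))
{
	unsigned i;
	const uint64_t start = rte_rdtsc();

	for (i = 0; i < (1 << ITER_POWER); i++)
		op();

	const uint64_t end = rte_rdtsc();

	/* shifting by ITER_POWER divides the total by 2^ITER_POWER */
	printf("Ticks per iteration = %" PRIu64 "\n",
			(end - start) >> ITER_POWER);
}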
* [dpdk-dev] [PATCH v2 5/5] docs: add distributor lib to API docs
2014-05-20 10:00 [dpdk-dev] [PATCH 0/4] New library: rte_distributor Bruce Richardson
` (10 preceding siblings ...)
2014-05-29 10:12 ` [dpdk-dev] [PATCH v2 4/5] distributor: add unit tests for distributor lib Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev
Add entries into the API documentation for the new distributor library.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
doc/doxy-api-index.md | 1 +
doc/doxy-api.conf | 1 +
2 files changed, 2 insertions(+)
diff --git a/doc/doxy-api-index.md b/doc/doxy-api-index.md
index 2825c08..6e75a6e 100644
--- a/doc/doxy-api-index.md
+++ b/doc/doxy-api-index.md
@@ -94,6 +94,7 @@ There are many libraries, so their headers may be grouped by topics:
- **containers**:
[mbuf] (@ref rte_mbuf.h),
[ring] (@ref rte_ring.h),
+ [distributor] (@ref rte_distributor.h),
[tailq] (@ref rte_tailq.h),
[bitmap] (@ref rte_bitmap.h)
diff --git a/doc/doxy-api.conf b/doc/doxy-api.conf
index 642f77a..9df7356 100644
--- a/doc/doxy-api.conf
+++ b/doc/doxy-api.conf
@@ -31,6 +31,7 @@
PROJECT_NAME = DPDK
INPUT = doc/doxy-api-index.md \
lib/librte_eal/common/include \
+ lib/librte_distributor \
lib/librte_ether \
lib/librte_hash \
lib/librte_kni \
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread