DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH 0/2] 10G PMD: vectorized RX and TX functions
@ 2014-05-12  9:23 Bruce Richardson
  2014-05-12  9:23 ` [dpdk-dev] [PATCH 1/2] 10G PMD: New vectorized functions for RX/TX Bruce Richardson
                   ` (2 more replies)
  0 siblings, 3 replies; 5+ messages in thread
From: Bruce Richardson @ 2014-05-12  9:23 UTC (permalink / raw)
  To: dev

This patch set contains the vectorized PMD implementation for the Intel DPDK.
This code was previously released under a proprietary license, but is now
being released under a BSD license to allow its integration with the rest
of the Intel DPDK codebase.

Bruce Richardson (2):
  10G PMD: New vectorized functions for RX/TX
  10G PMD: enable vector PMD compile for 64b linux

 config/defconfig_x86_64-default-linuxapp-gcc |    2 +
 config/defconfig_x86_64-default-linuxapp-icc |    2 +
 lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c        |  726 ++++++++++++++++++++++++++
 3 files changed, 730 insertions(+), 0 deletions(-)
 create mode 100644 lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [dpdk-dev] [PATCH 1/2] 10G PMD: New vectorized functions for RX/TX
  2014-05-12  9:23 [dpdk-dev] [PATCH 0/2] 10G PMD: vectorized RX and TX functions Bruce Richardson
@ 2014-05-12  9:23 ` Bruce Richardson
  2014-05-12  9:23 ` [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b linux Bruce Richardson
  2014-05-12  9:23 ` Bruce Richardson
  2 siblings, 0 replies; 5+ messages in thread
From: Bruce Richardson @ 2014-05-12  9:23 UTC (permalink / raw)
  To: dev

New file containing optimized receive and transmit functions which
use 128bit vector instructions to improve performance. When conditions
permit, these functions will be enabled at runtime by the device
initialization routines already in the PMD.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c |  726 +++++++++++++++++++++++++++++++++
 1 files changed, 726 insertions(+), 0 deletions(-)
 create mode 100644 lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c

diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
new file mode 100644
index 0000000..1cfdbe9
--- /dev/null
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
@@ -0,0 +1,726 @@
+/*-
+ *   BSD LICENSE
+ * 
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ * 
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ * 
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ * 
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <sys/queue.h>
+
+#include <endian.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <stdint.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <inttypes.h>
+
+#include <rte_byteorder.h>
+#include <rte_common.h>
+#include <rte_cycles.h>
+#include <rte_log.h>
+#include <rte_debug.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_launch.h>
+#include <rte_tailq.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+#include <rte_mbuf.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_prefetch.h>
+#include <rte_udp.h>
+#include <rte_tcp.h>
+#include <rte_sctp.h>
+#include <rte_string_fns.h>
+#include <rte_errno.h>
+
+#include "ixgbe_logs.h"
+#include "ixgbe/ixgbe_api.h"
+#include "ixgbe/ixgbe_vf.h"
+#include "ixgbe_ethdev.h"
+#include "ixgbe/ixgbe_dcb.h"
+
+#include "ixgbe_rxtx.h"
+
+#include <emmintrin.h>
+#include <xmmintrin.h>
+#include <nmmintrin.h>
+
+#ifndef __INTEL_COMPILER
+#pragma GCC diagnostic ignored "-Wcast-qual"
+#endif
+
+static struct rte_mbuf mb_def = {
+
+	.ol_flags = 0,
+	{
+		.pkt = {
+			.data_len = 0,
+			.pkt_len = 0,
+
+			.vlan_macip = {
+				.data = 0,
+			},
+			.hash = {
+				.rss = 0,
+			},
+
+			.nb_segs = 1,
+			.in_port = 0,
+
+			.next = NULL,
+			.data = NULL,
+		},
+	},
+};
+
+static inline void
+ixgbe_rxq_rearm (struct igb_rx_queue * rxq)
+{
+	int i;
+	uint16_t rx_id;
+	volatile union ixgbe_adv_rx_desc *rxdp;
+	struct igb_rx_entry *rxep = &rxq->sw_ring[rxq->rxrearm_start];
+	struct rte_mbuf *mb0, *mb1;
+	__m128i def_low;
+	__m128i hdr_room = _mm_set_epi64x(RTE_PKTMBUF_HEADROOM, RTE_PKTMBUF_HEADROOM);
+
+	/* Pull 'n' more MBUFs into the software ring */
+	if (rte_mempool_get_bulk(rxq->mb_pool, 
+				 (void *)rxep, RTE_IXGBE_RXQ_REARM_THRESH) < 0)
+		return;
+
+	rxdp = rxq->rx_ring + rxq->rxrearm_start;
+
+	def_low = _mm_load_si128((__m128i *)&(mb_def.pkt));
+
+	/* Initialize the mbufs in vector, process 2 mbufs in one loop */
+	for (i = 0; i < RTE_IXGBE_RXQ_REARM_THRESH; i += 2, rxep+=2)
+	{
+		__m128i dma_addr0, dma_addr1;
+		__m128i vaddr0, vaddr1;
+
+		mb0 = rxep[0].mbuf;
+		mb1 = rxep[1].mbuf;
+
+		/* load buf_addr(lo 64bit) and buf_physaddr(hi 64bit) */
+		vaddr0 = _mm_loadu_si128((__m128i *)&(mb0->buf_addr));
+		vaddr1 = _mm_loadu_si128((__m128i *)&(mb1->buf_addr));
+		
+		/* calc va/pa of pkt data point */
+		vaddr0 = _mm_add_epi64(vaddr0, hdr_room);
+		vaddr1 = _mm_add_epi64(vaddr1, hdr_room);
+
+		/* convert pa to dma_addr hdr/data */
+		dma_addr0 = _mm_unpackhi_epi64(vaddr0, vaddr0);
+		dma_addr1 = _mm_unpackhi_epi64(vaddr1, vaddr1);
+
+		/* fill va into t0 def pkt template */
+		vaddr0 = _mm_unpacklo_epi64(def_low, vaddr0);
+		vaddr1 = _mm_unpacklo_epi64(def_low, vaddr1);
+
+		/* flush desc with pa dma_addr */
+		_mm_store_si128((__m128i *)&rxdp++->read, dma_addr0);
+		_mm_store_si128((__m128i *)&rxdp++->read, dma_addr1);
+
+		/* flush mbuf with pkt template */
+		_mm_store_si128 ((__m128i *)&mb0->pkt, vaddr0);
+		_mm_store_si128 ((__m128i *)&mb1->pkt, vaddr1);
+
+		/* update refcnt per pkt */
+		rte_mbuf_refcnt_set(mb0, 1);
+		rte_mbuf_refcnt_set(mb1, 1);
+	}
+
+	rxq->rxrearm_start += RTE_IXGBE_RXQ_REARM_THRESH;
+	if (rxq->rxrearm_start >= rxq->nb_rx_desc)
+		rxq->rxrearm_start = 0;
+
+	rxq->rxrearm_nb -= RTE_IXGBE_RXQ_REARM_THRESH;
+
+	rx_id = (uint16_t) ((rxq->rxrearm_start == 0) ?
+			     (rxq->nb_rx_desc - 1) : (rxq->rxrearm_start - 1));
+
+	/* Update the tail pointer on the NIC */
+	IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
+}
+
+#ifndef RTE_IXGBE_RX_OLFLAGS_DISABLE
+#define OLFLAGS_MASK     ((uint16_t)(PKT_RX_VLAN_PKT | PKT_RX_IPV4_HDR |\
+				     PKT_RX_IPV4_HDR_EXT | PKT_RX_IPV6_HDR |\
+				     PKT_RX_IPV6_HDR_EXT))
+#define OLFLAGS_MASK_V   (((uint64_t)OLFLAGS_MASK << 48) | ((uint64_t)OLFLAGS_MASK << 32) | \
+			  ((uint64_t)OLFLAGS_MASK << 16) | ((uint64_t)OLFLAGS_MASK))
+#define PTYPE_SHIFT    (1)
+#define VTAG_SHIFT     (3)
+
+static inline void
+desc_to_olflags_v(__m128i descs[4], struct rte_mbuf **rx_pkts)
+{
+	__m128i ptype0, ptype1, vtag0, vtag1;
+	union {
+		uint16_t e[4];
+		uint64_t dword;
+	}vol;
+	
+	ptype0 = _mm_unpacklo_epi16(descs[0], descs[1]);
+	ptype1 = _mm_unpacklo_epi16(descs[2], descs[3]);
+	vtag0 = _mm_unpackhi_epi16(descs[0], descs[1]);
+	vtag1 = _mm_unpackhi_epi16(descs[2], descs[3]);
+
+	ptype1 = _mm_unpacklo_epi32(ptype0, ptype1);
+	vtag1 = _mm_unpacklo_epi32(vtag0, vtag1);
+
+	ptype1 = _mm_slli_epi16(ptype1, PTYPE_SHIFT);
+	vtag1 = _mm_srli_epi16(vtag1, VTAG_SHIFT);
+
+	ptype1 = _mm_or_si128(ptype1, vtag1);
+	vol.dword = _mm_cvtsi128_si64(ptype1) & OLFLAGS_MASK_V;
+
+	rx_pkts[0]->ol_flags = vol.e[0];
+	rx_pkts[1]->ol_flags = vol.e[1];	
+	rx_pkts[2]->ol_flags = vol.e[2];
+	rx_pkts[3]->ol_flags = vol.e[3];
+}
+#else
+#define desc_to_olflags_v(desc, rx_pkts)            do{} while(0)
+#endif
+
+/*
+ * vPMD receive routine, now only accept (nb_pkts == RTE_IXGBE_VPMD_RX_BURST) in one loop
+ * Notice:
+ * - nb_pkts < RTE_IXGBE_VPMD_RX_BURST, just return no packet
+ * - nb_pkts > RTE_IXGBE_VPMD_RX_BURST, only scan RTE_IXGBE_VPMD_RX_BURST numbers of DD bit
+ * - don't support ol_flags for rss and csum err
+ */
+uint16_t
+ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
+		uint16_t nb_pkts)
+{
+	volatile union ixgbe_adv_rx_desc * rxdp;
+	struct igb_rx_queue * rxq = rx_queue;
+	struct igb_rx_entry *sw_ring;	
+	uint16_t nb_pkts_recd;
+	int pos;
+	uint64_t var;
+	__m128i shuf_msk;
+	__m128i in_port;
+	__m128i dd_check; 
+
+	if (unlikely(nb_pkts < RTE_IXGBE_VPMD_RX_BURST))
+		return 0;
+
+	/* Just the act of getting into the function from the application is
+	 * going to cost about 7 cycles */
+	rxdp = rxq->rx_ring + rxq->rx_tail;
+
+	_mm_prefetch((const void *)rxdp, _MM_HINT_T0);
+
+	/* See if we need to rearm the RX queue - gives the prefetch a bit of time to act */
+	if (rxq->rxrearm_nb > RTE_IXGBE_RXQ_REARM_THRESH)
+		ixgbe_rxq_rearm(rxq);
+
+	/* Before we start moving massive data around, check to see if there is actually a
+	 * packet available */
+	if (! (rxdp->wb.upper.status_error & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
+		return 0;
+
+	/* 4 packets DD mask */
+	dd_check = _mm_set_epi64x(0x0000000100000001LL, 0x0000000100000001LL);
+
+	/* mask to shuffle from desc. to mbuf */
+	shuf_msk = _mm_set_epi8(
+		7,6,5,4,    /* octet 4~7, 32bits rss */
+		0xFF,0xFF,  /* skip high 16 bits vlan_macip, zero out */
+		15,14,      /* octet 14~15, low 16 bits vlan_macip */
+		0xFF,0xFF,  /* skip high 16 bits pkt_len, zero out */
+		13,12,      /* octet 12~13, low 16 bits pkt_len */
+		0xFF,0xFF,  /* skip nb_segs and in_port, zero out */
+		13,12       /* octet 12~13, 16 bits data_len */
+		);
+
+
+	/* Cache is empty -> need to scan the buffer rings, but first move the next
+	 * 'n' mbufs into the cache */
+	sw_ring = &rxq->sw_ring[rxq->rx_tail];
+	
+	/* in_port, nb_seg = 1, crc_len */
+	in_port = rxq->misc_info; 
+
+	/*
+	 * A. load 4 packet in one loop
+	 * B. copy 4 mbuf point from swring to rx_pkts
+	 * C. calc the number of DD bits among the 4 packets  
+	 * D. fill info. from desc to mbuf 
+	 */
+	for (pos = 0, nb_pkts_recd = 0; pos < RTE_IXGBE_VPMD_RX_BURST; 
+	     pos += RTE_IXGBE_DESCS_PER_LOOP, rxdp += RTE_IXGBE_DESCS_PER_LOOP)
+	{
+		__m128i descs[RTE_IXGBE_DESCS_PER_LOOP];
+		__m128i pkt_mb1, pkt_mb2, pkt_mb3, pkt_mb4;
+		__m128i zero, staterr, sterr_tmp1, sterr_tmp2;
+	        __m128i mbp1, mbp2; /* two mbuf pointer in one XMM reg. */
+
+		/* B.1 load 1 mbuf point */
+		mbp1 = _mm_loadu_si128((__m128i *)&sw_ring[pos]);
+
+		/* Read desc statuses backwards to avoid race condition */
+		/* A.1 load 4 pkts desc */
+		descs[3] = _mm_loadu_si128((__m128i *)(rxdp + 3));
+
+		/* B.2 copy 2 mbuf point into rx_pkts  */
+		_mm_store_si128((__m128i *)&rx_pkts[pos], mbp1);
+
+		/* B.1 load 1 mbuf point */
+		mbp2 = _mm_loadu_si128((__m128i *)&sw_ring[pos+2]);
+
+		descs[2] = _mm_loadu_si128((__m128i *)(rxdp + 2));
+		/* B.1 load 2 mbuf point */
+		descs[1] = _mm_loadu_si128((__m128i *)(rxdp + 1));
+		descs[0] = _mm_loadu_si128((__m128i *)(rxdp));	
+
+		/* B.2 copy 2 mbuf point into rx_pkts  */
+		_mm_store_si128((__m128i *)&rx_pkts[pos+2], mbp2);
+
+		/* avoid compiler reorder optimization */
+		rte_compiler_barrier();
+
+		/* D.1 pkt 3,4 convert format from desc to pktmbuf */
+		pkt_mb4 = _mm_shuffle_epi8(descs[3], shuf_msk);
+		pkt_mb3 = _mm_shuffle_epi8(descs[2], shuf_msk);
+
+		/* C.1 4=>2 filter staterr info only */
+		sterr_tmp2 = _mm_unpackhi_epi32(descs[3], descs[2]);
+		/* C.1 4=>2 filter staterr info only */
+		sterr_tmp1 = _mm_unpackhi_epi32(descs[1], descs[0]);
+
+		/* set ol_flags with packet type and vlan tag */
+		desc_to_olflags_v(descs, &rx_pkts[pos]);
+
+		/* D.2 pkt 3,4 set in_port/nb_seg and remove crc */
+		pkt_mb4 = _mm_add_epi16(pkt_mb4, in_port);
+		pkt_mb3 = _mm_add_epi16(pkt_mb3, in_port);
+
+		/* D.1 pkt 1,2 convert format from desc to pktmbuf */
+		pkt_mb2 = _mm_shuffle_epi8(descs[1], shuf_msk);
+		pkt_mb1 = _mm_shuffle_epi8(descs[0], shuf_msk);		
+
+		/* C.2 get 4 pkts staterr value  */
+		zero = _mm_xor_si128(dd_check, dd_check);
+		staterr = _mm_unpacklo_epi32(sterr_tmp1, sterr_tmp2);
+
+		/* D.3 copy final 3,4 data to rx_pkts */
+		_mm_storeu_si128((__m128i *)&(rx_pkts[pos+3]->pkt.data_len), pkt_mb4);
+		_mm_storeu_si128((__m128i *)&(rx_pkts[pos+2]->pkt.data_len), pkt_mb3);
+
+		/* D.2 pkt 1,2 set in_port/nb_seg and remove crc */
+		pkt_mb2 = _mm_add_epi16(pkt_mb2, in_port);
+		pkt_mb1 = _mm_add_epi16(pkt_mb1, in_port);
+
+		/* C.3 calc avaialbe number of desc */
+		staterr = _mm_and_si128(staterr, dd_check);
+		staterr = _mm_packs_epi32(staterr, zero);
+
+		/* D.3 copy final 1,2 data to rx_pkts */
+		_mm_storeu_si128((__m128i *)&(rx_pkts[pos+1]->pkt.data_len), pkt_mb2);
+		_mm_storeu_si128((__m128i *)&(rx_pkts[pos]->pkt.data_len), pkt_mb1);
+
+		/* C.4 calc avaialbe number of desc */
+		var = _mm_popcnt_u64(_mm_cvtsi128_si64(staterr));
+		nb_pkts_recd += var;
+		if (likely(var != RTE_IXGBE_DESCS_PER_LOOP))
+			break;
+	}
+
+	/* Update our internal tail pointer */
+	rxq->rx_tail = (uint16_t)(rxq->rx_tail + nb_pkts_recd);
+	rxq->rx_tail = (uint16_t)(rxq->rx_tail & (rxq->nb_rx_desc - 1));
+	rxq->rxrearm_nb = (uint16_t)(rxq->rxrearm_nb + nb_pkts_recd);
+
+	return nb_pkts_recd;
+}
+
+static inline void 
+vtx1(volatile union ixgbe_adv_tx_desc *txdp, 
+     struct rte_mbuf *pkt, __m128i flags)
+{
+	__m128i t0, t1, offset, ols, ba, ctl;
+ 
+	/* load buf_addr/buf_physaddr in t0 */
+	t0 = _mm_loadu_si128((__m128i *)&(pkt->buf_addr));
+	/* load data, ... pkt_len in t1 */
+	t1 = _mm_loadu_si128((__m128i *)&(pkt->pkt.data));
+		
+	/* calc offset = (data - buf_adr) */
+	offset = _mm_sub_epi64(t1, t0);
+
+	/* cmd_type_len: pkt_len |= DCMD_DTYP_FLAGS */
+	ctl = _mm_or_si128(t1, flags); 
+
+	/* reorder as buf_physaddr/buf_addr */
+	offset = _mm_shuffle_epi32(offset, 0x4E);
+
+	/* olinfo_stats: pkt_len << IXGBE_ADVTXD_PAYLEN_SHIFT */
+	ols = _mm_slli_epi32(t1, IXGBE_ADVTXD_PAYLEN_SHIFT);
+		
+	/* buffer_addr = buf_physaddr + offset */
+	ba = _mm_add_epi64(t0, offset);
+	
+	/* format cmd_type_len/olinfo_status */
+	ctl = _mm_unpackhi_epi32(ctl, ols);
+
+	/* format buf_physaddr/cmd_type_len */
+	ba = _mm_unpackhi_epi64(ba, ctl);
+
+	/* write desc */
+	_mm_store_si128((__m128i *)&txdp->read, ba);
+}
+
+static inline void 
+vtx(volatile union ixgbe_adv_tx_desc *txdp, 
+    struct rte_mbuf **pkt, uint16_t nb_pkts,  __m128i flags)
+{
+	int i;
+	for (i = 0; i < nb_pkts; ++i, ++txdp, ++pkt)
+		vtx1(txdp, *pkt, flags);
+}
+
+static inline int __attribute__((always_inline))
+ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
+{
+	struct igb_tx_entry_v *txep;
+	struct igb_tx_entry_seq *txsp;
+	uint32_t status;
+	uint32_t n, k;
+#ifdef RTE_MBUF_SCATTER_GATHER
+	uint32_t i;
+	int nb_free = 0;
+	struct rte_mbuf *m, *free[RTE_IXGBE_TX_MAX_FREE_BUF_SZ];
+#endif
+
+	/* check DD bit on threshold descriptor */
+	status = txq->tx_ring[txq->tx_next_dd].wb.status;
+	if (! (status & IXGBE_ADVTXD_STAT_DD))
+		return 0;
+
+	n = txq->tx_rs_thresh;
+
+	/*
+	 * first buffer to free from S/W ring is at index
+	 * tx_next_dd - (tx_rs_thresh-1)
+	 */
+	txep = &((struct igb_tx_entry_v*)txq->sw_ring)[txq->tx_next_dd - (n - 1)];
+	txsp = &txq->sw_ring_seq[txq->tx_next_dd - (n - 1)];
+
+	while (n > 0) {
+		k = RTE_MIN(n, txsp[n-1].same_pool);
+#ifdef RTE_MBUF_SCATTER_GATHER
+		for (i = 0; i < k; i++) {
+			if((m = __rte_pktmbuf_prefree_seg((txep+n-k+i)->mbuf)) != NULL)
+			        free[nb_free++] = m;
+		}
+		rte_mempool_put_bulk((void*)txsp[n-1].pool, (void**)free, nb_free);
+#else
+		rte_mempool_put_bulk((void*)txsp[n-1].pool, (void**)(txep+n-k), k);
+#endif
+		n -= k;
+	}
+
+	/* buffers were freed, update counters */
+	txq->nb_tx_free = (uint16_t)(txq->nb_tx_free + txq->tx_rs_thresh);
+	txq->tx_next_dd = (uint16_t)(txq->tx_next_dd + txq->tx_rs_thresh);
+	if (txq->tx_next_dd >= txq->nb_tx_desc)
+		txq->tx_next_dd = (uint16_t)(txq->tx_rs_thresh - 1);
+
+	return txq->tx_rs_thresh;
+}
+
+static inline void __attribute__((always_inline))
+tx_backlog_entry(struct igb_tx_entry_v *txep, 
+		 struct igb_tx_entry_seq *txsp,
+		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
+{
+	int i;
+	for (i = 0; i < (int)nb_pkts; ++i) {
+		txep[i].mbuf = tx_pkts[i];
+		/* check and update sequence number */
+		txsp[i].pool = tx_pkts[i]->pool;
+		if (txsp[i-1].pool == tx_pkts[i]->pool)
+			txsp[i].same_pool = txsp[i-1].same_pool + 1;
+		else
+			txsp[i].same_pool = 1;
+	}
+}
+
+uint16_t
+ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+		       uint16_t nb_pkts)
+{
+	struct igb_tx_queue *txq = (struct igb_tx_queue *)tx_queue;
+	volatile union ixgbe_adv_tx_desc *txdp;
+	struct igb_tx_entry_v *txep;
+	struct igb_tx_entry_seq *txsp;
+	uint16_t n, nb_commit, tx_id;
+	__m128i flags = _mm_set_epi32(DCMD_DTYP_FLAGS,0,0,0);
+	__m128i rs = _mm_set_epi32(IXGBE_ADVTXD_DCMD_RS|DCMD_DTYP_FLAGS,0,0,0);
+	int i;
+
+	if (unlikely(nb_pkts > RTE_IXGBE_VPMD_TX_BURST))
+		nb_pkts = RTE_IXGBE_VPMD_TX_BURST;
+
+	if (txq->nb_tx_free < txq->tx_free_thresh)
+		ixgbe_tx_free_bufs(txq);
+
+	nb_commit = nb_pkts = (uint16_t)RTE_MIN(txq->nb_tx_free, nb_pkts);
+	if (unlikely(nb_pkts == 0))
+		return 0;
+
+	tx_id = txq->tx_tail;
+	txdp = &txq->tx_ring[tx_id];
+	txep = &((struct igb_tx_entry_v*)txq->sw_ring)[tx_id];
+	txsp = &txq->sw_ring_seq[tx_id];
+
+	txq->nb_tx_free = (uint16_t)(txq->nb_tx_free - nb_pkts);
+
+	if (nb_commit >= (n = (uint16_t)(txq->nb_tx_desc - tx_id))) {
+
+		tx_backlog_entry(txep, txsp, tx_pkts, n);
+
+		for (i = 0; i < n - 1; ++i, ++tx_pkts, ++txdp)
+			vtx1(txdp, *tx_pkts, flags);
+
+		vtx1(txdp, *tx_pkts++, rs);
+
+		nb_commit = (uint16_t)(nb_commit - n);
+
+		tx_id = 0;
+		txq->tx_next_rs = (uint16_t)(txq->tx_rs_thresh - 1);
+
+		/* avoid reach the end of ring */
+		txdp = &(txq->tx_ring[tx_id]);
+		txep = &(((struct igb_tx_entry_v*)txq->sw_ring)[tx_id]);
+		txsp = &(txq->sw_ring_seq[tx_id]);
+	}
+
+	tx_backlog_entry(txep, txsp, tx_pkts, nb_commit);
+
+	vtx(txdp, tx_pkts, nb_commit, flags);
+
+	tx_id = (uint16_t)(tx_id + nb_commit);
+	if (tx_id > txq->tx_next_rs) {
+		txq->tx_ring[txq->tx_next_rs].read.cmd_type_len |=
+			rte_cpu_to_le_32(IXGBE_ADVTXD_DCMD_RS);
+		txq->tx_next_rs = (uint16_t)(txq->tx_next_rs +
+			txq->tx_rs_thresh);
+	}
+	
+	txq->tx_tail = tx_id;
+
+	IXGBE_PCI_REG_WRITE(txq->tdt_reg_addr, txq->tx_tail);
+
+	return nb_pkts;
+}
+
+static void
+ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
+{
+	unsigned i;
+	struct igb_tx_entry_v *txe;
+	struct igb_tx_entry_seq *txs;
+	uint16_t nb_free, max_desc;
+
+	if (txq->sw_ring != NULL) {
+		/* release the used mbufs in sw_ring */
+		nb_free = txq->nb_tx_free; 
+		max_desc = (uint16_t)(txq->nb_tx_desc - 1);
+		for (i = txq->tx_next_dd - (txq->tx_rs_thresh - 1); 
+		     nb_free < max_desc && i != txq->tx_tail;
+		     i = (i + 1) & max_desc) {
+			txe = (struct igb_tx_entry_v*)&txq->sw_ring[i];			
+			if (txe->mbuf != NULL)
+				rte_pktmbuf_free_seg(txe->mbuf);
+		}
+		/* reset tx_entry */
+		for (i = 0; i < txq->nb_tx_desc; i++) {
+			txe = (struct igb_tx_entry_v*)&txq->sw_ring[i];
+			txe->mbuf = NULL;
+
+			txs = &txq->sw_ring_seq[i];
+			txs->pool = NULL;
+			txs->same_pool = 0;
+		}
+	}
+}
+
+static void
+ixgbe_tx_free_swring(struct igb_tx_queue *txq)
+{
+	if (txq == NULL)
+		return;
+ 
+	if (txq->sw_ring != NULL) {
+		rte_free((struct igb_rx_entry*)txq->sw_ring - 1);
+		txq->sw_ring = NULL;
+	}
+
+	if (txq->sw_ring_seq != NULL) {
+		rte_free(txq->sw_ring_seq - 1);
+		txq->sw_ring_seq = NULL;
+	}
+}
+
+static void
+ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
+{
+	static const union ixgbe_adv_tx_desc zeroed_desc = { .read = {
+			.buffer_addr = 0}};
+	struct igb_tx_entry_v *txe = (struct igb_tx_entry_v*)txq->sw_ring;
+	struct igb_tx_entry_seq *txs = txq->sw_ring_seq;
+	uint16_t i;
+
+	/* Zero out HW ring memory */
+	for (i = 0; i < txq->nb_tx_desc; i++) {
+		txq->tx_ring[i] = zeroed_desc;
+	}
+
+	/* Initialize SW ring entries */
+	for (i = 0; i < txq->nb_tx_desc; i++) {
+		volatile union ixgbe_adv_tx_desc *txd = &txq->tx_ring[i];
+		txd->wb.status = IXGBE_TXD_STAT_DD;
+		txe[i].mbuf = NULL;
+		txs[i].pool = NULL;
+		txs[i].same_pool = 0;
+	}
+
+	txq->tx_next_dd = (uint16_t)(txq->tx_rs_thresh - 1);
+	txq->tx_next_rs = (uint16_t)(txq->tx_rs_thresh - 1);
+
+	txq->tx_tail = 0;
+	txq->nb_tx_used = 0;
+	/*
+	 * Always allow 1 descriptor to be un-allocated to avoid
+	 * a H/W race condition
+	 */
+	txq->last_desc_cleaned = (uint16_t)(txq->nb_tx_desc - 1);
+	txq->nb_tx_free = (uint16_t)(txq->nb_tx_desc - 1);
+	txq->ctx_curr = 0;
+	memset((void*)&txq->ctx_cache, 0,
+		IXGBE_CTX_NUM * sizeof(struct ixgbe_advctx_info));
+}
+
+static struct ixgbe_txq_ops vec_txq_ops = {
+	.release_mbufs = ixgbe_tx_queue_release_mbufs,
+	.free_swring = ixgbe_tx_free_swring,
+	.reset = ixgbe_reset_tx_queue,
+};
+
+int ixgbe_txq_vec_setup(struct igb_tx_queue *txq, 
+			unsigned int socket_id)
+{
+	uint16_t nb_desc;
+
+	if (txq->sw_ring == NULL)
+		return -1;
+
+	/* request addtional one entry for continous sequence check */
+	nb_desc = (uint16_t)(txq->nb_tx_desc + 1);
+
+	txq->sw_ring_seq = rte_zmalloc_socket("txq->sw_ring_seq",
+				sizeof(struct igb_tx_entry_seq) * nb_desc,
+				CACHE_LINE_SIZE, socket_id);
+	if (txq->sw_ring_seq == NULL) 
+		return -1;
+
+
+	/* leave the first one for overflow */
+	txq->sw_ring = (struct igb_tx_entry*)
+		((struct igb_tx_entry_v*)txq->sw_ring + 1);
+	txq->sw_ring_seq += 1;
+	txq->ops = &vec_txq_ops;
+
+	return 0;
+}
+
+int ixgbe_rxq_vec_setup(struct igb_rx_queue *rxq,
+			__rte_unused unsigned int socket_id)
+{
+	rxq->misc_info = 
+		_mm_set_epi16(
+			0,0,0,0,0,
+			(uint16_t)-rxq->crc_len, /* sub crc on pkt_len */
+			(uint16_t)(rxq->port_id << 8 | 1),
+			/* 8b port_id and 8b nb_seg*/ 
+			(uint16_t)-rxq->crc_len  /* sub crc on data_len */
+			); 
+	
+	return 0;
+}
+
+int ixgbe_rx_vec_condition_check(struct rte_eth_dev *dev)
+{
+#ifndef RTE_LIBRTE_IEEE1588
+	struct rte_eth_rxmode *rxmode = &dev->data->dev_conf.rxmode;
+	struct rte_fdir_conf *fconf = &dev->data->dev_conf.fdir_conf;
+
+#ifdef RTE_IXGBE_RX_OLFLAGS_DISABLE
+	/* whithout rx ol_flags, no VP flag report */
+	if (rxmode->hw_vlan_strip != 0 ||
+	    rxmode->hw_vlan_extend != 0)
+		return -1;
+#endif
+
+	/* no fdir support */
+	if (fconf->mode != RTE_FDIR_MODE_NONE)
+		return -1;
+
+	/* 
+	 * - no csum error report support
+	 * - no header split support
+	 */
+	if (rxmode->hw_ip_checksum == 1 ||
+	    rxmode->header_split == 1)
+		return -1;
+	
+	return 0;
+#else
+	return -1;
+#endif
+}
-- 
1.7.0.7

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b linux
  2014-05-12  9:23 [dpdk-dev] [PATCH 0/2] 10G PMD: vectorized RX and TX functions Bruce Richardson
  2014-05-12  9:23 ` [dpdk-dev] [PATCH 1/2] 10G PMD: New vectorized functions for RX/TX Bruce Richardson
@ 2014-05-12  9:23 ` Bruce Richardson
  2014-05-12  9:23 ` Bruce Richardson
  2 siblings, 0 replies; 5+ messages in thread
From: Bruce Richardson @ 2014-05-12  9:23 UTC (permalink / raw)
  To: dev

Add in and set the necessary compile-time options to enable the
vector PMD to be used in 64-bit linux.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 config/defconfig_x86_64-default-linuxapp-gcc |    2 ++
 config/defconfig_x86_64-default-linuxapp-icc |    2 ++
 2 files changed, 4 insertions(+), 0 deletions(-)

diff --git a/config/defconfig_x86_64-default-linuxapp-gcc b/config/defconfig_x86_64-default-linuxapp-gcc
index f11ffbf..00c7c73 100644
--- a/config/defconfig_x86_64-default-linuxapp-gcc
+++ b/config/defconfig_x86_64-default-linuxapp-gcc
@@ -177,6 +177,8 @@ CONFIG_RTE_LIBRTE_IXGBE_DEBUG_DRIVER=n
 CONFIG_RTE_LIBRTE_IXGBE_PF_DISABLE_STRIP_CRC=n
 CONFIG_RTE_LIBRTE_IXGBE_RX_ALLOW_BULK_ALLOC=y
 CONFIG_RTE_LIBRTE_IXGBE_ALLOW_UNSUPPORTED_SFP=n
+CONFIG_RTE_IXGBE_INC_VECTOR=y
+CONFIG_RTE_IXGBE_RX_OLFLAGS_DISABLE=n
 
 #
 # Compile burst-oriented VIRTIO PMD driver
diff --git a/config/defconfig_x86_64-default-linuxapp-icc b/config/defconfig_x86_64-default-linuxapp-icc
index 4eaca4c..e166b29 100644
--- a/config/defconfig_x86_64-default-linuxapp-icc
+++ b/config/defconfig_x86_64-default-linuxapp-icc
@@ -177,6 +177,8 @@ CONFIG_RTE_LIBRTE_IXGBE_DEBUG_DRIVER=n
 CONFIG_RTE_LIBRTE_IXGBE_PF_DISABLE_STRIP_CRC=n
 CONFIG_RTE_LIBRTE_IXGBE_RX_ALLOW_BULK_ALLOC=y
 CONFIG_RTE_LIBRTE_IXGBE_ALLOW_UNSUPPORTED_SFP=n
+CONFIG_RTE_IXGBE_INC_VECTOR=y
+CONFIG_RTE_IXGBE_RX_OLFLAGS_DISABLE=n
 
 #
 # Compile burst-oriented VIRTIO PMD driver
-- 
1.7.0.7

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b linux
  2014-05-12  9:23 [dpdk-dev] [PATCH 0/2] 10G PMD: vectorized RX and TX functions Bruce Richardson
  2014-05-12  9:23 ` [dpdk-dev] [PATCH 1/2] 10G PMD: New vectorized functions for RX/TX Bruce Richardson
  2014-05-12  9:23 ` [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b linux Bruce Richardson
@ 2014-05-12  9:23 ` Bruce Richardson
  2014-05-12  9:26   ` Richardson, Bruce
  2 siblings, 1 reply; 5+ messages in thread
From: Bruce Richardson @ 2014-05-12  9:23 UTC (permalink / raw)
  To: dev

 ifeq ($(CONFIG_RTE_APP_TEST),y)
 SRCS-$(CONFIG_RTE_LIBRTE_ACL) += test_acl.c
diff --git a/app/test/commands.c b/app/test/commands.c
index a153026..5978cec 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -178,6 +178,8 @@ static void cmd_autotest_parsed(void *parsed_result,
 		ret = test_common();
 	if (!strcmp(res->autotest, "ivshmem_autotest"))
 		ret = test_ivshmem();
+	if (!strcmp(res->autotest, "distributor_autotest"))
+		ret = test_distributor();
 #ifdef RTE_LIBRTE_PMD_RING
 	if (!strcmp(res->autotest, "ring_pmd_autotest"))
 		ret = test_pmd_ring();
@@ -234,7 +236,7 @@ cmdline_parse_token_string_t cmd_autotest_autotest =
 #ifdef RTE_LIBRTE_KVARGS
 			"kvargs_autotest#"
 #endif
-			"common_autotest");
+			"common_autotest#distributor_autotest");
 
 cmdline_parse_inst_t cmd_autotest = {
 	.f = cmd_autotest_parsed,  /* function to call */
diff --git a/app/test/test.h b/app/test/test.h
index bc001b9..4621fc9 100644
--- a/app/test/test.h
+++ b/app/test/test.h
@@ -92,6 +92,7 @@ int test_power(void);
 int test_common(void);
 int test_pmd_ring(void);
 int test_ivshmem(void);
+int test_distributor(void);
 int test_kvargs(void);
 
 int test_pci_run;
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
new file mode 100644
index 0000000..0456478
--- /dev/null
+++ b/app/test/test_distributor.c
@@ -0,0 +1,261 @@
+/*
+ * <COPYRIGHT_TAG>
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+#include "wmmintrin.h"
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+
+static volatile int quit = 0;
+static volatile unsigned worker_idx;
+
+struct worker_stats{
+	volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+#define do_big_pause() do { \
+		__m128i a = {0}; \
+		volatile __m128i res; \
+		res = _mm_clmulepi64_si128(a, res, 0); \
+		res = _mm_clmulepi64_si128(a, res, 0); \
+		res = _mm_clmulepi64_si128(a, res, 0); \
+		res = _mm_clmulepi64_si128(a, res, 0); \
+		RTE_SET_USED(res); \
+		/*rte_pause();*/ \
+} while(0)
+
+#define do_pause() do { \
+	rte_pause(); \
+} while(0)
+
+static void
+flip_bit(volatile uint64_t *arg)
+{
+	uint64_t old_val = 0;
+	while (old_val != 2) {
+		while(!*arg)
+			do_big_pause();
+		old_val = *arg;
+		*arg = 0;
+	}
+}
+
+static void
+time_cache_line_switch(void)
+{
+	/* allocate a full cache line for data, we use only first byte of it */
+	uint64_t data[CACHE_LINE_SIZE / sizeof(uint64_t)];
+
+	unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
+	volatile uint64_t *pdata = &data[0];
+	*pdata = 1;
+	rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
+	while (*pdata)
+		rte_pause();
+
+	const uint64_t start_time = rte_rdtsc();
+	for (i = 0; i < (1<< ITER_POWER); i++) {
+		while (*pdata)
+			do_big_pause();
+		*pdata = 1;
+	}
+	const uint64_t end_time = rte_rdtsc();
+
+	while (*pdata)
+		do_pause();
+	*pdata = 2;
+	rte_eal_wait_lcore(slaveid);
+	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+			end_time-start_time);
+	printf("Ticks per iteration = %"PRIu64"\n",
+			(end_time-start_time) >> ITER_POWER);
+}
+
+static inline unsigned
+total_packet_count(void)
+{
+	unsigned i, count = 0;
+	for (i = 0; i < worker_idx; i++)
+		count += worker_stats[i].handled_packets;
+	return count;
+}
+
+static inline void
+clear_packet_count(void)
+{
+	memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+static int
+handle_work(void *arg)
+{
+	struct rte_mbuf *pkt = NULL;
+	struct rte_distributor *d = arg;
+	unsigned count = 0;
+	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+	printf("Worker %u starting\n", worker_idx);
+	pkt = rte_distributor_get_pkt(d, id);
+	while (!quit) {
+		worker_stats[id].handled_packets++;
+		pkt = rte_distributor_get_next_pkt(d, id, pkt);
+	}
+	rte_distributor_return_pkt(d, id, pkt);
+	printf("Worker %u handled %u pkts\n", id, count);
+	return 0;
+}
+
+static inline int
+perf_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+#define BURST 32
+	unsigned i;
+	uint64_t start, end;
+	struct rte_mbuf *bufs[BURST];
+
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+		printf("Error getting mbufs from pool\n");
+		return -1;
+	}
+	/* ensure we have different hash value for each pkt */
+	for (i = 0; i < 32; i++)
+		bufs[i]->pkt.hash.rss = i;
+
+	start = rte_rdtsc();
+	for (i = 0; i < (1<<ITER_POWER); i++)
+		rte_distributor_process(d, bufs, BURST);
+	end = rte_rdtsc();
+
+	do {
+		usleep(100);
+		rte_distributor_process(d, NULL, 0);
+	} while (total_packet_count() < (BURST << ITER_POWER));
+
+	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
+	printf("Time per packet: %"PRIu64"\n", ((end - start) >> ITER_POWER)/BURST);
+	rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+	printf("Total packets: %u (%x)\n", total_packet_count(),
+			total_packet_count());
+	printf("Perf test done\n");
+
+	return 0;
+}
+
+static int
+sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+	struct rte_mbuf *bufs[32];
+	unsigned i;
+
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, 32) != 0) {
+		printf("Error getting mbufs from pool\n");
+		return -1;
+	}
+
+	/* now set all hash values in all buffers to zero, so all pkts go to the
+	 * one worker thread */
+	for (i = 0; i < 32; i++)
+		bufs[i]->pkt.hash.rss = 0;
+
+	rte_distributor_process(d, bufs, 32);
+	do {
+		usleep(100);
+		rte_distributor_process(d, NULL, 0);
+	} while(total_packet_count() < 32);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+	printf("Sanity test with all zero hashes done.\n");
+	if (worker_stats[0].handled_packets != 32)
+		return -1;
+
+	/* give a different hash value to each packet, so load gets distributed */
+	clear_packet_count();
+	for (i = 0; i < 32; i++)
+		bufs[i]->pkt.hash.rss = i;
+
+	rte_distributor_process(d, bufs, 32);
+	do {
+		usleep(100);
+		rte_distributor_process(d, NULL, 0);
+	} while(total_packet_count() < 32);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+	printf("Sanity test with non-zero hashes done\n");
+
+	rte_mempool_put_bulk(p, (void *)bufs, 32);
+	return 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor(void)
+{
+	static struct rte_distributor *d = NULL;
+	static struct rte_mempool *p = NULL;
+
+	if (rte_lcore_count() < 2) {
+		printf("ERROR: not enough cores to test distributor\n");
+		return -1;
+	}
+
+	/* first time how long it takes to round-trip a cache line */
+	time_cache_line_switch();
+
+	if (d == NULL) {
+		d = rte_distributor_create("Test_distributor", rte_socket_id(),
+				rte_lcore_count() - 1, RTE_DISTRIBUTOR_NOFLAGS);
+		if (d == NULL) {
+			printf("Error creating distributor\n");
+			return -1;
+		}
+		rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+	}
+
+	if (p == NULL) {
+		p = rte_mempool_create("MBUF_POOL", 511, MBUF_SIZE, 64,
+				sizeof(struct rte_pktmbuf_pool_private),
+				rte_pktmbuf_pool_init, NULL,
+				rte_pktmbuf_init, NULL,
+				rte_socket_id(), 0);
+		if (p == NULL) {
+			printf("Error creating mempool\n");
+			return -1;
+		}
+	}
+
+	sanity_test(d, p);
+	perf_test(d,p);
+	return 0;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor(void)
+{
+	printf("Distributor is not enabled in configuration\n");
+	return 0;
+}
+
+#endif
diff --git a/config/defconfig_i686-default-linuxapp-gcc b/config/defconfig_i686-default-linuxapp-gcc
index 14bd3d1..5b4261e 100644
--- a/config/defconfig_i686-default-linuxapp-gcc
+++ b/config/defconfig_i686-default-linuxapp-gcc
@@ -335,3 +335,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n
 
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_i686-default-linuxapp-icc b/config/defconfig_i686-default-linuxapp-icc
index ec3386e..d1d4aeb 100644
--- a/config/defconfig_i686-default-linuxapp-icc
+++ b/config/defconfig_i686-default-linuxapp-icc
@@ -334,3 +334,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n
 
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-bsdapp-gcc b/config/defconfig_x86_64-default-bsdapp-gcc
index d960e1d..329920e 100644
--- a/config/defconfig_x86_64-default-bsdapp-gcc
+++ b/config/defconfig_x86_64-default-bsdapp-gcc
@@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y
 CONFIG_RTE_TEST_PMD=y
 CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
 CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
+
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-gcc b/config/defconfig_x86_64-default-linuxapp-gcc
index f11ffbf..772a6b3 100644
--- a/config/defconfig_x86_64-default-linuxapp-gcc
+++ b/config/defconfig_x86_64-default-linuxapp-gcc
@@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n
 
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-icc b/config/defconfig_x86_64-default-linuxapp-icc
index 4eaca4c..04affc8 100644
--- a/config/defconfig_x86_64-default-linuxapp-icc
+++ b/config/defconfig_x86_64-default-linuxapp-icc
@@ -333,3 +333,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n
 
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-ivshmem-linuxapp-gcc b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
index 2f55a69..0002dca 100644
--- a/config/defconfig_x86_64-ivshmem-linuxapp-gcc
+++ b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
@@ -46,4 +46,4 @@ CONFIG_RTE_LIBRTE_IVSHMEM_MAX_ENTRIES=128
 CONFIG_RTE_LIBRTE_IVSHMEM_MAX_METADATA_FILES=32
 
 # Set EAL to single file segments
-CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
\ No newline at end of file
+CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile
new file mode 100644
index 0000000..9a1ebd3
--- /dev/null
+++ b/examples/distributor_app/Makefile
@@ -0,0 +1,28 @@
+# <COPYRIGHT_TAG>
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overriden by command line or environment
+RTE_TARGET ?= x86_64-default-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = basicfwd
+
+# all source are stored in SRCS-y
+SRCS-y := basicfwd.c
+
+CFLAGS += $(WERROR_FLAGS)
+
+# workaround for a gcc bug with noreturn attribute
+# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+CFLAGS_main.o += -Wno-return-type
+endif
+
+EXTRA_CFLAGS += -O3 -g -Wfatal-errors
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/distributor_app/basicfwd.c b/examples/distributor_app/basicfwd.c
new file mode 100644
index 0000000..7662d91
--- /dev/null
+++ b/examples/distributor_app/basicfwd.c
@@ -0,0 +1,248 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdint.h>
+#include <inttypes.h>
+#include <unistd.h>
+
+#include <rte_eal.h>
+#include <rte_ethdev.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+#include "basicfwd.h"
+
+#define RX_RING_SIZE 128
+#define RX_FREE_THRESH 32
+#define RX_PTHRESH 8
+#define RX_HTHRESH 8
+#define RX_WTHRESH 0
+
+#define TX_RING_SIZE 512
+#define TX_FREE_THRESH 32
+#define TX_PTHRESH 32
+#define TX_HTHRESH 0
+#define TX_WTHRESH 0
+#define TX_RSBIT_THRESH 32
+#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\
+	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
+	ETH_TXQ_FLAGS_NOXSUMTCP)
+
+#define NUM_MBUFS 8191
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+#define MBUF_CACHE_SIZE 250
+#define BURST_SIZE 32
+
+static struct rte_eth_conf port_conf_default = {
+	.rxmode = {
+		.mq_mode = ETH_MQ_RX_RSS,
+		.max_rx_pkt_len = ETHER_MAX_LEN,
+		.split_hdr_size = 0,
+		.header_split   = 0, /**< Header Split disabled */
+		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
+		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
+		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
+	},
+	.txmode = {
+		.mq_mode = ETH_MQ_TX_NONE,
+	},
+	.lpbk_mode = 0,
+	.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | ETH_RSS_IPV4_TCP |
+					ETH_RSS_IPV4_UDP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
+			}
+	},
+};
+
+static const struct rte_eth_rxconf rx_conf_default = {
+	.rx_thresh = {
+		.pthresh = RX_PTHRESH,
+		.hthresh = RX_HTHRESH,
+		.wthresh = RX_WTHRESH,
+	},
+	.rx_free_thresh = RX_FREE_THRESH,
+	.rx_drop_en = 0,
+};
+
+static struct rte_eth_txconf tx_conf_default = {
+	.tx_thresh = {
+		.pthresh = TX_PTHRESH,
+		.hthresh = TX_HTHRESH,
+		.wthresh = TX_WTHRESH,
+	},
+	.tx_free_thresh = TX_FREE_THRESH,
+	.tx_rs_thresh = TX_RSBIT_THRESH,
+	.txq_flags = TX_Q_FLAGS
+
+};
+
+#define NOFLAGS 0
+
+
+/*
+ * Initialises a given port using global settings and with the rx buffers
+ * coming from the mbuf_pool passed as parameter
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	struct rte_eth_conf port_conf = port_conf_default;
+	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	for (q = 0; q < rxRings; q ++) {
+		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+						rte_eth_dev_socket_id(port), &rx_conf_default,
+						mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	for (q = 0; q < txRings; q ++) {
+		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+						rte_eth_dev_socket_id(port), &tx_conf_default);
+		if (retval < 0)
+			return retval;
+	}
+
+	retval  = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	struct rte_eth_link link;
+	rte_eth_link_get_nowait(port, &link);
+	if (!link.link_status) {
+		sleep(1);
+		rte_eth_link_get_nowait(port, &link);
+	}
+
+	if (!link.link_status) {
+		printf("Link down on port %"PRIu8"\n", port);
+		return 0;
+	}
+
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
+			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
+			(unsigned)port,
+			addr.addr_bytes[0], addr.addr_bytes[1], addr.addr_bytes[2],
+			addr.addr_bytes[3], addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+/*
+ * Main thread that does the work, reading from INPUT_PORT
+ * and writing to OUTPUT_PORT
+ */
+static  __attribute__((noreturn)) void
+lcore_main(struct rte_distributor *d)
+{
+	const uint8_t nb_ports = rte_eth_dev_count();
+	uint8_t port;
+	for (port = 0; port < nb_ports; port++)
+		if (rte_eth_dev_socket_id(port) > 0 && 
+				rte_eth_dev_socket_id(port) != (int)rte_socket_id())
+			printf("WARNING, port %u is on remote NUMA node to polling thread."
+					"\n\tPerformance will not be optimal.\n", port);
+
+	printf("\nCore %u distributing packets. [Ctrl+C to quit]\n", rte_lcore_id());
+	for(;;) {
+		for (port = 0; port < nb_ports; port++) {
+			struct rte_mbuf *bufs[BURST_SIZE];
+			const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
+			if (unlikely(nb_rx == 0))
+				continue;
+
+			rte_distributor_process(d, bufs, nb_rx);
+		}
+	}
+}
+
+static __attribute__((noreturn)) void
+tx_main(struct rte_distributor *d)
+{
+	static volatile unsigned num_workers = 0;
+	struct rte_mbuf *to_send[BURST_SIZE];
+	unsigned nb_to_send = 0;
+	unsigned port_id = 0;
+	unsigned id = __sync_fetch_and_add(&num_workers, 1);
+
+	struct rte_mbuf *pkt = rte_distributor_get_pkt(d, id);
+	for (;;) {
+		to_send[nb_to_send++] = pkt;
+		if (nb_to_send == BURST_SIZE) {
+			rte_eth_tx_burst(port_id++, id, to_send, BURST_SIZE);
+			nb_to_send = 0;
+			if (port_id == rte_eth_dev_count())
+				port_id = 0;
+		}
+		pkt = rte_distributor_get_next_pkt(d, id, pkt);
+	}
+}
+
+/* Main function, does initialisation and calls the per-lcore functions */
+int
+MAIN(int argc, char *argv[])
+{
+	struct rte_mempool *mbuf_pool;
+	struct rte_distributor *d;
+	unsigned nb_ports;
+	uint8_t portid;
+
+	/* init EAL */
+	int ret = rte_eal_init(argc, argv);
+	if (ret < 0)
+		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
+	argc -= ret;
+	argv += ret;
+
+	if (rte_ixgbe_pmd_init() != 0 ||
+			rte_eal_pci_probe() != 0)
+		rte_exit(EXIT_FAILURE, "Error with NIC driver initialization\n");
+	
+	nb_ports = rte_eth_dev_count();
+	if (nb_ports < 2 || (nb_ports & 1))
+		rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");
+
+	mbuf_pool = rte_mempool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
+				       MBUF_SIZE, MBUF_CACHE_SIZE,
+				       sizeof(struct rte_pktmbuf_pool_private),
+				       rte_pktmbuf_pool_init, NULL,
+				       rte_pktmbuf_init, NULL,
+				       rte_socket_id(), 0);
+	if (mbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
+
+	/* initialize all ports */
+	for (portid = 0; portid < nb_ports; portid++)
+		if (port_init(portid, mbuf_pool) != 0) 
+			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n", portid);
+
+	if (rte_lcore_count() == 1)
+		rte_exit(EXIT_FAILURE, "Error, need worker cores\n");
+
+	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
+			rte_lcore_count() - 1, NOFLAGS);
+	if (d == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
+
+	rte_eal_mp_remote_launch((lcore_function_t *)tx_main, d, SKIP_MASTER);
+	/* call lcore_main on master core only */
+	lcore_main(d);
+	return 0;
+}
+
diff --git a/examples/distributor_app/basicfwd.h b/examples/distributor_app/basicfwd.h
new file mode 100644
index 0000000..4ff20bc
--- /dev/null
+++ b/examples/distributor_app/basicfwd.h
@@ -0,0 +1,17 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#ifndef _MAIN_H_
+#define _MAIN_H_
+
+
+#ifdef RTE_EXEC_ENV_BAREMETAL
+#define MAIN _main
+#else
+#define MAIN main
+#endif
+
+int MAIN( int argc, char *argv[] );
+
+#endif /* ifndef _MAIN_H_ */
diff --git a/lib/Makefile b/lib/Makefile
index b92b392..5a0b10f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
 DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
 DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
 DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
 DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..794dd46
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,21 @@
+# <COPYRIGHT_TAG>
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..f7aea9a
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,201 @@
+/*
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <x86intrin.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define DISTRIBUTOR_PREFIX "DT_"
+
+#define RTE_DISTRIBUTOR_FLAG_BITS 4
+#define RTE_DISTRIBUTOR_NO_BUF 0
+/* we will use the top four bits of pointer for flags */
+#define RTE_DISTRIBUTOR_GET_BUF (1)
+#define RTE_DISTRIBUTOR_RETURN_BUF (2)
+
+#define RTE_DISTRIBUTOR_BACKLOG_SIZE 8
+#define RTE_DISTRIBUTOR_BACKLOG_MASK 7
+
+#define RTE_DISTRIBUTOR_FLAGS_MASK (0x0F)
+
+union rte_distributor_buffer {
+	volatile int64_t bufptr64;
+	char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+struct rte_distributor_backlog {
+	int64_t pkts[RTE_DISTRIBUTOR_BACKLOG_SIZE];
+	unsigned start;
+	unsigned count;
+};
+
+struct rte_distributor {
+	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];   /**< Name of the ring. */
+	unsigned flags;                       /**< Flags supplied at creation. */
+	unsigned num_workers;                 /**< Number of workers polling */
+
+	uint32_t in_flight_ids[RTE_MAX_LCORE];
+	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+	union rte_distributor_buffer buffers[RTE_MAX_LCORE];
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+	if (bl->count == RTE_DISTRIBUTOR_BACKLOG_SIZE)
+		return -1;
+
+	bl->pkts[(bl->start + bl->count++) &  (RTE_DISTRIBUTOR_BACKLOG_MASK)] = item;
+	return 0;
+}
+
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+	bl->count--;
+	return bl->pkts[bl->start++ & RTE_DISTRIBUTOR_BACKLOG_MASK];
+}
+
+int
+rte_distributor_process(struct rte_distributor *d,
+		struct rte_mbuf **mbufs, const unsigned num_mbufs)
+{
+	unsigned next_idx = 0;
+	unsigned worker_offset = 0;
+
+	while (next_idx < num_mbufs) {
+		int64_t data = d->buffers[worker_offset].bufptr64;
+		if (data & RTE_DISTRIBUTOR_GET_BUF) {
+
+			if (d->backlog[worker_offset].count)
+				d->buffers[worker_offset].bufptr64 =
+						backlog_pop(&d->backlog[worker_offset]);
+
+			else {
+				struct rte_mbuf *next_mb = mbufs[next_idx++];
+				int64_t next_value = ((uintptr_t)next_mb << RTE_DISTRIBUTOR_FLAG_BITS);
+
+				/* note signed variable - arithmetic shift */
+				int64_t oldbuf = data >> RTE_DISTRIBUTOR_FLAG_BITS;
+				if (oldbuf)
+					d->in_flight_ids[worker_offset] = 0;
+
+				uint32_t match = 0, newid = (next_mb->pkt.hash.rss | 1);
+				unsigned i;
+				for (i = 0; i < d->num_workers; i++)
+					match |= (!(d->in_flight_ids[i] ^ newid) << i);
+				if (!match) {
+					d->buffers[worker_offset].bufptr64 = next_value;
+					d->in_flight_ids[worker_offset] = newid;
+				} else {
+					unsigned worker = __builtin_ctz(match);
+					if (add_to_backlog(&d->backlog[worker], next_value) < 0)
+						next_idx--;
+				}
+			}
+		}
+		if (++worker_offset == d->num_workers)
+			worker_offset = 0;
+	}
+	/* to finish, check all workers for backlog and schedule work for them
+	 * if they are ready */
+	for (worker_offset = 0; worker_offset < d->num_workers; worker_offset++)
+		if (d->backlog[worker_offset].count &&
+				(d->buffers[worker_offset].bufptr64 & RTE_DISTRIBUTOR_GET_BUF))
+			d->buffers[worker_offset].bufptr64 =
+									backlog_pop(&d->backlog[worker_offset]);
+
+	return num_mbufs;
+}
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+		unsigned worker_id)
+{
+	union rte_distributor_buffer *buf = &d->buffers[worker_id];
+	buf->bufptr64 |= RTE_DISTRIBUTOR_GET_BUF;
+	while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
+		rte_pause();
+	/* since bufptr64 is a signed value, this should be an arithmetic shift */
+	int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
+	return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+struct rte_mbuf *
+rte_distributor_get_next_pkt(struct rte_distributor *d,
+		unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+	union rte_distributor_buffer *buf = &d->buffers[worker_id];
+	int64_t req = ((uintptr_t)oldpkt << RTE_DISTRIBUTOR_FLAG_BITS) | \
+			RTE_DISTRIBUTOR_GET_BUF;
+	buf->bufptr64 = req;
+	while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
+		rte_pause();
+	/* since bufptr64 is a signed value, this should be an arithmetic shift */
+	int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
+	return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+		unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+	union rte_distributor_buffer *buf = &d->buffers[worker_id];
+	uint64_t req = ((uintptr_t)oldpkt << RTE_DISTRIBUTOR_FLAG_BITS) | \
+			RTE_DISTRIBUTOR_RETURN_BUF;
+	buf->bufptr64 = req;
+	while (buf->bufptr64 & RTE_DISTRIBUTOR_RETURN_BUF)
+		rte_pause();
+	return 0;
+}
+
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+		unsigned num_workers, unsigned flags)
+{
+	struct rte_distributor *d;
+	struct rte_distributor_list *distributor_list;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+	rte_snprintf(mz_name, sizeof(mz_name), DISTRIBUTOR_PREFIX"%s", name);
+	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+	if (mz == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	/* check that we have an initialised tail queue */
+	if ((distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+			rte_distributor_list)) == NULL) {
+		rte_errno = E_RTE_NO_TAILQ;
+		return NULL;
+	}
+
+	d = mz->addr;
+	rte_snprintf(d->name, sizeof(d->name), "%s", name);
+	d->flags = flags;
+	d->num_workers = num_workers;
+	TAILQ_INSERT_TAIL(distributor_list, d, next);
+
+	return d;
+}
+
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..b8021a0
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,45 @@
+/*
+ * <COPYIRHGT_TAG>
+ */
+
+#ifndef _RTE_DISTRIBUTE_H_
+#define _RTE_DISTRIBUTE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32
+
+#define RTE_DISTRIBUTOR_NOFLAGS 0
+
+struct rte_distributor;
+
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+		unsigned num_workers, unsigned flags);
+
+int
+rte_distributor_process(struct rte_distributor *d,
+		struct rte_mbuf **mbufs, const unsigned num_mbufs);
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d, unsigned worker_id);
+
+struct rte_mbuf *
+rte_distributor_get_next_pkt(struct rte_distributor *d,
+		unsigned worker_id, struct rte_mbuf *oldpkt);
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+		struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h b/lib/librte_eal/common/include/rte_tailq_elem.h
index 2de4010..fdd2faf 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
 
 rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
 
+rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
+
 rte_tailq_end(RTE_TAILQ_NUM)
 
 #undef rte_tailq_elem
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 9c70ce0..64c0f6e 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -70,6 +70,10 @@ LDLIBS += -lrte_ivshmem
 endif
 endif
 
+ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
+LDLIBS += -lrte_distributor
+endif
+
 ifeq ($(CONFIG_RTE_LIBRTE_E1000_PMD),y)
 LDLIBS += -lrte_pmd_e1000
 endif

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b linux
  2014-05-12  9:23 ` Bruce Richardson
@ 2014-05-12  9:26   ` Richardson, Bruce
  0 siblings, 0 replies; 5+ messages in thread
From: Richardson, Bruce @ 2014-05-12  9:26 UTC (permalink / raw)
  To: Richardson, Bruce, dev

Please ignore, my patch seems to have got corrupted with some other WIP content that will be upstreamed in a future patch set.

/Bruce

> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Bruce Richardson
> Sent: Monday, May 12, 2014 10:23 AM
> To: dev@dpdk.org
> Subject: [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b
> linux
> 
>  ifeq ($(CONFIG_RTE_APP_TEST),y)
>  SRCS-$(CONFIG_RTE_LIBRTE_ACL) += test_acl.c diff --git
> a/app/test/commands.c b/app/test/commands.c index a153026..5978cec
> 100644
> --- a/app/test/commands.c
> +++ b/app/test/commands.c
> @@ -178,6 +178,8 @@ static void cmd_autotest_parsed(void *parsed_result,
>  		ret = test_common();
>  	if (!strcmp(res->autotest, "ivshmem_autotest"))
>  		ret = test_ivshmem();
> +	if (!strcmp(res->autotest, "distributor_autotest"))
> +		ret = test_distributor();
>  #ifdef RTE_LIBRTE_PMD_RING
>  	if (!strcmp(res->autotest, "ring_pmd_autotest"))
>  		ret = test_pmd_ring();
> @@ -234,7 +236,7 @@ cmdline_parse_token_string_t cmd_autotest_autotest
> =  #ifdef RTE_LIBRTE_KVARGS
>  			"kvargs_autotest#"
>  #endif
> -			"common_autotest");
> +			"common_autotest#distributor_autotest");
> 
>  cmdline_parse_inst_t cmd_autotest = {
>  	.f = cmd_autotest_parsed,  /* function to call */ diff --git
> a/app/test/test.h b/app/test/test.h index bc001b9..4621fc9 100644
> --- a/app/test/test.h
> +++ b/app/test/test.h
> @@ -92,6 +92,7 @@ int test_power(void);
>  int test_common(void);
>  int test_pmd_ring(void);
>  int test_ivshmem(void);
> +int test_distributor(void);
>  int test_kvargs(void);
> 
>  int test_pci_run;
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c new file
> mode 100644 index 0000000..0456478
> --- /dev/null
> +++ b/app/test/test_distributor.c
> @@ -0,0 +1,261 @@
> +/*
> + * <COPYRIGHT_TAG>
> + */
> +
> +#include "test.h"
> +
> +#ifdef RTE_LIBRTE_DISTRIBUTOR
> +#include <unistd.h>
> +#include <string.h>
> +#include <rte_cycles.h>
> +#include <rte_distributor.h>
> +#include "wmmintrin.h"
> +
> +#define ITER_POWER 20 /* log 2 of how many iterations we do when
> +timing. */
> +
> +static volatile int quit = 0;
> +static volatile unsigned worker_idx;
> +
> +struct worker_stats{
> +	volatile unsigned handled_packets;
> +} __rte_cache_aligned;
> +struct worker_stats worker_stats[RTE_MAX_LCORE];
> +
> +#define do_big_pause() do { \
> +		__m128i a = {0}; \
> +		volatile __m128i res; \
> +		res = _mm_clmulepi64_si128(a, res, 0); \
> +		res = _mm_clmulepi64_si128(a, res, 0); \
> +		res = _mm_clmulepi64_si128(a, res, 0); \
> +		res = _mm_clmulepi64_si128(a, res, 0); \
> +		RTE_SET_USED(res); \
> +		/*rte_pause();*/ \
> +} while(0)
> +
> +#define do_pause() do { \
> +	rte_pause(); \
> +} while(0)
> +
> +static void
> +flip_bit(volatile uint64_t *arg)
> +{
> +	uint64_t old_val = 0;
> +	while (old_val != 2) {
> +		while(!*arg)
> +			do_big_pause();
> +		old_val = *arg;
> +		*arg = 0;
> +	}
> +}
> +
> +static void
> +time_cache_line_switch(void)
> +{
> +	/* allocate a full cache line for data, we use only first byte of it */
> +	uint64_t data[CACHE_LINE_SIZE / sizeof(uint64_t)];
> +
> +	unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
> +	volatile uint64_t *pdata = &data[0];
> +	*pdata = 1;
> +	rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
> +	while (*pdata)
> +		rte_pause();
> +
> +	const uint64_t start_time = rte_rdtsc();
> +	for (i = 0; i < (1<< ITER_POWER); i++) {
> +		while (*pdata)
> +			do_big_pause();
> +		*pdata = 1;
> +	}
> +	const uint64_t end_time = rte_rdtsc();
> +
> +	while (*pdata)
> +		do_pause();
> +	*pdata = 2;
> +	rte_eal_wait_lcore(slaveid);
> +	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
> +			end_time-start_time);
> +	printf("Ticks per iteration = %"PRIu64"\n",
> +			(end_time-start_time) >> ITER_POWER); }
> +
> +static inline unsigned
> +total_packet_count(void)
> +{
> +	unsigned i, count = 0;
> +	for (i = 0; i < worker_idx; i++)
> +		count += worker_stats[i].handled_packets;
> +	return count;
> +}
> +
> +static inline void
> +clear_packet_count(void)
> +{
> +	memset(&worker_stats, 0, sizeof(worker_stats)); }
> +
> +static int
> +handle_work(void *arg)
> +{
> +	struct rte_mbuf *pkt = NULL;
> +	struct rte_distributor *d = arg;
> +	unsigned count = 0;
> +	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
> +
> +	printf("Worker %u starting\n", worker_idx);
> +	pkt = rte_distributor_get_pkt(d, id);
> +	while (!quit) {
> +		worker_stats[id].handled_packets++;
> +		pkt = rte_distributor_get_next_pkt(d, id, pkt);
> +	}
> +	rte_distributor_return_pkt(d, id, pkt);
> +	printf("Worker %u handled %u pkts\n", id, count);
> +	return 0;
> +}
> +
> +static inline int
> +perf_test(struct rte_distributor *d, struct rte_mempool *p) { #define
> +BURST 32
> +	unsigned i;
> +	uint64_t start, end;
> +	struct rte_mbuf *bufs[BURST];
> +
> +	clear_packet_count();
> +	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
> +		printf("Error getting mbufs from pool\n");
> +		return -1;
> +	}
> +	/* ensure we have different hash value for each pkt */
> +	for (i = 0; i < 32; i++)
> +		bufs[i]->pkt.hash.rss = i;
> +
> +	start = rte_rdtsc();
> +	for (i = 0; i < (1<<ITER_POWER); i++)
> +		rte_distributor_process(d, bufs, BURST);
> +	end = rte_rdtsc();
> +
> +	do {
> +		usleep(100);
> +		rte_distributor_process(d, NULL, 0);
> +	} while (total_packet_count() < (BURST << ITER_POWER));
> +
> +	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
> +	printf("Time per packet: %"PRIu64"\n", ((end - start) >>
> ITER_POWER)/BURST);
> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
> +
> +	for (i = 0; i < rte_lcore_count() - 1; i++)
> +		printf("Worker %u handled %u packets\n", i,
> +				worker_stats[i].handled_packets);
> +	printf("Total packets: %u (%x)\n", total_packet_count(),
> +			total_packet_count());
> +	printf("Perf test done\n");
> +
> +	return 0;
> +}
> +
> +static int
> +sanity_test(struct rte_distributor *d, struct rte_mempool *p) {
> +	struct rte_mbuf *bufs[32];
> +	unsigned i;
> +
> +	clear_packet_count();
> +	if (rte_mempool_get_bulk(p, (void *)bufs, 32) != 0) {
> +		printf("Error getting mbufs from pool\n");
> +		return -1;
> +	}
> +
> +	/* now set all hash values in all buffers to zero, so all pkts go to the
> +	 * one worker thread */
> +	for (i = 0; i < 32; i++)
> +		bufs[i]->pkt.hash.rss = 0;
> +
> +	rte_distributor_process(d, bufs, 32);
> +	do {
> +		usleep(100);
> +		rte_distributor_process(d, NULL, 0);
> +	} while(total_packet_count() < 32);
> +
> +	for (i = 0; i < rte_lcore_count() - 1; i++)
> +		printf("Worker %u handled %u packets\n", i,
> +				worker_stats[i].handled_packets);
> +	printf("Sanity test with all zero hashes done.\n");
> +	if (worker_stats[0].handled_packets != 32)
> +		return -1;
> +
> +	/* give a different hash value to each packet, so load gets distributed */
> +	clear_packet_count();
> +	for (i = 0; i < 32; i++)
> +		bufs[i]->pkt.hash.rss = i;
> +
> +	rte_distributor_process(d, bufs, 32);
> +	do {
> +		usleep(100);
> +		rte_distributor_process(d, NULL, 0);
> +	} while(total_packet_count() < 32);
> +
> +	for (i = 0; i < rte_lcore_count() - 1; i++)
> +		printf("Worker %u handled %u packets\n", i,
> +				worker_stats[i].handled_packets);
> +	printf("Sanity test with non-zero hashes done\n");
> +
> +	rte_mempool_put_bulk(p, (void *)bufs, 32);
> +	return 0;
> +}
> +
> +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> +RTE_PKTMBUF_HEADROOM)
> +
> +int
> +test_distributor(void)
> +{
> +	static struct rte_distributor *d = NULL;
> +	static struct rte_mempool *p = NULL;
> +
> +	if (rte_lcore_count() < 2) {
> +		printf("ERROR: not enough cores to test distributor\n");
> +		return -1;
> +	}
> +
> +	/* first time how long it takes to round-trip a cache line */
> +	time_cache_line_switch();
> +
> +	if (d == NULL) {
> +		d = rte_distributor_create("Test_distributor", rte_socket_id(),
> +				rte_lcore_count() - 1,
> RTE_DISTRIBUTOR_NOFLAGS);
> +		if (d == NULL) {
> +			printf("Error creating distributor\n");
> +			return -1;
> +		}
> +		rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
> +	}
> +
> +	if (p == NULL) {
> +		p = rte_mempool_create("MBUF_POOL", 511, MBUF_SIZE, 64,
> +				sizeof(struct rte_pktmbuf_pool_private),
> +				rte_pktmbuf_pool_init, NULL,
> +				rte_pktmbuf_init, NULL,
> +				rte_socket_id(), 0);
> +		if (p == NULL) {
> +			printf("Error creating mempool\n");
> +			return -1;
> +		}
> +	}
> +
> +	sanity_test(d, p);
> +	perf_test(d,p);
> +	return 0;
> +}
> +
> +#else
> +
> +#include <stdio.h>
> +
> +int
> +test_distributor(void)
> +{
> +	printf("Distributor is not enabled in configuration\n");
> +	return 0;
> +}
> +
> +#endif
> diff --git a/config/defconfig_i686-default-linuxapp-gcc b/config/defconfig_i686-
> default-linuxapp-gcc
> index 14bd3d1..5b4261e 100644
> --- a/config/defconfig_i686-default-linuxapp-gcc
> +++ b/config/defconfig_i686-default-linuxapp-gcc
> @@ -335,3 +335,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_i686-default-linuxapp-icc b/config/defconfig_i686-
> default-linuxapp-icc
> index ec3386e..d1d4aeb 100644
> --- a/config/defconfig_i686-default-linuxapp-icc
> +++ b/config/defconfig_i686-default-linuxapp-icc
> @@ -334,3 +334,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-default-bsdapp-gcc
> b/config/defconfig_x86_64-default-bsdapp-gcc
> index d960e1d..329920e 100644
> --- a/config/defconfig_x86_64-default-bsdapp-gcc
> +++ b/config/defconfig_x86_64-default-bsdapp-gcc
> @@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y  CONFIG_RTE_TEST_PMD=y
> CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
>  CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
> +
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-default-linuxapp-gcc
> b/config/defconfig_x86_64-default-linuxapp-gcc
> index f11ffbf..772a6b3 100644
> --- a/config/defconfig_x86_64-default-linuxapp-gcc
> +++ b/config/defconfig_x86_64-default-linuxapp-gcc
> @@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-default-linuxapp-icc
> b/config/defconfig_x86_64-default-linuxapp-icc
> index 4eaca4c..04affc8 100644
> --- a/config/defconfig_x86_64-default-linuxapp-icc
> +++ b/config/defconfig_x86_64-default-linuxapp-icc
> @@ -333,3 +333,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> index 2f55a69..0002dca 100644
> --- a/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> +++ b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> @@ -46,4 +46,4 @@ CONFIG_RTE_LIBRTE_IVSHMEM_MAX_ENTRIES=128
>  CONFIG_RTE_LIBRTE_IVSHMEM_MAX_METADATA_FILES=32
> 
>  # Set EAL to single file segments
> -CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
> \ No newline at end of file
> +CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
> diff --git a/examples/distributor_app/Makefile
> b/examples/distributor_app/Makefile
> new file mode 100644
> index 0000000..9a1ebd3
> --- /dev/null
> +++ b/examples/distributor_app/Makefile
> @@ -0,0 +1,28 @@
> +# <COPYRIGHT_TAG>
> +
> +ifeq ($(RTE_SDK),)
> +$(error "Please define RTE_SDK environment variable") endif
> +
> +# Default target, can be overriden by command line or environment
> +RTE_TARGET ?= x86_64-default-linuxapp-gcc
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# binary name
> +APP = basicfwd
> +
> +# all source are stored in SRCS-y
> +SRCS-y := basicfwd.c
> +
> +CFLAGS += $(WERROR_FLAGS)
> +
> +# workaround for a gcc bug with noreturn attribute #
> +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> +CFLAGS_main.o += -Wno-return-type
> +endif
> +
> +EXTRA_CFLAGS += -O3 -g -Wfatal-errors
> +
> +include $(RTE_SDK)/mk/rte.extapp.mk
> diff --git a/examples/distributor_app/basicfwd.c
> b/examples/distributor_app/basicfwd.c
> new file mode 100644
> index 0000000..7662d91
> --- /dev/null
> +++ b/examples/distributor_app/basicfwd.c
> @@ -0,0 +1,248 @@
> +/*-
> + * <COPYRIGHT_TAG>
> + */
> +
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <unistd.h>
> +
> +#include <rte_eal.h>
> +#include <rte_ethdev.h>
> +#include <rte_cycles.h>
> +#include <rte_distributor.h>
> +#include "basicfwd.h"
> +
> +#define RX_RING_SIZE 128
> +#define RX_FREE_THRESH 32
> +#define RX_PTHRESH 8
> +#define RX_HTHRESH 8
> +#define RX_WTHRESH 0
> +
> +#define TX_RING_SIZE 512
> +#define TX_FREE_THRESH 32
> +#define TX_PTHRESH 32
> +#define TX_HTHRESH 0
> +#define TX_WTHRESH 0
> +#define TX_RSBIT_THRESH 32
> +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS |
> ETH_TXQ_FLAGS_NOVLANOFFL |\
> +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> +	ETH_TXQ_FLAGS_NOXSUMTCP)
> +
> +#define NUM_MBUFS 8191
> +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define
> BURST_SIZE 32
> +
> +static struct rte_eth_conf port_conf_default = {
> +	.rxmode = {
> +		.mq_mode = ETH_MQ_RX_RSS,
> +		.max_rx_pkt_len = ETHER_MAX_LEN,
> +		.split_hdr_size = 0,
> +		.header_split   = 0, /**< Header Split disabled */
> +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> +	},
> +	.txmode = {
> +		.mq_mode = ETH_MQ_TX_NONE,
> +	},
> +	.lpbk_mode = 0,
> +	.rx_adv_conf = {
> +			.rss_conf = {
> +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> ETH_RSS_IPV4_TCP |
> +					ETH_RSS_IPV4_UDP |
> ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
> +			}
> +	},
> +};
> +
> +static const struct rte_eth_rxconf rx_conf_default = {
> +	.rx_thresh = {
> +		.pthresh = RX_PTHRESH,
> +		.hthresh = RX_HTHRESH,
> +		.wthresh = RX_WTHRESH,
> +	},
> +	.rx_free_thresh = RX_FREE_THRESH,
> +	.rx_drop_en = 0,
> +};
> +
> +static struct rte_eth_txconf tx_conf_default = {
> +	.tx_thresh = {
> +		.pthresh = TX_PTHRESH,
> +		.hthresh = TX_HTHRESH,
> +		.wthresh = TX_WTHRESH,
> +	},
> +	.tx_free_thresh = TX_FREE_THRESH,
> +	.tx_rs_thresh = TX_RSBIT_THRESH,
> +	.txq_flags = TX_Q_FLAGS
> +
> +};
> +
> +#define NOFLAGS 0
> +
> +
> +/*
> + * Initialises a given port using global settings and with the rx
> +buffers
> + * coming from the mbuf_pool passed as parameter  */ static inline int
> +port_init(uint8_t port, struct rte_mempool *mbuf_pool) {
> +	struct rte_eth_conf port_conf = port_conf_default;
> +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> +	int retval;
> +	uint16_t q;
> +
> +	if (port >= rte_eth_dev_count())
> +		return -1;
> +
> +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> +	if (retval != 0)
> +		return retval;
> +
> +	for (q = 0; q < rxRings; q ++) {
> +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> +						rte_eth_dev_socket_id(port),
> &rx_conf_default,
> +						mbuf_pool);
> +		if (retval < 0)
> +			return retval;
> +	}
> +
> +	for (q = 0; q < txRings; q ++) {
> +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> +						rte_eth_dev_socket_id(port),
> &tx_conf_default);
> +		if (retval < 0)
> +			return retval;
> +	}
> +
> +	retval  = rte_eth_dev_start(port);
> +	if (retval < 0)
> +		return retval;
> +
> +	struct rte_eth_link link;
> +	rte_eth_link_get_nowait(port, &link);
> +	if (!link.link_status) {
> +		sleep(1);
> +		rte_eth_link_get_nowait(port, &link);
> +	}
> +
> +	if (!link.link_status) {
> +		printf("Link down on port %"PRIu8"\n", port);
> +		return 0;
> +	}
> +
> +	struct ether_addr addr;
> +	rte_eth_macaddr_get(port, &addr);
> +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> +			(unsigned)port,
> +			addr.addr_bytes[0], addr.addr_bytes[1],
> addr.addr_bytes[2],
> +			addr.addr_bytes[3], addr.addr_bytes[4],
> addr.addr_bytes[5]);
> +
> +	rte_eth_promiscuous_enable(port);
> +
> +	return 0;
> +}
> +
> +/*
> + * Main thread that does the work, reading from INPUT_PORT
> + * and writing to OUTPUT_PORT
> + */
> +static  __attribute__((noreturn)) void
> +lcore_main(struct rte_distributor *d)
> +{
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	uint8_t port;
> +	for (port = 0; port < nb_ports; port++)
> +		if (rte_eth_dev_socket_id(port) > 0 &&
> +				rte_eth_dev_socket_id(port) !=
> (int)rte_socket_id())
> +			printf("WARNING, port %u is on remote NUMA node to
> polling thread."
> +					"\n\tPerformance will not be
> optimal.\n", port);
> +
> +	printf("\nCore %u distributing packets. [Ctrl+C to quit]\n",
> rte_lcore_id());
> +	for(;;) {
> +		for (port = 0; port < nb_ports; port++) {
> +			struct rte_mbuf *bufs[BURST_SIZE];
> +			const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> BURST_SIZE);
> +			if (unlikely(nb_rx == 0))
> +				continue;
> +
> +			rte_distributor_process(d, bufs, nb_rx);
> +		}
> +	}
> +}
> +
> +static __attribute__((noreturn)) void
> +tx_main(struct rte_distributor *d)
> +{
> +	static volatile unsigned num_workers = 0;
> +	struct rte_mbuf *to_send[BURST_SIZE];
> +	unsigned nb_to_send = 0;
> +	unsigned port_id = 0;
> +	unsigned id = __sync_fetch_and_add(&num_workers, 1);
> +
> +	struct rte_mbuf *pkt = rte_distributor_get_pkt(d, id);
> +	for (;;) {
> +		to_send[nb_to_send++] = pkt;
> +		if (nb_to_send == BURST_SIZE) {
> +			rte_eth_tx_burst(port_id++, id, to_send, BURST_SIZE);
> +			nb_to_send = 0;
> +			if (port_id == rte_eth_dev_count())
> +				port_id = 0;
> +		}
> +		pkt = rte_distributor_get_next_pkt(d, id, pkt);
> +	}
> +}
> +
> +/* Main function, does initialisation and calls the per-lcore functions
> +*/ int MAIN(int argc, char *argv[]) {
> +	struct rte_mempool *mbuf_pool;
> +	struct rte_distributor *d;
> +	unsigned nb_ports;
> +	uint8_t portid;
> +
> +	/* init EAL */
> +	int ret = rte_eal_init(argc, argv);
> +	if (ret < 0)
> +		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
> +	argc -= ret;
> +	argv += ret;
> +
> +	if (rte_ixgbe_pmd_init() != 0 ||
> +			rte_eal_pci_probe() != 0)
> +		rte_exit(EXIT_FAILURE, "Error with NIC driver initialization\n");
> +
> +	nb_ports = rte_eth_dev_count();
> +	if (nb_ports < 2 || (nb_ports & 1))
> +		rte_exit(EXIT_FAILURE, "Error: number of ports must be
> even\n");
> +
> +	mbuf_pool = rte_mempool_create("MBUF_POOL", NUM_MBUFS *
> nb_ports,
> +				       MBUF_SIZE, MBUF_CACHE_SIZE,
> +				       sizeof(struct rte_pktmbuf_pool_private),
> +				       rte_pktmbuf_pool_init, NULL,
> +				       rte_pktmbuf_init, NULL,
> +				       rte_socket_id(), 0);
> +	if (mbuf_pool == NULL)
> +		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
> +
> +	/* initialize all ports */
> +	for (portid = 0; portid < nb_ports; portid++)
> +		if (port_init(portid, mbuf_pool) != 0)
> +			rte_exit(EXIT_FAILURE, "Cannot initialize port
> %"PRIu8"\n", portid);
> +
> +	if (rte_lcore_count() == 1)
> +		rte_exit(EXIT_FAILURE, "Error, need worker cores\n");
> +
> +	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
> +			rte_lcore_count() - 1, NOFLAGS);
> +	if (d == NULL)
> +		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
> +
> +	rte_eal_mp_remote_launch((lcore_function_t *)tx_main, d,
> SKIP_MASTER);
> +	/* call lcore_main on master core only */
> +	lcore_main(d);
> +	return 0;
> +}
> +
> diff --git a/examples/distributor_app/basicfwd.h
> b/examples/distributor_app/basicfwd.h
> new file mode 100644
> index 0000000..4ff20bc
> --- /dev/null
> +++ b/examples/distributor_app/basicfwd.h
> @@ -0,0 +1,17 @@
> +/*-
> + * <COPYRIGHT_TAG>
> + */
> +
> +#ifndef _MAIN_H_
> +#define _MAIN_H_
> +
> +
> +#ifdef RTE_EXEC_ENV_BAREMETAL
> +#define MAIN _main
> +#else
> +#define MAIN main
> +#endif
> +
> +int MAIN( int argc, char *argv[] );
> +
> +#endif /* ifndef _MAIN_H_ */
> diff --git a/lib/Makefile b/lib/Makefile index b92b392..5a0b10f 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
>  DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
>  DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
>  DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
> +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
> 
>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
>  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni diff --git
> a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile new file mode
> 100644 index 0000000..794dd46
> --- /dev/null
> +++ b/lib/librte_distributor/Makefile
> @@ -0,0 +1,21 @@
> +# <COPYRIGHT_TAG>
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_distributor.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
> +
> +# install this header file
> +SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
> +
> +# this lib needs eal
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> new file mode 100644
> index 0000000..f7aea9a
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -0,0 +1,201 @@
> +/*
> + * <COPYRIGHT_TAG>
> + */
> +
> +#include <stdio.h>
> +#include <sys/queue.h>
> +#include <x86intrin.h>
> +#include <rte_mbuf.h>
> +#include <rte_memzone.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_tailq.h>
> +#include <rte_eal_memconfig.h>
> +#include "rte_distributor.h"
> +
> +#define NO_FLAGS 0
> +#define DISTRIBUTOR_PREFIX "DT_"
> +
> +#define RTE_DISTRIBUTOR_FLAG_BITS 4
> +#define RTE_DISTRIBUTOR_NO_BUF 0
> +/* we will use the top four bits of pointer for flags */ #define
> +RTE_DISTRIBUTOR_GET_BUF (1) #define RTE_DISTRIBUTOR_RETURN_BUF (2)
> +
> +#define RTE_DISTRIBUTOR_BACKLOG_SIZE 8
> +#define RTE_DISTRIBUTOR_BACKLOG_MASK 7
> +
> +#define RTE_DISTRIBUTOR_FLAGS_MASK (0x0F)
> +
> +union rte_distributor_buffer {
> +	volatile int64_t bufptr64;
> +	char pad[CACHE_LINE_SIZE*3];
> +} __rte_cache_aligned;
> +
> +struct rte_distributor_backlog {
> +	int64_t pkts[RTE_DISTRIBUTOR_BACKLOG_SIZE];
> +	unsigned start;
> +	unsigned count;
> +};
> +
> +struct rte_distributor {
> +	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
> +
> +	char name[RTE_DISTRIBUTOR_NAMESIZE];   /**< Name of the ring. */
> +	unsigned flags;                       /**< Flags supplied at creation. */
> +	unsigned num_workers;                 /**< Number of workers polling */
> +
> +	uint32_t in_flight_ids[RTE_MAX_LCORE];
> +	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> +
> +	union rte_distributor_buffer buffers[RTE_MAX_LCORE]; };
> +
> +TAILQ_HEAD(rte_distributor_list, rte_distributor);
> +
> +static int
> +add_to_backlog(struct rte_distributor_backlog *bl, int64_t item) {
> +	if (bl->count == RTE_DISTRIBUTOR_BACKLOG_SIZE)
> +		return -1;
> +
> +	bl->pkts[(bl->start + bl->count++) &
> (RTE_DISTRIBUTOR_BACKLOG_MASK)] = item;
> +	return 0;
> +}
> +
> +static int64_t
> +backlog_pop(struct rte_distributor_backlog *bl) {
> +	bl->count--;
> +	return bl->pkts[bl->start++ & RTE_DISTRIBUTOR_BACKLOG_MASK]; }
> +
> +int
> +rte_distributor_process(struct rte_distributor *d,
> +		struct rte_mbuf **mbufs, const unsigned num_mbufs) {
> +	unsigned next_idx = 0;
> +	unsigned worker_offset = 0;
> +
> +	while (next_idx < num_mbufs) {
> +		int64_t data = d->buffers[worker_offset].bufptr64;
> +		if (data & RTE_DISTRIBUTOR_GET_BUF) {
> +
> +			if (d->backlog[worker_offset].count)
> +				d->buffers[worker_offset].bufptr64 =
> +						backlog_pop(&d-
> >backlog[worker_offset]);
> +
> +			else {
> +				struct rte_mbuf *next_mb =
> mbufs[next_idx++];
> +				int64_t next_value = ((uintptr_t)next_mb <<
> +RTE_DISTRIBUTOR_FLAG_BITS);
> +
> +				/* note signed variable - arithmetic shift */
> +				int64_t oldbuf = data >>
> RTE_DISTRIBUTOR_FLAG_BITS;
> +				if (oldbuf)
> +					d->in_flight_ids[worker_offset] = 0;
> +
> +				uint32_t match = 0, newid = (next_mb-
> >pkt.hash.rss | 1);
> +				unsigned i;
> +				for (i = 0; i < d->num_workers; i++)
> +					match |= (!(d->in_flight_ids[i] ^ newid)
> << i);
> +				if (!match) {
> +					d->buffers[worker_offset].bufptr64 =
> next_value;
> +					d->in_flight_ids[worker_offset] =
> newid;
> +				} else {
> +					unsigned worker =
> __builtin_ctz(match);
> +					if (add_to_backlog(&d-
> >backlog[worker], next_value) < 0)
> +						next_idx--;
> +				}
> +			}
> +		}
> +		if (++worker_offset == d->num_workers)
> +			worker_offset = 0;
> +	}
> +	/* to finish, check all workers for backlog and schedule work for them
> +	 * if they are ready */
> +	for (worker_offset = 0; worker_offset < d->num_workers;
> worker_offset++)
> +		if (d->backlog[worker_offset].count &&
> +				(d->buffers[worker_offset].bufptr64 &
> RTE_DISTRIBUTOR_GET_BUF))
> +			d->buffers[worker_offset].bufptr64 =
> +
> 	backlog_pop(&d->backlog[worker_offset]);
> +
> +	return num_mbufs;
> +}
> +
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d,
> +		unsigned worker_id)
> +{
> +	union rte_distributor_buffer *buf = &d->buffers[worker_id];
> +	buf->bufptr64 |= RTE_DISTRIBUTOR_GET_BUF;
> +	while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
> +		rte_pause();
> +	/* since bufptr64 is a signed value, this should be an arithmetic shift */
> +	int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
> +	return (struct rte_mbuf *)((uintptr_t)ret); }
> +
> +struct rte_mbuf *
> +rte_distributor_get_next_pkt(struct rte_distributor *d,
> +		unsigned worker_id, struct rte_mbuf *oldpkt) {
> +	union rte_distributor_buffer *buf = &d->buffers[worker_id];
> +	int64_t req = ((uintptr_t)oldpkt << RTE_DISTRIBUTOR_FLAG_BITS) | \
> +			RTE_DISTRIBUTOR_GET_BUF;
> +	buf->bufptr64 = req;
> +	while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
> +		rte_pause();
> +	/* since bufptr64 is a signed value, this should be an arithmetic shift */
> +	int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
> +	return (struct rte_mbuf *)((uintptr_t)ret); }
> +
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d,
> +		unsigned worker_id, struct rte_mbuf *oldpkt) {
> +	union rte_distributor_buffer *buf = &d->buffers[worker_id];
> +	uint64_t req = ((uintptr_t)oldpkt << RTE_DISTRIBUTOR_FLAG_BITS) | \
> +			RTE_DISTRIBUTOR_RETURN_BUF;
> +	buf->bufptr64 = req;
> +	while (buf->bufptr64 & RTE_DISTRIBUTOR_RETURN_BUF)
> +		rte_pause();
> +	return 0;
> +}
> +
> +struct rte_distributor *
> +rte_distributor_create(const char *name, unsigned socket_id,
> +		unsigned num_workers, unsigned flags) {
> +	struct rte_distributor *d;
> +	struct rte_distributor_list *distributor_list;
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	const struct rte_memzone *mz;
> +
> +	/* compilation-time checks */
> +	RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> +	RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> +
> +	rte_snprintf(mz_name, sizeof(mz_name), DISTRIBUTOR_PREFIX"%s",
> name);
> +	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id,
> NO_FLAGS);
> +	if (mz == NULL) {
> +		rte_errno = ENOMEM;
> +		return NULL;
> +	}
> +
> +	/* check that we have an initialised tail queue */
> +	if ((distributor_list =
> RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
> +			rte_distributor_list)) == NULL) {
> +		rte_errno = E_RTE_NO_TAILQ;
> +		return NULL;
> +	}
> +
> +	d = mz->addr;
> +	rte_snprintf(d->name, sizeof(d->name), "%s", name);
> +	d->flags = flags;
> +	d->num_workers = num_workers;
> +	TAILQ_INSERT_TAIL(distributor_list, d, next);
> +
> +	return d;
> +}
> +
> diff --git a/lib/librte_distributor/rte_distributor.h
> b/lib/librte_distributor/rte_distributor.h
> new file mode 100644
> index 0000000..b8021a0
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -0,0 +1,45 @@
> +/*
> + * <COPYIRHGT_TAG>
> + */
> +
> +#ifndef _RTE_DISTRIBUTE_H_
> +#define _RTE_DISTRIBUTE_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_mbuf.h>
> +
> +#define RTE_DISTRIBUTOR_NAMESIZE 32
> +
> +#define RTE_DISTRIBUTOR_NOFLAGS 0
> +
> +struct rte_distributor;
> +
> +struct rte_distributor *
> +rte_distributor_create(const char *name, unsigned socket_id,
> +		unsigned num_workers, unsigned flags);
> +
> +int
> +rte_distributor_process(struct rte_distributor *d,
> +		struct rte_mbuf **mbufs, const unsigned num_mbufs);
> +
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d, unsigned worker_id);
> +
> +struct rte_mbuf *
> +rte_distributor_get_next_pkt(struct rte_distributor *d,
> +		unsigned worker_id, struct rte_mbuf *oldpkt);
> +
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
> +		struct rte_mbuf *mbuf);
> +
> +/******************************************/
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h
> b/lib/librte_eal/common/include/rte_tailq_elem.h
> index 2de4010..fdd2faf 100644
> --- a/lib/librte_eal/common/include/rte_tailq_elem.h
> +++ b/lib/librte_eal/common/include/rte_tailq_elem.h
> @@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
> 
>  rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
> 
> +rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
> +
>  rte_tailq_end(RTE_TAILQ_NUM)
> 
>  #undef rte_tailq_elem
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk index 9c70ce0..64c0f6e 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -70,6 +70,10 @@ LDLIBS += -lrte_ivshmem  endif  endif
> 
> +ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
> +LDLIBS += -lrte_distributor
> +endif
> +
>  ifeq ($(CONFIG_RTE_LIBRTE_E1000_PMD),y)
>  LDLIBS += -lrte_pmd_e1000
>  endif

^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2014-05-12 13:52 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2014-05-12  9:23 [dpdk-dev] [PATCH 0/2] 10G PMD: vectorized RX and TX functions Bruce Richardson
2014-05-12  9:23 ` [dpdk-dev] [PATCH 1/2] 10G PMD: New vectorized functions for RX/TX Bruce Richardson
2014-05-12  9:23 ` [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b linux Bruce Richardson
2014-05-12  9:23 ` Bruce Richardson
2014-05-12  9:26   ` Richardson, Bruce

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).