DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH] librte_reorder: New reorder library with unit tests and app
@ 2015-01-07 15:28 Reshma Pattan
  2015-01-07 16:37 ` Pattan, Reshma
  0 siblings, 1 reply; 2+ messages in thread
From: Reshma Pattan @ 2015-01-07 15:28 UTC (permalink / raw)
  To: dev

From: Reshma Pattan <reshma.pattan@intel.com>

        1)New library to provide reordering of out of ordered
        mbufs based on sequence number of mbuf. Library uses reorder buffer structure
        which in tern uses two circular buffers called ready and order buffers.
        *rte_reorder_create API creates instance of reorder buffer.
        *rte_reorder_init API initializes given reorder buffer instance.
        *rte_reorder_reset API resets given reorder buffer instance.
        *rte_reorder_insert API inserts the mbuf into order circular buffer.
        *rte_reorder_fill_overflow moves mbufs from order buffer to ready buffer
        to accomodate early packets in order buffer.
        *rte_reorder_drain API provides draining facility to fetch out
        reordered mbufs from order and ready buffers.

        2)New unit test cases added.

        3)New application added to verify the performance of library.

    Signed-off-by: Reshma Pattan <reshma.pattan@intel.com>
    Signed-off-by: Richardson Bruce <bruce.richardson@intel.com>
---
 app/test/Makefile                              |   2 +
 app/test/test_reorder.c                        | 452 ++++++++++++++++++
 config/common_bsdapp                           |   5 +
 config/common_linuxapp                         |   5 +
 examples/packet_ordering/Makefile              |  50 ++
 examples/packet_ordering/main.c                | 637 +++++++++++++++++++++++++
 lib/Makefile                                   |   1 +
 lib/librte_eal/common/include/rte_tailq_elem.h |   2 +
 lib/librte_mbuf/rte_mbuf.h                     |   3 +
 lib/librte_reorder/Makefile                    |  50 ++
 lib/librte_reorder/rte_reorder.c               | 464 ++++++++++++++++++
 lib/librte_reorder/rte_reorder.h               | 184 +++++++
 mk/rte.app.mk                                  |   4 +
 13 files changed, 1859 insertions(+)
 create mode 100644 app/test/test_reorder.c
 create mode 100644 examples/packet_ordering/Makefile
 create mode 100644 examples/packet_ordering/main.c
 create mode 100644 lib/librte_reorder/Makefile
 create mode 100644 lib/librte_reorder/rte_reorder.c
 create mode 100644 lib/librte_reorder/rte_reorder.h

diff --git a/app/test/Makefile b/app/test/Makefile
index 4311f96..24b27d7 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -124,6 +124,8 @@ SRCS-$(CONFIG_RTE_LIBRTE_IVSHMEM) += test_ivshmem.c
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += test_distributor.c
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += test_distributor_perf.c
 
+SRCS-$(CONFIG_RTE_LIBRTE_REORDER) += test_reorder.c
+
 SRCS-y += test_devargs.c
 SRCS-y += virtual_pmd.c
 SRCS-y += packet_burst_generator.c
diff --git a/app/test/test_reorder.c b/app/test/test_reorder.c
new file mode 100644
index 0000000..6a673e2
--- /dev/null
+++ b/app/test/test_reorder.c
@@ -0,0 +1,452 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "stdio.h"
+
+#include <unistd.h>
+#include <string.h>
+
+#include <rte_cycles.h>
+#include <rte_errno.h>
+#include <rte_mbuf.h>
+#include <rte_reorder.h>
+#include <rte_lcore.h>
+#include <rte_malloc.h>
+
+#include "test.h"
+
+#define BURST 32
+#define REORDER_BUFFER_SIZE 16384
+#define NUM_MBUFS (2*REORDER_BUFFER_SIZE)
+#define REORDER_BUFFER_SIZE_INVALID 2049
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+struct reorder_unittest_params {
+	struct rte_mempool *p;
+	struct rte_reorder_buffer *b;
+};
+
+static struct reorder_unittest_params default_params  = {
+	.p = NULL,
+	.b = NULL
+};
+
+static struct reorder_unittest_params *test_params = &default_params;
+
+static int
+test_reorder_create_inval_name(void)
+{
+	struct rte_reorder_buffer *b = NULL;
+	char *name = NULL;
+
+	b = rte_reorder_create(name, rte_socket_id(), REORDER_BUFFER_SIZE);
+	TEST_ASSERT_EQUAL(b, NULL, "No error on create() with invalid name param.");
+	TEST_ASSERT_EQUAL(rte_errno, EINVAL,
+				"No error on create() with invalid name param.");
+	return 0;
+}
+
+static int
+test_reorder_create_inval_size(void)
+{
+	struct rte_reorder_buffer *b = NULL;
+
+	b = rte_reorder_create("PKT", rte_socket_id(), REORDER_BUFFER_SIZE_INVALID);
+	TEST_ASSERT_EQUAL(b, NULL,
+				"No error on create() with invalid buffer size param.");
+	TEST_ASSERT_EQUAL(rte_errno, EINVAL,
+				"No error on create() with invalid buffer size param.");
+	return 0;
+}
+
+static int
+test_reorder_init_null_buffer(void)
+{
+	struct rte_reorder_buffer *b = NULL;
+	/*
+	 * The minimum memory area size that should be passed to library is,
+	 * sizeof(struct rte_reorder_buffer) + (2 * size * sizeof(struct rte_mbuf *));
+	 * Otherwise error will be thrown
+	 */
+	unsigned int mzsize = 262336;
+	b = rte_reorder_init(b, mzsize, "PKT1", REORDER_BUFFER_SIZE);
+	TEST_ASSERT_EQUAL(b, NULL, "No error on init with NULL buffer.");
+	TEST_ASSERT_EQUAL(rte_errno, EINVAL, "No error on init with NULL buffer.");
+	return 0;
+}
+
+static int
+test_reorder_init_inval_mzsize(void)
+{
+	struct rte_reorder_buffer *b = NULL;
+	unsigned int mzsize =  100;
+	b = rte_malloc(NULL, mzsize, 0);
+	b = rte_reorder_init(b, mzsize, "PKT1", REORDER_BUFFER_SIZE);
+	TEST_ASSERT_EQUAL(b, NULL, "No error on init with invalid mem zone size.");
+	TEST_ASSERT_EQUAL(rte_errno, ENOMEM,
+				"No error on init with invalid mem zone size.");
+	rte_free(b);
+	return 0;
+}
+
+static int
+test_reorder_init_inval_size(void)
+{
+	struct rte_reorder_buffer *b = NULL;
+	unsigned int mzsize =  262336;
+	b = rte_malloc(NULL, mzsize, 0);
+	b = rte_reorder_init(b, mzsize, "PKT1", REORDER_BUFFER_SIZE_INVALID);
+	TEST_ASSERT_EQUAL(b, NULL, "No error on init with invalid buffer size param.");
+	TEST_ASSERT_EQUAL(rte_errno, EINVAL,
+				"No error on init with invalid buffer size param.");
+	rte_free(b);
+	return 0;
+}
+
+static int
+test_reorder_init_inval_name(void)
+{
+	struct rte_reorder_buffer *b = NULL;
+	char *name = NULL;
+	unsigned int mzsize =  262336;
+	b = rte_malloc(NULL, mzsize, 0);
+	b = rte_reorder_init(b, mzsize, name, REORDER_BUFFER_SIZE);
+	TEST_ASSERT_EQUAL(b, NULL, "No error on init with invalid name.");
+	TEST_ASSERT_EQUAL(rte_errno, EINVAL, "No error on init with invalid name.");
+	rte_free(b);
+	return 0;
+}
+
+static int
+test_reorder_buf_instance_existance(void)
+{
+	struct rte_reorder_buffer *result = NULL;
+	struct rte_reorder_buffer *b1 = NULL;
+	struct rte_reorder_buffer *b2 = NULL;
+	unsigned int mzsize =  262336;
+
+	/* Try to find existing reorder buffer instance */
+	result = rte_reorder_find_existing("PKT_RO1");
+	TEST_ASSERT_EQUAL(test_params->b, result,
+			"existing reorder buffer instance not found");
+
+	/* Try to find non existing reorder buffer instance */
+	result = rte_reorder_find_existing("ro_find_non_existing");
+	TEST_ASSERT_EQUAL(result, NULL,
+			"non existing reorder buffer instance found");
+	TEST_ASSERT_EQUAL(rte_errno, ENOENT,
+			"non existing reorder buffer instance found");
+
+	b1 = rte_malloc(NULL, mzsize, 0);
+	b2 = rte_reorder_init(b1, mzsize, "PKT_RO1", REORDER_BUFFER_SIZE);
+	TEST_ASSERT_EQUAL(b2, test_params->b,
+			"no error on init with existing reorder instance name");
+	rte_free(b1);
+
+	b1 = rte_malloc(NULL, mzsize, 0);
+	b2 = rte_reorder_init(b1, mzsize, "ro_find_nonexisting1", REORDER_BUFFER_SIZE);
+	TEST_ASSERT_EQUAL(b2, b1,
+			"error on init with non existing reorder instance name");
+	rte_reorder_free(b1);
+
+	return 0;
+}
+
+static int
+test_reorder_insert(void)
+{
+	struct rte_reorder_buffer *b = test_params->b;
+	struct rte_mempool *p = test_params->p;
+	rte_reorder_reset(b);
+	int num_bufs = 4;
+	struct rte_mbuf *bufs[num_bufs];
+	int ret = 0;
+	if (rte_mempool_get_bulk(p, (void *)bufs, num_bufs) != 0) {
+		printf("%s: Error getting mbuf from pool\n", __func__);
+		return -1;
+	}
+
+	/* too early packet */
+	bufs[0]->seqn = (3*REORDER_BUFFER_SIZE);
+	ret = rte_reorder_insert(b, bufs[0]);
+	if (ret != -1 || rte_errno != ERANGE) {
+		printf("%s:%d: No error on insert() of too early packet with seqn:"
+				" (3*REORDER_BUFFER_SIZE)\n", __func__, __LINE__);
+		rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
+		return -1;
+	}
+
+	/* early packet */
+	bufs[1]->seqn = (2*REORDER_BUFFER_SIZE)-2;
+	ret = rte_reorder_insert(b, bufs[1]);
+	if (ret == -1 || rte_errno == ENOSPC) {
+		printf("%s:%d: Error on insert of early packet with seqn:"
+			" (2*REORDER_BUFFER_SIZE)-2\n", __func__ , __LINE__);
+		rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
+		return -1;
+	}
+
+	bufs[2]->seqn = (3*REORDER_BUFFER_SIZE)-1;
+	ret = rte_reorder_insert(b, bufs[2]);
+	if (ret != -1 && rte_errno != ENOSPC) {
+		printf("%s:%d: Error on insert of early packet with seqn:"
+			" (3*REORDER_BUFFER_SIZE)-3\n", __func__ , __LINE__);
+		rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
+		return -1;
+	}
+
+	rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
+	return 0;
+}
+
+/* Test case covers draining conditions on order buffer */
+static int
+test_reorder_drain_order_buf(void)
+{
+
+	struct rte_reorder_buffer *b = test_params->b;
+	struct rte_mempool *p = test_params->p;
+	rte_reorder_reset(b);
+	struct rte_mbuf *bufs[REORDER_BUFFER_SIZE+10] = {NULL};
+	struct rte_mbuf *robufs[REORDER_BUFFER_SIZE+10] = {NULL};
+	int cnt;
+	int i = 0;
+
+	if (rte_mempool_get_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10) != 0) {
+		printf("%s: Error getting mbuf from pool\n", __func__);
+		return -1;
+	}
+
+	/* insert mbufs in order buffer with gaps i.e seqn 0 to 5 and 8,9 inserted */
+	for (i = 0; i < 10; ) {
+		bufs[i]->seqn = i;
+		rte_reorder_insert(b, bufs[i]);
+		if (i == 5)
+			i += 3;
+		else
+			i++;
+	}
+
+	/* should drain till first gap */
+	cnt = rte_reorder_drain(b, robufs, BURST);
+	if (cnt != 6) {
+		printf("%s:%d:%d: number of expected packets not drained\n",
+			__func__, __LINE__, cnt);
+		rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+		return -1;
+	}
+
+	/* now add missing entries and remaining entries till end of order buf */
+	bufs[6]->seqn = 6;
+	bufs[7]->seqn = 7;
+	rte_reorder_insert(b, bufs[6]);
+	rte_reorder_insert(b, bufs[7]);
+	for (i = 10; i < REORDER_BUFFER_SIZE; i++) {
+		bufs[i]->seqn = i;
+		rte_reorder_insert(b, bufs[i]);
+	}
+
+	/*
+	 * hence gaps are filled now, drain should return entries
+	 * from last gap to till end
+	 */
+	cnt = rte_reorder_drain(b, robufs, REORDER_BUFFER_SIZE+1);
+	if (cnt != REORDER_BUFFER_SIZE-6) {
+		printf("%s:%d: number of expected packets not drained\n",
+			__func__, __LINE__);
+		rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+		return -1;
+	}
+	rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+	return 0;
+}
+
+/* Test case covers draining conditions on ready buffer */
+static int
+test_reorder_drain_ready_buf(void)
+{
+
+	struct rte_reorder_buffer *b = test_params->b;
+	struct rte_mempool *p = test_params->p;
+	rte_reorder_reset(b);
+
+	struct rte_mbuf *bufs[REORDER_BUFFER_SIZE+10] = {NULL};
+	struct rte_mbuf *robufs[REORDER_BUFFER_SIZE+10] = {NULL};
+	int cnt = 0;
+	int i;
+	int ret = 0;
+
+	if (rte_mempool_get_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10) != 0) {
+		printf("%s: Error getting mbuf from pool\n", __func__);
+		return -1;
+	}
+
+	/*1: draining of ready buffer with tail == 0 */
+	for (i = 0; i < REORDER_BUFFER_SIZE; i++) {
+		bufs[i]->seqn = i;
+		ret = rte_reorder_insert(b, bufs[i]);
+		if (ret) {
+			printf("%s: Error on insert of bufs[%u]\n",
+				__func__, i);
+			rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+			return -1;
+		}
+	}
+
+	/*
+	 * insert early packet, this moves entries from order buffer
+	 * to ready buffer
+	 */
+	bufs[REORDER_BUFFER_SIZE]->seqn = (2*REORDER_BUFFER_SIZE)-1;
+	rte_reorder_insert(b, bufs[REORDER_BUFFER_SIZE]);
+
+	/*
+	 * since ready buffer is full, could drain REORDER_BUFFER_SIZE
+	 * entries  from ready buffer
+	 */
+	cnt = rte_reorder_drain(b, robufs, REORDER_BUFFER_SIZE);
+	if (cnt != REORDER_BUFFER_SIZE) {
+		printf("%s:%d:%d: number of expected packets not drained\n",
+			__func__, __LINE__, cnt);
+		rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+		return -1;
+	}
+
+	/*2: draining of ready buffer with tail != 0 */
+
+	/* insert mbufs with seqn:REORDER_BUFFER_SIZE to 2*REORDER_BUFFER_SIZE */
+	for (i = 0; i < REORDER_BUFFER_SIZE; i++) {
+		bufs[i]->seqn = REORDER_BUFFER_SIZE+1+i;
+		ret = rte_reorder_insert(b, bufs[i]);
+		if (ret) {
+			printf("%s: Error on insert of bufs[%u]\n",
+				__func__, i);
+			rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+			return -1;
+		}
+	}
+
+	/*
+	 * insert early packet, this will move entries
+	 * from order buffer to ready buffer
+	 */
+	bufs[REORDER_BUFFER_SIZE]->seqn = (3*REORDER_BUFFER_SIZE)-5;
+	rte_reorder_insert(b, bufs[REORDER_BUFFER_SIZE]);
+
+	/*
+	 * drain only 3 mbufs, this will drain ready buffer
+	 * and advances tail by 3
+	 */
+	cnt = rte_reorder_drain(b, robufs, 3);
+	if (cnt != 3) {
+		printf("%s:%d:%d: number of expected packets not drained\n",
+			__func__, __LINE__, cnt);
+		rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+		return -1;
+	}
+
+	/* insert early packet */
+	bufs[REORDER_BUFFER_SIZE]->seqn = (3*REORDER_BUFFER_SIZE)+2;
+	rte_reorder_insert(b, bufs[REORDER_BUFFER_SIZE]);
+
+	/*
+	 * perform drain on ready buffer with advanced tail,
+	 * validates if(tail == size) in drain
+	 */
+	rte_reorder_drain(b, robufs, REORDER_BUFFER_SIZE);
+	rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
+	return 0;
+}
+
+static int
+test_setup(void)
+{
+	/* reorder buffer instance creation */
+	if (test_params->b == NULL) {
+		test_params->b = rte_reorder_create("PKT_RO1", rte_socket_id(),
+							REORDER_BUFFER_SIZE);
+		if (test_params->b == NULL) {
+			printf("%s: Error creating reorder buffer instance b\n",
+					__func__);
+			return -1;
+		}
+	} else
+		rte_reorder_reset(test_params->b);
+
+	/* mempool creation */
+	if (test_params->p == NULL) {
+		test_params->p = rte_mempool_create("RO_MBUF_POOL", NUM_MBUFS,
+				MBUF_SIZE, BURST,
+				sizeof(struct rte_pktmbuf_pool_private),
+				rte_pktmbuf_pool_init, NULL,
+				rte_pktmbuf_init, NULL,
+				rte_socket_id(), 0);
+		if (test_params->p == NULL) {
+			printf("%s: Error creating mempool\n", __func__);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+static struct unit_test_suite reorder_test_suite  = {
+
+	.setup = test_setup,
+	.suite_name = "Reorder Unit Test Suite",
+	.unit_test_cases = {
+		TEST_CASE(test_reorder_create_inval_name),
+		TEST_CASE(test_reorder_create_inval_size),
+		TEST_CASE(test_reorder_init_null_buffer),
+		TEST_CASE(test_reorder_init_inval_mzsize),
+		TEST_CASE(test_reorder_init_inval_size),
+		TEST_CASE(test_reorder_init_inval_name),
+		TEST_CASE(test_reorder_buf_instance_existance),
+		TEST_CASE(test_reorder_insert),
+		TEST_CASE(test_reorder_drain_order_buf),
+		TEST_CASE(test_reorder_drain_ready_buf),
+		TEST_CASES_END()
+	}
+};
+
+static int
+test_reorder(void)
+{
+	return unit_test_suite_runner(&reorder_test_suite);
+}
+
+static struct test_command reorder_cmd = {
+	.command = "reorder_autotest",
+	.callback = test_reorder,
+};
+REGISTER_TEST_COMMAND(reorder_cmd);
diff --git a/config/common_bsdapp b/config/common_bsdapp
index 9177db1..e3e0e94 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -334,6 +334,11 @@ CONFIG_RTE_SCHED_PORT_N_GRINDERS=8
 CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
 
 #
+# Compile the reorder library
+#
+CONFIG_RTE_LIBRTE_REORDER=y
+
+#
 # Compile librte_port
 #
 CONFIG_RTE_LIBRTE_PORT=y
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 2f9643b..b5ec730 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -342,6 +342,11 @@ CONFIG_RTE_SCHED_PORT_N_GRINDERS=8
 CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
 
 #
+# Compile the reorder library
+#
+CONFIG_RTE_LIBRTE_REORDER=y
+
+#
 # Compile librte_port
 #
 CONFIG_RTE_LIBRTE_PORT=y
diff --git a/examples/packet_ordering/Makefile b/examples/packet_ordering/Makefile
new file mode 100644
index 0000000..44bd2e1
--- /dev/null
+++ b/examples/packet_ordering/Makefile
@@ -0,0 +1,50 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+#   All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overriden by command line or environment
+RTE_TARGET ?= x86_64-ivshmem-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = packet_ordering
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c
new file mode 100644
index 0000000..8b65275
--- /dev/null
+++ b/examples/packet_ordering/main.c
@@ -0,0 +1,637 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <signal.h>
+#include <getopt.h>
+
+#include <rte_eal.h>
+#include <rte_common.h>
+#include <rte_errno.h>
+#include <rte_ethdev.h>
+#include <rte_lcore.h>
+#include <rte_mbuf.h>
+#include <rte_mempool.h>
+#include <rte_ring.h>
+#include <rte_reorder.h>
+
+#define RX_DESC_PER_QUEUE 128
+#define TX_DESC_PER_QUEUE 512
+
+#define MAX_PKTS_BURST 32
+#define REORDER_BUFFER_SIZE 8192
+#define MBUF_PER_POOL 65535
+#define MBUF_SIZE (1600 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+#define MBUF_POOL_CACHE_SIZE 250
+
+#define RING_SIZE 16384
+
+/* uncommnet below line to enable debug logs */
+/* #define DEBUG */
+
+#ifdef DEBUG
+#define LOG_LEVEL RTE_LOG_DEBUG
+#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt, ##args)
+#else
+#define LOG_LEVEL RTE_LOG_INFO
+#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
+#endif
+
+/* Macros for printing using RTE_LOG */
+#define RTE_LOGTYPE_REORDERAPP          RTE_LOGTYPE_USER1
+
+unsigned int portmask;
+volatile uint8_t quit_signal;
+
+static struct rte_mempool *mbuf_pool;
+
+static struct rte_eth_conf port_conf_default;
+
+struct worker_thread_args {
+	struct rte_ring *ring_in;
+	struct rte_ring *ring_out;
+};
+
+struct output_buffer {
+	unsigned count;
+	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
+};
+
+volatile struct app_stats {
+	struct {
+		uint64_t rx_pkts;
+		uint64_t enqueue_pkts;
+		uint64_t enqueue_failed_pkts;
+	} rx __rte_cache_aligned;
+
+	struct {
+		uint64_t dequeue_pkts;
+		uint64_t enqueue_pkts;
+		uint64_t enqueue_failed_pkts;
+	} wkr __rte_cache_aligned;
+
+	struct {
+		uint64_t dequeue_pkts;
+		/* Too early pkts transmitted directly w/o reordering */
+		uint64_t early_pkts_txtd_woro;
+		/* Too early pkts failed from direct transmit */
+		uint64_t early_pkts_tx_failed_woro;
+		uint64_t ro_tx_pkts;
+		uint64_t ro_tx_failed_pkts;
+	} tx __rte_cache_aligned;
+} app_stats;
+
+/**
+ * Get the last enabled lcore ID
+ *
+ * @return
+ *   The last enabled lcore ID.
+ */
+static unsigned int
+get_last_lcore_id(void)
+{
+	int i;
+
+	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
+		if (rte_lcore_is_enabled(i))
+			return i;
+	return 0;
+}
+
+/**
+ * Get the previous enabled lcore ID
+ * @param id
+ *  The current lcore ID
+ * @return
+ *   The previous enabled lcore ID or the current lcore
+ *   ID if it is the first available core.
+ */
+static unsigned int
+get_previous_lcore_id(unsigned int id)
+{
+	int i;
+
+	for (i = id - 1; i >= 0; i--)
+		if (rte_lcore_is_enabled(i))
+			return i;
+	return id;
+}
+
+static inline void
+pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned n)
+{
+	unsigned int i;
+
+	for (i = 0; i < n; i++)
+		rte_pktmbuf_free(mbuf_table[i]);
+}
+
+/* display usage */
+static void
+print_usage(const char *prgname)
+{
+	printf("%s [EAL options] -- -p PORTMASK\n"
+			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
+			prgname);
+}
+
+static int
+parse_portmask(const char *portmask)
+{
+	unsigned long pm;
+	char *end = NULL;
+
+	/* parse hexadecimal string */
+	pm = strtoul(portmask, &end, 16);
+	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
+		return -1;
+
+	if (pm == 0)
+		return -1;
+
+	return pm;
+}
+
+/* Parse the argument given in the command line of the application */
+static int
+parse_args(int argc, char **argv)
+{
+	int opt;
+	int option_index;
+	char **argvopt;
+	char *prgname = argv[0];
+	static struct option lgopts[] = {
+		{NULL, 0, 0, 0}
+	};
+
+	argvopt = argv;
+
+	while ((opt = getopt_long(argc, argvopt, "p:",
+					lgopts, &option_index)) != EOF) {
+		switch (opt) {
+		/* portmask */
+		case 'p':
+			portmask = parse_portmask(optarg);
+			if (portmask == 0) {
+				printf("invalid portmask\n");
+				print_usage(prgname);
+				return -1;
+			}
+			break;
+		default:
+			print_usage(prgname);
+			return -1;
+		}
+	}
+	if (optind <= 1) {
+		print_usage(prgname);
+		return -1;
+	}
+
+	argv[optind-1] = prgname;
+	optind = 0; /* reset getopt lib */
+	return 0;
+}
+
+static inline int
+configure_eth_port(uint8_t port_id)
+{
+	const uint16_t rxRings = 1, txRings = 1;
+	const uint8_t nb_ports = rte_eth_dev_count();
+	int ret;
+	uint16_t q;
+
+	if (port_id > nb_ports)
+		return -1;
+
+	ret = rte_eth_dev_configure(port_id, rxRings, txRings , &port_conf_default);
+	if (ret != 0)
+		return ret;
+
+	for (q = 0; q < rxRings; q++) {
+		ret = rte_eth_rx_queue_setup(port_id, q, RX_DESC_PER_QUEUE,
+				rte_eth_dev_socket_id(port_id), NULL,
+				mbuf_pool);
+		if (ret < 0)
+			return ret;
+	}
+
+	for (q = 0; q < txRings; q++) {
+		ret = rte_eth_tx_queue_setup(port_id, q, TX_DESC_PER_QUEUE,
+				rte_eth_dev_socket_id(port_id), NULL);
+		if (ret < 0)
+			return ret;
+	}
+
+	ret = rte_eth_dev_start(port_id);
+	if (ret < 0)
+		return ret;
+
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port_id, &addr);
+	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
+			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
+			(unsigned)port_id,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	rte_eth_promiscuous_enable(port_id);
+
+	return 0;
+}
+
+static void
+print_stats(void)
+{
+	const uint8_t nb_ports = rte_eth_dev_count();
+	unsigned i;
+	struct rte_eth_stats eth_stats;
+
+	printf("\nRX thread stats:\n");
+	printf(" - Pkts rxd:				%"PRIu64"\n",
+						app_stats.rx.rx_pkts);
+	printf(" - Pkts enqd to workers ring:		%"PRIu64"\n",
+						app_stats.rx.enqueue_pkts);
+
+	printf("\nWorker thread stats:\n");
+	printf(" - Pkts deqd from workers ring:		%"PRIu64"\n",
+						app_stats.wkr.dequeue_pkts);
+	printf(" - Pkts enqd to tx ring:		%"PRIu64"\n",
+						app_stats.wkr.enqueue_pkts);
+	printf(" - Pkts enq to tx failed:		%"PRIu64"\n",
+						app_stats.wkr.enqueue_failed_pkts);
+
+	printf("\nTX stats:\n");
+	printf(" - Pkts deqd from tx ring:		%"PRIu64"\n",
+						app_stats.tx.dequeue_pkts);
+	printf(" - Ro Pkts transmitted:			%"PRIu64"\n",
+						app_stats.tx.ro_tx_pkts);
+	printf(" - Ro Pkts tx failed:			%"PRIu64"\n",
+						app_stats.tx.ro_tx_failed_pkts);
+	printf(" - Pkts transmitted w/o reorder:	%"PRIu64"\n",
+						app_stats.tx.early_pkts_txtd_woro);
+	printf(" - Pkts tx failed w/o reorder:		%"PRIu64"\n",
+						app_stats.tx.early_pkts_tx_failed_woro);
+
+	for (i = 0; i < nb_ports; i++) {
+		rte_eth_stats_get(i, &eth_stats);
+		printf("\nPort %u stats:\n", i);
+		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
+		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
+		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
+		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
+		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
+	}
+}
+
+static void
+int_handler(int sig_num)
+{
+	printf("Exiting on signal %d\n", sig_num);
+	quit_signal = 1;
+}
+
+/**
+ * This thread receives mbufs from the port and affects them an internal
+ * sequence number to keep track of their order of arrival through an
+ * mbuf structure.
+ * The mbufs are then passed to the worker threads via the rx_to_workers
+ * ring.
+ */
+static int
+rx_thread(struct rte_ring *ring_out)
+{
+	const uint8_t nb_ports = rte_eth_dev_count();
+	uint32_t seqn = 0;
+	uint16_t i, ret = 0;
+	uint16_t nb_rx_pkts;
+	uint8_t port_id;
+	struct rte_mbuf *pkts[MAX_PKTS_BURST];
+
+	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
+							rte_lcore_id());
+
+	while (!quit_signal) {
+
+		for (port_id = 0; port_id < nb_ports; port_id++) {
+			if ((portmask & (1 << port_id)) != 0) {
+
+				/* receive packets */
+				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
+								pkts, MAX_PKTS_BURST);
+				if (nb_rx_pkts == 0) {
+					LOG_DEBUG(REORDERAPP,
+					"%s():Received zero packets\n",	__func__);
+					continue;
+				}
+				app_stats.rx.rx_pkts += nb_rx_pkts;
+
+				/* mark sequence number */
+				for (i = 0; i < nb_rx_pkts; )
+					pkts[i++]->seqn = seqn++;
+
+				/* enqueue to rx_to_workers ring */
+				ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
+								nb_rx_pkts);
+				app_stats.rx.enqueue_pkts += ret;
+				if (unlikely(ret < nb_rx_pkts)) {
+					app_stats.rx.enqueue_failed_pkts +=
+									(nb_rx_pkts-ret);
+					pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret);
+				}
+			}
+		}
+	}
+	return 0;
+}
+
+/**
+ * This thread takes bursts of packets from the rx_to_workers ring and
+ * Changes the input port value to output port value. And feds it to
+ * workers_to_tx
+ */
+static int
+worker_thread(void *args_ptr)
+{
+	const uint8_t nb_ports = rte_eth_dev_count();
+	uint16_t i, ret = 0;
+	uint16_t burst_size = 0;
+	struct worker_thread_args *args;
+	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
+	struct rte_ring *ring_in, *ring_out;
+
+	args = (struct worker_thread_args *) args_ptr;
+	ring_in  = args->ring_in;
+	ring_out = args->ring_out;
+
+	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
+							rte_lcore_id());
+	const unsigned xor_val = (nb_ports > 1);
+	while (!quit_signal) {
+
+		/* dequeue the mbufs from rx_to_workers ring */
+		burst_size = rte_ring_dequeue_burst(ring_in,
+				(void *)burst_buffer, MAX_PKTS_BURST);
+		if (unlikely(burst_size == 0))
+			continue;
+
+		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);
+
+		/* just do some operation on mbuf */
+		for (i = 0; i < burst_size;)
+			burst_buffer[i++]->port ^= xor_val;
+
+		/* enqueue the modified mbufs to workers_to_tx ring */
+		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
+		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
+		if (unlikely(ret < burst_size)) {
+			/* Return the mbufs to their respective pool, dropping packets */
+			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
+					(int)burst_size - ret);
+			pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret);
+		}
+	}
+	return 0;
+}
+
+static inline void
+flush_one_port(struct output_buffer *outbuf, uint8_t outp)
+{
+	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
+			outbuf->count);
+	app_stats.tx.ro_tx_pkts += nb_tx;
+
+	if (unlikely(nb_tx < outbuf->count)) {
+		/* free the mbufs which failed from transmit */
+		app_stats.tx.ro_tx_failed_pkts += (outbuf->count - nb_tx);
+		LOG_DEBUG(REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
+		pktmbuf_free_bulk(&outbuf->mbufs[nb_tx], outbuf->count - nb_tx);
+	}
+	outbuf->count = 0;
+}
+
+/**
+ * Dequeue mbufs from the workers_to_tx ring and reorder them before
+ * transmitting.
+ */
+static int
+send_thread(struct rte_ring *ring_in)
+{
+	int ret;
+	unsigned int i, dret;
+	uint16_t nb_dq_mbufs;
+	uint8_t outp;
+	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
+	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
+	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
+	struct rte_reorder_buffer *buffer;
+
+	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
+							rte_lcore_id());
+	buffer = rte_reorder_create("PKT_RO", rte_socket_id(), REORDER_BUFFER_SIZE);
+	while (!quit_signal) {
+
+		/* deque the mbufs from workers_to_tx ring */
+		nb_dq_mbufs = rte_ring_dequeue_burst(ring_in,
+				(void *)mbufs, MAX_PKTS_BURST);
+
+		if (unlikely(nb_dq_mbufs == 0))
+			continue;
+
+		app_stats.tx.dequeue_pkts += nb_dq_mbufs;
+
+		for (i = 0; i < nb_dq_mbufs; i++) {
+			/* send dequeued mbufs for reordering */
+			ret = rte_reorder_insert(buffer, mbufs[i]);
+
+			if (ret == -1 && rte_errno == ERANGE) {
+				/* Too early pkts should be transmitted out directly */
+				LOG_DEBUG(REORDERAPP, "%s():Cannot reorder early packet"
+						"direct enqueuing to TX\n", __func__);
+				outp = mbufs[i]->port;
+				if ((portmask & (1 << outp)) == 0) {
+					rte_pktmbuf_free(mbufs[i]);
+					continue;
+				}
+				if (rte_eth_tx_burst(outp, 0, (void *)mbufs[i], 1) != 1) {
+					rte_pktmbuf_free(mbufs[i]);
+					app_stats.tx.early_pkts_tx_failed_woro++;
+				} else
+					app_stats.tx.early_pkts_txtd_woro++;
+			} else if (ret == -1 && rte_errno == ENOSPC) {
+				/**
+				 * Early pkts just outside of window should be dropped
+				 */
+				rte_pktmbuf_free(mbufs[i]);
+			}
+		}
+
+		/*
+		 * drain MAX_PKTS_BURST of reordered
+		 * mbufs for transmit
+		 */
+		dret = rte_reorder_drain(buffer, rombufs, MAX_PKTS_BURST);
+		for (i = 0; i < dret; i++) {
+
+			struct output_buffer *outbuf;
+			uint8_t outp1;
+
+			outp1 = rombufs[i]->port;
+			/* skip ports that are not enabled */
+			if ((portmask & (1 << outp1)) == 0) {
+				rte_pktmbuf_free(rombufs[i]);
+				continue;
+			}
+
+			outbuf = &tx_buffers[outp1];
+			outbuf->mbufs[outbuf->count++] = rombufs[i];
+			if (outbuf->count == MAX_PKTS_BURST)
+				flush_one_port(outbuf, outp1);
+		}
+	}
+	return 0;
+}
+
+int
+main(int argc, char **argv)
+{
+	int ret;
+	unsigned nb_ports;
+	unsigned int lcore_id, last_lcore_id, master_lcore_id;
+	uint8_t port_id;
+	uint8_t nb_ports_available;
+	struct worker_thread_args worker_args = {NULL, NULL};
+	struct rte_ring *rx_to_workers;
+	struct rte_ring *workers_to_tx;
+
+	/* catch ctrl-c so we can print on exit */
+	signal(SIGINT, int_handler);
+
+	/* Initialize EAL */
+	ret = rte_eal_init(argc, argv);
+	if (ret < 0)
+		return -1;
+
+	argc -= ret;
+	argv += ret;
+
+	/* Parse the application specific arguments */
+	ret = parse_args(argc, argv);
+	if (ret < 0)
+		return -1;
+
+	/* Check if we have enought cores */
+	if (rte_lcore_count() < 3)
+		rte_exit(EXIT_FAILURE, "Error, This application needs at "
+				"least 3 logical cores to run:\n"
+				"1 lcore for packet RX\n"
+				"1 lcore for packet TX\n"
+				"and at least 1 lcore for worker threads\n");
+
+	nb_ports = rte_eth_dev_count();
+	if (nb_ports == 0)
+		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
+	if (nb_ports != 1 && (nb_ports & 1))
+		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
+				"when using a single port\n");
+
+	mbuf_pool = rte_mempool_create("mbuf_pool", MBUF_PER_POOL, MBUF_SIZE,
+			MBUF_POOL_CACHE_SIZE,
+			sizeof(struct rte_pktmbuf_pool_private),
+			rte_pktmbuf_pool_init, NULL,
+			rte_pktmbuf_init, NULL,
+			rte_socket_id(), 0);
+	if (mbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
+
+	nb_ports_available = nb_ports;
+
+	/* initialize all ports */
+	for (port_id = 0; port_id < nb_ports; port_id++) {
+		/* skip ports that are not enabled */
+		if ((portmask & (1 << port_id)) == 0) {
+			printf("\nSkipping disabled port %d\n", port_id);
+			nb_ports_available--;
+			continue;
+		}
+		/* init port */
+		printf("Initializing port %u... done\n", (unsigned) port_id);
+
+		if (configure_eth_port(port_id) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n",
+					port_id);
+	}
+
+	if (!nb_ports_available) {
+		rte_exit(EXIT_FAILURE,
+			"All available ports are disabled. Please set portmask.\n");
+	}
+
+	/* Create rings for inter core communication */
+	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(),
+			RING_F_SP_ENQ);
+	if (rx_to_workers == NULL)
+		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
+
+	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(),
+			RING_F_SC_DEQ);
+	if (workers_to_tx == NULL)
+		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
+
+	last_lcore_id   = get_last_lcore_id();
+	master_lcore_id = rte_get_master_lcore();
+
+	worker_args.ring_in  = rx_to_workers;
+	worker_args.ring_out = workers_to_tx;
+
+	/* Start worker_thread() on all the available slave cores but the last 1 */
+	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++)
+		if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id)
+			rte_eal_remote_launch(worker_thread, (void *)&worker_args,
+						lcore_id);
+
+	/* Start send_thread() on the last slave core */
+	rte_eal_remote_launch((lcore_function_t *)send_thread, workers_to_tx,
+				last_lcore_id);
+
+	/* Start rx_thread() on the master core */
+	rx_thread(rx_to_workers);
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		if (rte_eal_wait_lcore(lcore_id) < 0)
+			return -1;
+	}
+
+	print_stats();
+	return 0;
+}
diff --git a/lib/Makefile b/lib/Makefile
index 0ffc982..5919d32 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -65,6 +65,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
 DIRS-$(CONFIG_RTE_LIBRTE_PORT) += librte_port
 DIRS-$(CONFIG_RTE_LIBRTE_TABLE) += librte_table
 DIRS-$(CONFIG_RTE_LIBRTE_PIPELINE) += librte_pipeline
+DIRS-$(CONFIG_RTE_LIBRTE_REORDER) += librte_reorder
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
 DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h b/lib/librte_eal/common/include/rte_tailq_elem.h
index f74fc7c..3013869 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -84,6 +84,8 @@ rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
 
 rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
 
+rte_tailq_elem(RTE_TAILQ_REORDER, "RTE_REORDER")
+
 rte_tailq_end(RTE_TAILQ_NUM)
 
 #undef rte_tailq_elem
diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
index 16059c6..ed27eb8 100644
--- a/lib/librte_mbuf/rte_mbuf.h
+++ b/lib/librte_mbuf/rte_mbuf.h
@@ -262,6 +262,9 @@ struct rte_mbuf {
 		uint32_t usr;	  /**< User defined tags. See @rte_distributor_process */
 	} hash;                   /**< hash information */
 
+	/* sequence number - field used in distributor and reorder library */
+	uint32_t seqn;
+
 	/* second cache line - fields only used in slow path or on TX */
 	MARKER cacheline1 __rte_cache_aligned;
 
diff --git a/lib/librte_reorder/Makefile b/lib/librte_reorder/Makefile
new file mode 100644
index 0000000..12b916f
--- /dev/null
+++ b/lib/librte_reorder/Makefile
@@ -0,0 +1,50 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+#   All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_reorder.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_REORDER) := rte_reorder.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_REORDER)-include := rte_reorder.h
+
+# this lib depends upon:
+DEPDIRS-$(CONFIG_RTE_LIBRTE_REORDER) += lib/librte_mbuf
+DEPDIRS-$(CONFIG_RTE_LIBRTE_REORDER) += lib/librte_eal
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_reorder/rte_reorder.c b/lib/librte_reorder/rte_reorder.c
new file mode 100644
index 0000000..fb3e986
--- /dev/null
+++ b/lib/librte_reorder/rte_reorder.c
@@ -0,0 +1,464 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_log.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_eal_memconfig.h>
+#include <rte_errno.h>
+#include <rte_tailq.h>
+#include <rte_malloc.h>
+
+#include "rte_reorder.h"
+
+TAILQ_HEAD(rte_reorder_list, rte_tailq_entry);
+
+#define NO_FLAGS 0
+#define RTE_REORDER_PREFIX "RO_"
+#define RTE_REORDER_NAMESIZE 32
+
+/* Macros for printing using RTE_LOG */
+#define RTE_LOGTYPE_REORDER	RTE_LOGTYPE_USER1
+
+/* A generic circular buffer */
+struct cir_buffer {
+	unsigned int size;   /**< Number of entries that can be stored */
+	unsigned int mask;   /**< [buffer_size - 1]: used for wrap-around */
+	unsigned int head;   /**< insertion point in buffer */
+	unsigned int tail;   /**< extraction point in buffer */
+	struct rte_mbuf **entries;
+} __rte_cache_aligned;
+
+/* The reorder buffer data structure itself */
+struct rte_reorder_buffer {
+	char name[RTE_REORDER_NAMESIZE];
+	uint32_t min_seqn;  /**< Lowest seq. number that can be in the buffer */
+	unsigned int memsize; /**< memory area size of reorder buffer */
+	struct cir_buffer ready_buf; /**< temp buffer for dequeued entries */
+	struct cir_buffer order_buf; /**< buffer used to reorder entries */
+} __rte_cache_aligned;
+
+struct rte_reorder_buffer *
+rte_reorder_init(void *buf, unsigned int bufsize,
+	const char *name, unsigned int size)
+{
+	struct rte_reorder_buffer *b = (struct rte_reorder_buffer *)buf;
+	const unsigned int min_bufsize = sizeof(*b) +
+					(2 * size * sizeof(struct rte_mbuf *));
+
+	struct rte_reorder_buffer *be;
+	struct rte_tailq_entry *te;
+	struct rte_reorder_list *reorder_list;
+
+	/* check that we have an initialised tail queue */
+	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER, rte_reorder_list);
+	if (!reorder_list) {
+		rte_errno = E_RTE_NO_TAILQ;
+		return NULL;
+	}
+
+	if (!rte_is_power_of_2(size)) {
+		RTE_LOG(ERR, REORDER, "Invalid reorder buffer size"
+				" - Not a power of 2\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+	if (b == NULL) {
+		RTE_LOG(ERR, REORDER, "Invalid reorder buffer parameter:"
+					" NULL\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+	if (name == NULL) {
+		RTE_LOG(ERR, REORDER, "Invalid reorder buffer name ptr:"
+					" NULL\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+	if (bufsize < min_bufsize) {
+		RTE_LOG(ERR, REORDER, "Invalid reorder buffer size:%u, "
+			"should be minimum:%u\n", bufsize, min_bufsize);
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* guarantee there's no existing */
+	TAILQ_FOREACH(te, reorder_list, next) {
+		be = (struct rte_reorder_buffer *) te->data;
+		if (strncmp(name, be->name, RTE_REORDER_NAMESIZE) == 0)
+			break;
+	}
+	if (te != NULL) {
+		b = be;
+		memset(b, 0, bufsize);
+		snprintf(b->name, sizeof(b->name), "%s", name);
+		b->memsize = bufsize;
+		b->order_buf.size = b->ready_buf.size = size;
+		b->order_buf.mask = b->ready_buf.mask = size - 1;
+		b->ready_buf.entries = (void *)&b[1];
+		b->order_buf.entries = RTE_PTR_ADD(&b[1],
+				size * sizeof(b->ready_buf.entries[0]));
+		goto exit;
+	}
+
+	/* allocate tailq entry */
+	te = rte_zmalloc("REORDER_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, REORDER, "Failed to allocate tailq entry\n");
+		goto exit;
+	}
+
+	memset(b, 0, bufsize);
+	snprintf(b->name, sizeof(b->name), "%s", name);
+	b->memsize = bufsize;
+	b->order_buf.size = b->ready_buf.size = size;
+	b->order_buf.mask = b->ready_buf.mask = size - 1;
+	b->ready_buf.entries = (void *)&b[1];
+	b->order_buf.entries = RTE_PTR_ADD(&b[1],
+			size * sizeof(b->ready_buf.entries[0]));
+
+	te->data = (void *) b;
+
+	TAILQ_INSERT_TAIL(reorder_list, te, next);
+
+exit:
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+	return b;
+}
+
+void rte_reorder_reset(struct rte_reorder_buffer *b)
+{
+	unsigned int i = 0;
+	char name[RTE_REORDER_NAMESIZE];
+	/* Free up the mbufs of order buffer & ready buffer */
+	for (i = 0; i < b->order_buf.size; i++) {
+		if (b->order_buf.entries[i])
+			rte_pktmbuf_free(b->order_buf.entries[i]);
+		if (b->ready_buf.entries[i])
+			rte_pktmbuf_free(b->ready_buf.entries[i]);
+	}
+	snprintf(name, sizeof(name), "%s", b->name);
+	rte_reorder_init(b, b->memsize, name, b->order_buf.size);
+}
+
+struct rte_reorder_buffer*
+rte_reorder_create(const char *name, unsigned socket_id, unsigned int size)
+{
+	const struct rte_memzone *mz;
+	struct rte_reorder_buffer *b = NULL;
+	struct rte_tailq_entry *te;
+	struct rte_reorder_list *reorder_list;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+
+	/* check that we have an initialised tail queue */
+	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER, rte_reorder_list);
+	if (!reorder_list) {
+		rte_errno = E_RTE_NO_TAILQ;
+		return NULL;
+	}
+
+	/* Check user arguments. */
+	if (!rte_is_power_of_2(size)) {
+		RTE_LOG(ERR, REORDER, "Invalid reorder buffer size"
+				" - Not a power of 2\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+	if (name == NULL) {
+		RTE_LOG(ERR, REORDER, "Invalid reorder buffer name ptr:"
+					" NULL\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* guarantee there's no existing */
+	TAILQ_FOREACH(te, reorder_list, next) {
+		b = (struct rte_reorder_buffer *) te->data;
+		if (strncmp(name, b->name, RTE_REORDER_NAMESIZE) == 0)
+			break;
+	}
+	if (te != NULL)
+		goto exit;
+
+	/* allocate tailq entry */
+	te = rte_zmalloc("REORDER_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, REORDER, "Failed to allocate tailq entry\n");
+		goto exit;
+	}
+
+	/* Allocate memory to store the reorder buffer structure. */
+	const unsigned int bufsize = sizeof(struct rte_reorder_buffer) +
+					(2 * size * sizeof(struct rte_mbuf *));
+	snprintf(mz_name, sizeof(mz_name), RTE_REORDER_PREFIX"%s", name);
+	mz = rte_memzone_reserve(mz_name, bufsize,
+			socket_id, NO_FLAGS);
+	if (mz == NULL) {
+		RTE_LOG(ERR, REORDER, "Memzone allocation failed\n");
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+	b = mz->addr;
+	memset(b, 0, bufsize);
+	snprintf(b->name, sizeof(b->name), "%s", name);
+	b->memsize = bufsize;
+	b->order_buf.size = b->ready_buf.size = size;
+	b->order_buf.mask = b->ready_buf.mask = size - 1;
+	b->ready_buf.entries = (void *)&b[1];
+	b->order_buf.entries = RTE_PTR_ADD(&b[1],
+			size * sizeof(b->ready_buf.entries[0]));
+
+	te->data = (void *) b;
+
+	TAILQ_INSERT_TAIL(reorder_list, te, next);
+
+exit:
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+	return b;
+}
+
+void
+rte_reorder_free(struct rte_reorder_buffer *b)
+{
+	struct rte_reorder_list *reorder_list;
+	struct rte_tailq_entry *te;
+
+	/* Check user arguments. */
+	if (b == NULL)
+		return;
+
+	/* check that we have an initialised tail queue */
+	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER, rte_reorder_list);
+	if (!reorder_list) {
+		rte_errno = E_RTE_NO_TAILQ;
+		return;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* find our tailq entry */
+	TAILQ_FOREACH(te, reorder_list, next) {
+		if (te->data == (void *) b)
+			break;
+	}
+	if (te == NULL) {
+		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+		return;
+	}
+
+	TAILQ_REMOVE(reorder_list, te, next);
+
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	rte_free(b);
+	rte_free(te);
+}
+
+struct rte_reorder_buffer *
+rte_reorder_find_existing(const char *name)
+{
+	struct rte_reorder_buffer *b = NULL;
+	struct rte_tailq_entry *te;
+	struct rte_reorder_list *reorder_list;
+
+	/* check that we have an initialised tail queue */
+	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER, rte_reorder_list);
+	if (!reorder_list) {
+		rte_errno = E_RTE_NO_TAILQ;
+		return NULL;
+	}
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_FOREACH(te, reorder_list, next) {
+		b = (struct rte_reorder_buffer *) te->data;
+		if (strncmp(name, b->name, RTE_REORDER_NAMESIZE) == 0)
+			break;
+	}
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	if (te == NULL) {
+		rte_errno = ENOENT;
+		return NULL;
+	}
+
+	return b;
+}
+
+static unsigned
+rte_reorder_fill_overflow(struct rte_reorder_buffer *b, unsigned n)
+{
+	/*
+	 * 1. Move all ready entries that fit to the ready_buf
+	 * 2. check if we meet the minimum needed (n).
+	 * 3. If not, then skip any gaps and keep moving.
+	 * 4. If at any point the ready buffer is full, stop
+	 * 5. Return the number of positions the order_buf head has moved
+	 */
+
+	struct cir_buffer *order_buf = &b->order_buf,
+			*ready_buf = &b->ready_buf;
+
+	unsigned int order_head_adv = 0;
+
+	/*
+	 * move at least n packets to ready buffer, assuming ready buffer
+	 * has room for those packets.
+	 */
+	while (order_head_adv < n &&
+			((ready_buf->head + 1) & ready_buf->mask) != ready_buf->tail) {
+
+		/* if we are blocked waiting on a packet, skip it */
+		if (order_buf->entries[order_buf->head] == NULL) {
+			order_buf->head++, order_head_adv++;
+
+			if (order_buf->head == order_buf->size)
+				order_buf->head = 0;
+		}
+
+		/* Move all ready entries that fit to the ready_buf */
+		while (order_buf->entries[order_buf->head] != NULL) {
+			ready_buf->entries[ready_buf->head++] =
+					order_buf->entries[order_buf->head];
+
+			order_buf->entries[order_buf->head++] = NULL;
+			order_head_adv++;
+
+			if (ready_buf->head == ready_buf->size)
+				ready_buf->head = 0;
+			if (order_buf->head == order_buf->size)
+				order_buf->head = 0;
+
+			if (((ready_buf->head+1) & ready_buf->mask) == ready_buf->tail)
+				break;
+		}
+	}
+
+	b->min_seqn += order_head_adv;
+	/* Return the number of positions the order_buf head has moved */
+	return order_head_adv;
+}
+
+int
+rte_reorder_insert(struct rte_reorder_buffer *b, struct rte_mbuf *mbuf)
+{
+	uint32_t offset, position;
+	struct cir_buffer *order_buf = &b->order_buf;
+
+	/*
+	 * calculate the offset from the head pointer we need to go.
+	 * The subtraction takes care of the sequence number wrapping.
+	 * For example (using 16-bit for brevity):
+	 *	min_seqn  = 0xFFFD
+	 *	mbuf_seqn = 0x0010
+	 *	offset    = 0x0010 - 0xFFFD = 0x13
+	 */
+	offset = mbuf->seqn - b->min_seqn;
+
+	/*
+	 * action to take depends on offset.
+	 * offset < buffer->size: the mbuf fits within the current window of
+	 *    sequence numbers we can reorder. EXPECTED CASE.
+	 * offset > buffer->size: the mbuf is outside the current window. There
+	 *    are a number of cases to consider:
+	 *    1. The packet sequence is just outside the window, then we need
+	 *       to see about shifting the head pointer and taking any ready
+	 *       to return packets out of the ring. If there was a delayed
+	 *       or dropped packet preventing drains from shifting the window
+	 *       this case will skip over the dropped packet instead, and any
+	 *       packets dequeued here will be returned on the next drain call.
+	 *    2. The packet sequence number is vastly outside our window, taken
+	 *       here as having offset greater than twice the buffer size. In
+	 *       this case, the packet is probably an old or late packet that
+	 *       was previously skipped, so just enqueue the packet for
+	 *       immediate return on the next drain call, or else return error.
+	 */
+	if (offset < b->order_buf.size) {
+		position = (order_buf->head + offset) & order_buf->mask;
+		order_buf->entries[position] = mbuf;
+	} else if (offset < 2 * b->order_buf.size) {
+		if (rte_reorder_fill_overflow(b, offset - order_buf->size) <
+				offset - order_buf->size) {
+			/* Put in handling for enqueue straight to output */
+			rte_errno = ENOSPC;
+			return -1;
+		}
+		offset = mbuf->seqn - b->min_seqn;
+		position = (order_buf->head + offset) & order_buf->mask;
+		order_buf->entries[position] = mbuf;
+	} else {
+		/* Put in handling for enqueue straight to output */
+		rte_errno = ERANGE;
+		return -1;
+	}
+	return 0;
+}
+
+unsigned int
+rte_reorder_drain(struct rte_reorder_buffer *b, struct rte_mbuf **mbufs,
+		unsigned max_mbufs)
+{
+	unsigned int drain_cnt = 0;
+
+	struct cir_buffer *order_buf = &b->order_buf,
+			*ready_buf = &b->ready_buf;
+
+	/* Try to fetch requested number of mbufs from ready buffer */
+	while ((drain_cnt < max_mbufs) && (ready_buf->tail != ready_buf->head)) {
+		mbufs[drain_cnt++] = ready_buf->entries[ready_buf->tail++];
+		if (ready_buf->tail == ready_buf->size)
+			ready_buf->tail = 0;
+	}
+
+	/*
+	 * If requested number of buffers not fetched from ready buffer, fetch
+	 * remaining buffers from order buffer
+	 */
+	while ((drain_cnt < max_mbufs) &&
+			(order_buf->entries[order_buf->head] != NULL)) {
+		mbufs[drain_cnt++] = order_buf->entries[order_buf->head];
+		order_buf->entries[order_buf->head] = NULL;
+		b->min_seqn++;
+		order_buf->head++;
+		if (order_buf->head == order_buf->size)
+			order_buf->head = 0;
+	}
+
+	return drain_cnt;
+}
diff --git a/lib/librte_reorder/rte_reorder.h b/lib/librte_reorder/rte_reorder.h
new file mode 100644
index 0000000..3ec7011
--- /dev/null
+++ b/lib/librte_reorder/rte_reorder.h
@@ -0,0 +1,184 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_REORDER_H_
+#define _RTE_REORDER_H_
+
+/**
+ * @file
+ * RTE reorder
+ *
+ * Reorder library is a component which is designed to
+ * provide ordering of out of ordered packets based on
+ * sequence number present in mbuf.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rte_reorder_buffer;
+
+/**
+ * Create a new reorder buffer instance
+ *
+ * Allocate memory and initialize a new reorder buffer in that
+ * memory, returning the reorder buffer pointer to the user
+ *
+ * @param name
+ *   The name to be given to the reorder buffer instance.
+ * @param socket_id
+ *   The NUMA node on which the memory for the reorder buffer
+ *   instance is to be reserved.
+ * @param size
+ *   Max number of elements that can be stored in the reorder buffer
+ * @return
+ *   The initialized reorder buffer instance, or NULL on error
+ *   On error case, rte_errno will be set appropriately:
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ *    - EINVAL - invalid parameters
+ */
+struct rte_reorder_buffer *
+rte_reorder_create(const char *name, unsigned socket_id, unsigned int size);
+
+/**
+ * Initializes given reorder buffer instance
+ *
+ * @param buf
+ *   Pointer to memory area where reorder buffer instance
+ *   should be initialized
+ * @param bufsize
+ *   Size of the memory area to be used for reorder buffer instance
+ *   initialization
+ * @param name
+ *   The name to be given to the reorder buffer instance
+ * @param size
+ *   Number of elements that can be stored in reorder buffer
+ * @return
+ *   The initialized reorder buffer instance, or NULL on error
+ *   On error case, rte_errno will be set appropriately:
+ *    - EINVAL - invalid parameters
+ *    - ENOMEM - not enough memory for reorder buffer instance
+ *    initialization
+ */
+struct rte_reorder_buffer *
+rte_reorder_init(void *buf, unsigned int bufsize,
+		const char *name, unsigned int size);
+
+/**
+ * Reset the given reorder buffer instance with initial values.
+ *
+ * @param b
+ *   Reorder buffer instance which has to be reset
+ */
+void rte_reorder_reset(struct rte_reorder_buffer *b);
+
+/**
+ * Find an existing reorder buffer instance
+ * and return a pointer to it.
+ *
+ * @param name
+ *   Name of the reorder buffer instacne as passed to rte_reorder_create()
+ * @return
+ *   Pointer to reorder buffer instance or NULL if object not found with rte_errno
+ *   set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ *    - E_RTE_NO_TAILQ - no tailq list could be got for the
+ *    reorder instance list
+ */
+struct rte_reorder_buffer *
+rte_reorder_find_existing(const char *name);
+
+/**
+ * Free reorder buffer instance.
+ *
+ * @param b
+ *   reorder buffer instance
+ * @return
+ *   None
+ */
+void
+rte_reorder_free(struct rte_reorder_buffer *b);
+
+/**
+ * Insert given mbuf in reorder buffer in its correct position
+ *
+ * The given mbuf is to be reordered relative to other mbufs in the system.
+ * The mbuf must contain a sequence number which is then used to place
+ * the buffer in the correct position in the reorder buffer. Reordered
+ * packets can later be taken from the buffer using the rte_reorder_drain()
+ * API.
+ *
+ * @param b
+ *   Reorder buffer where the mbuf has to be inserted.
+ * @param mbuf
+ *   mbuf of packet that needs to be inserted in reorder buffer.
+ * @return
+ *   0 on success
+ *   -1 on error
+ *   On error case, rte_errno will be set appropriately:
+ *    - ENOSPC - Cannot move existing mbufs from reorder buffer to accomodate ealry mbuf.
+ *    But mbuf can be accomodated by performing drain and then insert.
+ *    - ERANGE - Too early or late mbuf which is vastly out of
+ *    range of expected window should be ingnored without any handling.
+ */
+int
+rte_reorder_insert(struct rte_reorder_buffer *b, struct rte_mbuf *mbuf);
+
+/**
+ * Fetch reordered buffers
+ *
+ * Returns a set of in-order buffers from the reorder buffer structure. Gaps
+ * may be present in the sequence numbers of the mbuf if packets have been
+ * delayed too long before reaching the reorder window, or have been previously
+ * dropped by the system.
+ *
+ * @param b
+ *   Reorder buffer instance from which packets are to be drained
+ * @param mbufs
+ *   array of mbufs where reordered packets will be inserted from reorder buffer
+ * @param max_mbufs
+ *   the number of elements in the mbufs array.
+ * @return
+ *   number of mbuf pointers written to mbufs. 0 <= N < max_mbufs.
+ */
+unsigned int
+rte_reorder_drain(struct rte_reorder_buffer *b, struct rte_mbuf **mbufs,
+	unsigned max_mbufs);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_REORDER_H_ */
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index e1a0dbf..2a08acb 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -67,6 +67,10 @@ ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
 LDLIBS += -lrte_distributor
 endif
 
+ifeq ($(CONFIG_RTE_LIBRTE_REORDER),y)
+LDLIBS += -lrte_reorder
+endif
+
 ifeq ($(CONFIG_RTE_LIBRTE_KNI),y)
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
 LDLIBS += -lrte_kni
-- 
1.8.3.1

^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2015-01-07 16:37 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-01-07 15:28 [dpdk-dev] [PATCH] librte_reorder: New reorder library with unit tests and app Reshma Pattan
2015-01-07 16:37 ` Pattan, Reshma

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).