From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <dev-bounces@dpdk.org>
Received: from dpdk.org (dpdk.org [92.243.14.124])
	by inbox.dpdk.org (Postfix) with ESMTP id 9E8B1A2EDB
	for <public@inbox.dpdk.org>; Tue,  1 Oct 2019 08:29:37 +0200 (CEST)
Received: from [92.243.14.124] (localhost [127.0.0.1])
	by dpdk.org (Postfix) with ESMTP id 89A354CA7;
	Tue,  1 Oct 2019 08:29:30 +0200 (CEST)
Received: from foss.arm.com (foss.arm.com [217.140.110.172])
 by dpdk.org (Postfix) with ESMTP id 8C5CE4C8E
 for <dev@dpdk.org>; Tue,  1 Oct 2019 08:29:26 +0200 (CEST)
Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14])
 by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id F3BB81597;
 Mon, 30 Sep 2019 23:29:25 -0700 (PDT)
Received: from qc2400f-1.austin.arm.com (qc2400f-1.austin.arm.com
 [10.118.12.34])
 by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPSA id 8FE743F739;
 Mon, 30 Sep 2019 23:32:03 -0700 (PDT)
From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
To: honnappa.nagarahalli@arm.com, konstantin.ananyev@intel.com,
 stephen@networkplumber.org, paulmck@linux.ibm.com
Cc: yipeng1.wang@intel.com, vladimir.medvedkin@intel.com, ruifeng.wang@arm.com,
 dharmik.thakkar@arm.com, dev@dpdk.org, nd@arm.com
Date: Tue,  1 Oct 2019 01:29:16 -0500
Message-Id: <20191001062917.35578-3-honnappa.nagarahalli@arm.com>
X-Mailer: git-send-email 2.17.1
In-Reply-To: <20191001062917.35578-1-honnappa.nagarahalli@arm.com>
References: <20190906094534.36060-1-ruifeng.wang@arm.com>
 <20191001062917.35578-1-honnappa.nagarahalli@arm.com>
Subject: [dpdk-dev] [PATCH v3 2/3] lib/rcu: add resource reclamation APIs
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://mails.dpdk.org/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://mails.dpdk.org/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://mails.dpdk.org/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
Errors-To: dev-bounces@dpdk.org
Sender: "dev" <dev-bounces@dpdk.org>

Add resource reclamation APIs to make it simple for applications
and libraries to integrate the rte_rcu library.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
 app/test/test_rcu_qsbr.c           | 291 ++++++++++++++++++++++++++++-
 lib/librte_rcu/meson.build         |   2 +
 lib/librte_rcu/rte_rcu_qsbr.c      | 185 ++++++++++++++++++
 lib/librte_rcu/rte_rcu_qsbr.h      | 169 +++++++++++++++++
 lib/librte_rcu/rte_rcu_qsbr_pvt.h  |  46 +++++
 lib/librte_rcu/rte_rcu_version.map |   4 +
 lib/meson.build                    |   6 +-
 7 files changed, 700 insertions(+), 3 deletions(-)
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h

diff --git a/app/test/test_rcu_qsbr.c b/app/test/test_rcu_qsbr.c
index d1b9e46a2..3a6815243 100644
--- a/app/test/test_rcu_qsbr.c
+++ b/app/test/test_rcu_qsbr.c
@@ -1,8 +1,9 @@
 /* SPDX-License-Identifier: BSD-3-Clause
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2019 Arm Limited
  */
 
 #include <stdio.h>
+#include <string.h>
 #include <rte_pause.h>
 #include <rte_rcu_qsbr.h>
 #include <rte_hash.h>
@@ -33,6 +34,7 @@ static uint32_t *keys;
 #define COUNTER_VALUE 4096
 static uint32_t *hash_data[RTE_MAX_LCORE][TOTAL_ENTRY];
 static uint8_t writer_done;
+static uint8_t cb_failed;
 
 static struct rte_rcu_qsbr *t[RTE_MAX_LCORE];
 struct rte_hash *h[RTE_MAX_LCORE];
@@ -582,6 +584,269 @@ test_rcu_qsbr_thread_offline(void)
 	return 0;
 }
 
+static void
+rte_rcu_qsbr_test_free_resource(void *p, void *e)
+{
+	if (p != NULL && e != NULL) {
+		printf("%s: Test failed\n", __func__);
+		cb_failed = 1;
+	}
+}
+
+/*
+ * rte_rcu_qsbr_dq_create: create a queue used to store the data structure
+ * elements that can be freed later. This queue is referred to as 'defer queue'.
+ */
+static int
+test_rcu_qsbr_dq_create(void)
+{
+	char rcu_dq_name[RTE_RING_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params;
+	struct rte_rcu_qsbr_dq *dq;
+
+	printf("\nTest rte_rcu_qsbr_dq_create()\n");
+
+	/* Pass invalid parameters */
+	dq = rte_rcu_qsbr_dq_create(NULL);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+	params.name = rcu_dq_name;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	params.f = rte_rcu_qsbr_test_free_resource;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+	params.v = t[0];
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	params.size = 1;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	params.esize = 3;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+	/* Pass all valid parameters */
+	params.esize = 16;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+	rte_rcu_qsbr_dq_delete(dq);
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
+ * to be freed later after at least one grace period is over.
+ */
+static int
+test_rcu_qsbr_dq_enqueue(void)
+{
+	int ret;
+	uint64_t r;
+	char rcu_dq_name[RTE_RING_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params;
+	struct rte_rcu_qsbr_dq *dq;
+
+	printf("\nTest rte_rcu_qsbr_dq_enqueue()\n");
+
+	/* Create a queue with simple parameters */
+	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+	params.name = rcu_dq_name;
+	params.f = rte_rcu_qsbr_test_free_resource;
+	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+	params.v = t[0];
+	params.size = 1;
+	params.esize = 16;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+
+	/* Pass invalid parameters */
+	ret = rte_rcu_qsbr_dq_enqueue(NULL, NULL);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+	ret = rte_rcu_qsbr_dq_enqueue(dq, NULL);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+	ret = rte_rcu_qsbr_dq_enqueue(NULL, &r);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+	ret = rte_rcu_qsbr_dq_delete(dq);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "dq delete valid params");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_reclaim: Reclaim resources from the defer queue.
+ */
+static int
+test_rcu_qsbr_dq_reclaim(void)
+{
+	int ret;
+
+	printf("\nTest rte_rcu_qsbr_dq_reclaim()\n");
+
+	/* Pass invalid parameters */
+	ret = rte_rcu_qsbr_dq_reclaim(NULL);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_delete: Delete a defer queue.
+ */
+static int
+test_rcu_qsbr_dq_delete(void)
+{
+	int ret;
+	char rcu_dq_name[RTE_RING_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params;
+	struct rte_rcu_qsbr_dq *dq;
+
+	printf("\nTest rte_rcu_qsbr_dq_delete()\n");
+
+	/* Pass invalid parameters */
+	ret = rte_rcu_qsbr_dq_delete(NULL);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq delete invalid params");
+
+	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+	params.name = rcu_dq_name;
+	params.f = rte_rcu_qsbr_test_free_resource;
+	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+	params.v = t[0];
+	params.size = 1;
+	params.esize = 16;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+	ret = rte_rcu_qsbr_dq_delete(dq);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_xxx functional test: create a defer queue, enqueue
+ * resources with and without a registered reader thread, then delete it.
+ */
+static int
+test_rcu_qsbr_dq_functional(int32_t size, int32_t esize)
+{
+	int i, j, ret;
+	char rcu_dq_name[RTE_RING_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params;
+	struct rte_rcu_qsbr_dq *dq;
+	uint64_t *e;
+	uint64_t sc = 200;
+	int max_entries;
+
+	printf("\nTest rte_rcu_qsbr_dq_xxx functional tests()\n");
+	printf("Size = %d, esize = %d\n", size, esize);
+
+	e = (uint64_t *)rte_zmalloc(NULL, esize, RTE_CACHE_LINE_SIZE);
+	if (e == NULL)
+		return 0;
+	cb_failed = 0;
+
+	/* Initialize the RCU variable. No threads are registered */
+	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+
+	/* Create a queue with simple parameters */
+	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+	params.name = rcu_dq_name;
+	params.f = rte_rcu_qsbr_test_free_resource;
+	params.v = t[0];
+	params.size = size;
+	params.esize = esize;
+	dq = rte_rcu_qsbr_dq_create(&params);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+
+	/* Given the size and esize, calculate the maximum number of entries
+	 * that can be stored on the defer queue (look at the logic used
+	 * in capacity calculation of rte_ring).
+	 */
+	max_entries = rte_align32pow2(((esize/8 + 1) * size) + 1);
+	max_entries = (max_entries - 1)/(esize/8 + 1);
+
+	/* Enqueue few counters starting with the value 'sc' */
+	/* The ring size is rounded up to a power of 2. The enqueue API also
+	 * reclaims if the queue size is above a certain limit. Since there
+	 * are no threads registered, reclamation succeeds. Hence, it should
+	 * be possible to enqueue more than the provided queue size.
+	 */
+	for (i = 0; i < 10; i++) {
+		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+			"dq enqueue functional");
+		for (j = 0; j < esize/8; j++)
+			e[j] = sc++;
+	}
+
+	/* Register a thread on the RCU QSBR variable. Reclamation will not
+	 * succeed. It should not be possible to enqueue more than the size
+	 * number of resources.
+	 */
+	rte_rcu_qsbr_thread_register(t[0], 1);
+	rte_rcu_qsbr_thread_online(t[0], 1);
+
+	for (i = 0; i < max_entries; i++) {
+		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+			"dq enqueue functional");
+		for (j = 0; j < esize/8; j++)
+			e[j] = sc++;
+	}
+
+	/* Enqueue fails as queue is full */
+	ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
+
+	/* Delete should fail as there are elements in defer queue which
+	 * cannot be reclaimed.
+	 */
+	ret = rte_rcu_qsbr_dq_delete(dq);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq delete valid params");
+
+	/* Report quiescent state, enqueue should succeed */
+	rte_rcu_qsbr_quiescent(t[0], 1);
+	for (i = 0; i < max_entries; i++) {
+		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+			"dq enqueue functional");
+		for (j = 0; j < esize/8; j++)
+			e[j] = sc++;
+	}
+
+	/* Queue is full */
+	ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
+
+	/* Report quiescent state, delete should succeed */
+	rte_rcu_qsbr_quiescent(t[0], 1);
+	ret = rte_rcu_qsbr_dq_delete(dq);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
+
+	/* Validate that call back function did not return any error */
+	TEST_RCU_QSBR_RETURN_IF_ERROR((cb_failed == 1), "CB failed");
+
+	rte_free(e);
+	return 0;
+}
+
 /*
  * rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
  */
@@ -1025,6 +1290,18 @@ test_rcu_qsbr_main(void)
 	if (test_rcu_qsbr_thread_offline() < 0)
 		goto test_fail;
 
+	if (test_rcu_qsbr_dq_create() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dq_reclaim() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dq_delete() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dq_enqueue() < 0)
+		goto test_fail;
+
 	printf("\nFunctional tests\n");
 
 	if (test_rcu_qsbr_sw_sv_3qs() < 0)
@@ -1033,6 +1310,18 @@ test_rcu_qsbr_main(void)
 	if (test_rcu_qsbr_mw_mv_mqs() < 0)
 		goto test_fail;
 
+	if (test_rcu_qsbr_dq_functional(1, 8) < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dq_functional(2, 8) < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dq_functional(303, 16) < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dq_functional(7, 128) < 0)
+		goto test_fail;
+
 	free_rcu();
 
 	printf("\n");
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
index 62920ba02..e280b29c1 100644
--- a/lib/librte_rcu/meson.build
+++ b/lib/librte_rcu/meson.build
@@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
 if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
 	ext_deps += cc.find_library('atomic')
 endif
+
+deps += ['ring']
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index ce7f93dd3..76814f50b 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -21,6 +21,7 @@
 #include <rte_errno.h>
 
 #include "rte_rcu_qsbr.h"
+#include "rte_rcu_qsbr_pvt.h"
 
 /* Get the memory size of QSBR variable */
 size_t
@@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 	return 0;
 }
 
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+	struct rte_rcu_qsbr_dq *dq;
+	uint32_t qs_fifo_size;
+
+	if (params == NULL || params->f == NULL ||
+		params->v == NULL || params->name == NULL ||
+		params->size == 0 || params->esize == 0 ||
+		(params->esize % 8 != 0)) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return NULL;
+	}
+
+	dq = rte_zmalloc(NULL,
+		(sizeof(struct rte_rcu_qsbr_dq) + params->esize),
+		RTE_CACHE_LINE_SIZE);
+	if (dq == NULL) {
+		rte_errno = ENOMEM;
+
+		return NULL;
+	}
+
+	/* Round up qs_fifo_size to the next power of two that can hold
+	 * 'size' elements, each stored with its 64b token.
+	 */
+	qs_fifo_size = rte_align32pow2((((params->esize/8) + 1)
+					* params->size) + 1);
+	dq->r = rte_ring_create(params->name, qs_fifo_size,
+					SOCKET_ID_ANY, 0);
+	if (dq->r == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): defer queue create failed\n", __func__);
+		rte_free(dq);
+		return NULL;
+	}
+
+	dq->v = params->v;
+	dq->size = params->size;
+	dq->esize = params->esize;
+	dq->f = params->f;
+	dq->p = params->p;
+
+	return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+	uint64_t token;
+	uint64_t *tmp;
+	uint32_t i;
+	uint32_t cur_size, free_size;
+
+	if (dq == NULL || e == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Start the grace period */
+	token = rte_rcu_qsbr_start(dq->v);
+
+	/* Reclaim resources if the queue is more than 1/8th full. This
+	 * keeps the queue from growing too large and allows time for reader
+	 * threads to report their quiescent state.
+	 */
+	cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1);
+	if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) {
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Triggering reclamation\n", __func__);
+		rte_rcu_qsbr_dq_reclaim(dq);
+	}
+
+	/* Check if there is space for at least 1 resource */
+	free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1);
+	if (!free_size) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Defer queue is full\n", __func__);
+		rte_errno = ENOSPC;
+		return 1;
+	}
+
+	/* Enqueue the resource */
+	rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token);
+
+	/* The resource to enqueue needs to be a multiple of 64b
+	 * due to the limitation of the rte_ring implementation.
+	 */
+	for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++)
+		rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp);
+
+	return 0;
+}
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq)
+{
+	uint32_t max_cnt;
+	uint32_t cnt;
+	void *token;
+	uint64_t *tmp;
+	uint32_t i;
+
+	if (dq == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Anything to reclaim? */
+	if (rte_ring_count(dq->r) == 0)
+		return 0;
+
+	/* Reclaim at most 1/16th of the total number of entries. */
+	max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT;
+	max_cnt = (max_cnt == 0) ? dq->size : max_cnt;
+	cnt = 0;
+
+	/* Check reader threads quiescent state and reclaim resources */
+	while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) &&
+		(rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false)
+			== 1)) {
+		(void)rte_ring_sc_dequeue(dq->r, &token);
+		/* The resource to dequeue needs to be a multiple of 64b
+		 * due to the limitation of the rte_ring implementation.
+		 */
+		for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8;
+			i++, tmp++)
+			(void)rte_ring_sc_dequeue(dq->r,
+					(void *)(uintptr_t)tmp);
+		dq->f(dq->p, dq->e);
+
+		cnt++;
+	}
+
+	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+		"%s(): Reclaimed %u resources\n", __func__, cnt);
+
+	if (cnt == 0) {
+		/* No resources were reclaimed */
+		rte_errno = EAGAIN;
+		return 1;
+	}
+
+	return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+	if (dq == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Reclaim all the resources */
+	if (rte_rcu_qsbr_dq_reclaim(dq) != 0)
+		/* Error number is already set by the reclaim API */
+		return 1;
+
+	rte_ring_free(dq->r);
+	rte_free(dq);
+
+	return 0;
+}
+
 int rte_rcu_log_type;
 
 RTE_INIT(rte_rcu_register)
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index c80f15c00..185d4b50a 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -34,6 +34,7 @@ extern "C" {
 #include <rte_lcore.h>
 #include <rte_debug.h>
 #include <rte_atomic.h>
+#include <rte_ring.h>
 
 extern int rte_rcu_log_type;
 
@@ -109,6 +110,67 @@ struct rte_rcu_qsbr {
 	 */
 } __rte_cache_aligned;
 
+/**
+ * Call back function called to free the resources.
+ *
+ * @param p
+ *   Pointer provided while creating the defer queue
+ * @param e
+ *   Pointer to the resource data stored on the defer queue
+ *
+ * @return
+ *   None
+ */
+typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
+
+/**
+ *  Trigger automatic reclamation when over 1/8th of the defer queue is full.
+ */
+#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3
+
+/**
+ *  Reclaim at most 1/16th of the total number of resources per call.
+ */
+#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+	const char *name;
+	/**< Name of the queue. */
+	uint32_t size;
+	/**< Number of entries in queue. Typically, this will be
+	 *   the same as the maximum number of entries supported in the
+	 *   lock free data structure.
+	 *   Data structures with an unbounded number of entries are not
+	 *   supported currently.
+	 */
+	uint32_t esize;
+	/**< Size (in bytes) of each element in the defer queue.
+	 *   This has to be multiple of 8B as the rte_ring APIs
+	 *   support 8B element sizes only.
+	 */
+	rte_rcu_qsbr_free_resource f;
+	/**< Function to call to free the resource. */
+	void *p;
+	/**< Pointer passed to the free function. Typically, this is the
+	 *   pointer to the data structure to which the resource to free
+	 *   belongs. This can be NULL.
+	 */
+	struct rte_rcu_qsbr *v;
+	/**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -648,6 +710,113 @@ __rte_experimental
 int
 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ *   Parameters to create a defer queue.
+ * @return
+ *   On success - Valid pointer to defer queue
+ *   On error - NULL
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL or invalid parameters are passed
+ *   - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to prevent
+ * the defer queue from growing too big.
+ *
+ * This API is not multi-thread safe. It is expected that the caller
+ * provides multi-thread safety by locking a mutex or some other means.
+ *
+ * A lock free multi-thread writer algorithm could achieve multi-thread
+ * safety by creating and using one defer queue per thread.
+ *
+ * @param dq
+ *   Defer queue to allocate an entry from.
+ * @param e
+ *   Pointer to resource data to copy to the defer queue. The size of
+ *   the data to copy is equal to the element size provided when the
+ *   defer queue was created.
+ * @return
+ *   On success - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOSPC - Defer queue is full. This condition cannot happen
+ *		if the defer queue size is equal to (or larger than) the
+ *		number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Reclaim resources from the defer queue.
+ *
+ * This API is not multi-thread safe. It is expected that the caller
+ * provides multi-thread safety by locking a mutex or some other means.
+ *
+ * A lock free multi-thread writer algorithm could achieve multi-thread
+ * safety by creating and using one defer queue per thread.
+ *
+ * @param dq
+ *   Defer queue to reclaim an entry from.
+ * @return
+ *   On success (at least 1 resource reclaimed or queue empty) - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - EAGAIN - None of the resources have completed at least 1 grace period,
+ *		try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ *   Defer queue to delete.
+ * @return
+ *   On success - 0
+ *   On error - 1
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - EAGAIN - Some of the resources have not completed at least 1 grace
+ *		period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
new file mode 100644
index 000000000..2122bc36a
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
@@ -0,0 +1,46 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
+	struct rte_ring *r;     /**< RCU QSBR defer queue. */
+	uint32_t size;
+	/**< Number of elements in the defer queue */
+	uint32_t esize;
+	/**< Size (in bytes) of data stored on the defer queue */
+	rte_rcu_qsbr_free_resource f;
+	/**< Function to call to free the resource. */
+	void *p;
+	/**< Pointer passed to the free function. Typically, this is the
+	 *   pointer to the data structure to which the resource to free
+	 *   belongs.
+	 */
+	char e[0];
+	/**< Temporary storage to copy the defer queue element. */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
index f8b9ef2ab..dfac88a37 100644
--- a/lib/librte_rcu/rte_rcu_version.map
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -8,6 +8,10 @@ EXPERIMENTAL {
 	rte_rcu_qsbr_synchronize;
 	rte_rcu_qsbr_thread_register;
 	rte_rcu_qsbr_thread_unregister;
+	rte_rcu_qsbr_dq_create;
+	rte_rcu_qsbr_dq_enqueue;
+	rte_rcu_qsbr_dq_reclaim;
+	rte_rcu_qsbr_dq_delete;
 
 	local: *;
 };
diff --git a/lib/meson.build b/lib/meson.build
index e5ff83893..0e1be8407 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -11,7 +11,9 @@
 libraries = [
 	'kvargs', # eal depends on kvargs
 	'eal', # everything depends on eal
-	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+	'ring',
+	'rcu', # rcu depends on ring
+	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
 	'cmdline',
 	'metrics', # bitrate/latency stats depends on this
 	'hash',    # efd depends on this
@@ -22,7 +24,7 @@ libraries = [
 	'gro', 'gso', 'ip_frag', 'jobstats',
 	'kni', 'latencystats', 'lpm', 'member',
 	'power', 'pdump', 'rawdev',
-	'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
+	'reorder', 'sched', 'security', 'stack', 'vhost',
 	# ipsec lib depends on net, crypto and security
 	'ipsec',
 	# add pkt framework libs which use other libs from above
-- 
2.17.1