From: "Medvedkin, Vladimir"
To: Honnappa Nagarahalli, konstantin.ananyev@intel.com, stephen@networkplumber.org, paulmck@linux.ibm.com
Cc: yipeng1.wang@intel.com, ruifeng.wang@arm.com, dharmik.thakkar@arm.com, dev@dpdk.org, nd@arm.com
Date: Fri, 4 Oct 2019 20:01:04 +0100
Subject: Re: [dpdk-dev] [PATCH v3 2/3] lib/rcu: add resource reclamation APIs
Message-ID: <93051cbd-b047-37df-18ac-3ca0a0795a77@intel.com>
In-Reply-To: <20191001062917.35578-3-honnappa.nagarahalli@arm.com>
References: <20190906094534.36060-1-ruifeng.wang@arm.com> <20191001062917.35578-1-honnappa.nagarahalli@arm.com> <20191001062917.35578-3-honnappa.nagarahalli@arm.com>
List-Id: DPDK patches and discussions

Hi Honnappa,

On 01/10/2019 07:29, Honnappa Nagarahalli wrote:
> Add resource reclamation APIs to make it simple for applications
> and libraries to integrate rte_rcu library.
>
> Signed-off-by: Honnappa Nagarahalli
> Reviewed-by: Ola Liljedhal
> Reviewed-by: Ruifeng Wang
> ---
>  app/test/test_rcu_qsbr.c           | 291 ++++++++++++++++++++++++++++-
>  lib/librte_rcu/meson.build         |   2 +
>  lib/librte_rcu/rte_rcu_qsbr.c      | 185 ++++++++++++++++++
>  lib/librte_rcu/rte_rcu_qsbr.h      | 169 +++++++++++++++++
>  lib/librte_rcu/rte_rcu_qsbr_pvt.h  |  46 +++++
>  lib/librte_rcu/rte_rcu_version.map |   4 +
>  lib/meson.build                    |   6 +-
>  7 files changed, 700 insertions(+), 3 deletions(-)
>  create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h

There are compilation errors when building DPDK as a shared library.
I think you need something like:

--- a/lib/librte_rcu/Makefile
+++ b/lib/librte_rcu/Makefile
@@ -8,7 +8,7 @@ LIB = librte_rcu.a
 CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring

>
> diff --git a/app/test/test_rcu_qsbr.c b/app/test/test_rcu_qsbr.c
> index d1b9e46a2..3a6815243 100644
> --- a/app/test/test_rcu_qsbr.c
> +++ b/app/test/test_rcu_qsbr.c

I think it's better to split the unit test patches from the library patches.

> @@ -1,8 +1,9 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
> - * Copyright (c) 2018 Arm Limited
> + * Copyright (c) 2019 Arm Limited
>   */
>
>  #include
> +#include
>  #include
>  #include
>  #include
> @@ -33,6 +34,7 @@ static uint32_t *keys;
>  #define COUNTER_VALUE 4096
>  static uint32_t *hash_data[RTE_MAX_LCORE][TOTAL_ENTRY];
>  static uint8_t writer_done;
> +static uint8_t cb_failed;
>
>  static struct rte_rcu_qsbr *t[RTE_MAX_LCORE];
>  struct rte_hash *h[RTE_MAX_LCORE];
> @@ -582,6 +584,269 @@ test_rcu_qsbr_thread_offline(void)
>  	return 0;
>  }
>
> +static void
> +rte_rcu_qsbr_test_free_resource(void *p, void *e)

This function is not part of the DPDK API, so it would be better to name
it something like test_rcu_qsbr_free_resource().

> +{
> +	if (p != NULL && e != NULL) {
> +		printf("%s: Test failed\n", __func__);
> +		cb_failed = 1;
> +	}
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_create: create a queue used to store the data structure
> + * elements that can be freed later. This queue is referred to as 'defer queue'.
> + */
> +static int
> +test_rcu_qsbr_dq_create(void)
> +{
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_create()\n");
> +
> +	/* Pass invalid parameters */
> +	dq = rte_rcu_qsbr_dq_create(NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +	params.v = t[0];
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	params.size = 1;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	params.esize = 3;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	/* Pass all valid parameters */
> +	params.esize = 16;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +	rte_rcu_qsbr_dq_delete(dq);
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
> + * to be freed later after atleast one grace period is over.
> + */
> +static int
> +test_rcu_qsbr_dq_enqueue(void)
> +{
> +	int ret;
> +	uint64_t r;
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_enqueue()\n");
> +
> +	/* Create a queue with simple parameters */
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +	params.v = t[0];
> +	params.size = 1;
> +	params.esize = 16;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +
> +	/* Pass invalid parameters */
> +	ret = rte_rcu_qsbr_dq_enqueue(NULL, NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
> +
> +	ret = rte_rcu_qsbr_dq_enqueue(dq, NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
> +
> +	ret = rte_rcu_qsbr_dq_enqueue(NULL, &r);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
> +
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "dq delete valid params");
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_reclaim: Reclaim resources from the defer queue.
> + */
> +static int
> +test_rcu_qsbr_dq_reclaim(void)
> +{
> +	int ret;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_reclaim()\n");
> +
> +	/* Pass invalid parameters */
> +	ret = rte_rcu_qsbr_dq_reclaim(NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_delete: Delete a defer queue.
> + */
> +static int
> +test_rcu_qsbr_dq_delete(void)
> +{
> +	int ret;
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_delete()\n");
> +
> +	/* Pass invalid parameters */
> +	ret = rte_rcu_qsbr_dq_delete(NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq delete invalid params");
> +
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +	params.v = t[0];
> +	params.size = 1;
> +	params.esize = 16;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
> + * to be freed later after atleast one grace period is over.
> + */
> +static int
> +test_rcu_qsbr_dq_functional(int32_t size, int32_t esize)
> +{
> +	int i, j, ret;
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +	uint64_t *e;
> +	uint64_t sc = 200;
> +	int max_entries;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_xxx functional tests()\n");
> +	printf("Size = %d, esize = %d\n", size, esize);
> +
> +	e = (uint64_t *)rte_zmalloc(NULL, esize, RTE_CACHE_LINE_SIZE);
> +	if (e == NULL)
> +		return 0;
> +	cb_failed = 0;
> +
> +	/* Initialize the RCU variable. No threads are registered */
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +
> +	/* Create a queue with simple parameters */
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	params.v = t[0];
> +	params.size = size;
> +	params.esize = esize;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +
> +	/* Given the size and esize, calculate the maximum number of entries
> +	 * that can be stored on the defer queue (look at the logic used
> +	 * in capacity calculation of rte_ring).
> +	 */
> +	max_entries = rte_align32pow2(((esize/8 + 1) * size) + 1);
> +	max_entries = (max_entries - 1)/(esize/8 + 1);
> +
> +	/* Enqueue few counters starting with the value 'sc' */
> +	/* The queue size will be rounded up to 2. The enqueue API also
> +	 * reclaims if the queue size is above certain limit. Since, there
> +	 * are no threads registered, reclamation succedes. Hence, it should
> +	 * be possible to enqueue more than the provided queue size.
> +	 */
> +	for (i = 0; i < 10; i++) {
> +		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
> +			"dq enqueue functional");
> +		for (j = 0; j < esize/8; j++)
> +			e[j] = sc++;
> +	}
> +
> +	/* Register a thread on the RCU QSBR variable. Reclamation will not
> +	 * succeed. It should not be possible to enqueue more than the size
> +	 * number of resources.
> +	 */
> +	rte_rcu_qsbr_thread_register(t[0], 1);
> +	rte_rcu_qsbr_thread_online(t[0], 1);
> +
> +	for (i = 0; i < max_entries; i++) {
> +		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
> +			"dq enqueue functional");
> +		for (j = 0; j < esize/8; j++)
> +			e[j] = sc++;
> +	}
> +
> +	/* Enqueue fails as queue is full */
> +	ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
> +
> +	/* Delete should fail as there are elements in defer queue which
> +	 * cannot be reclaimed.
> +	 */
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq delete valid params");
> +
> +	/* Report quiescent state, enqueue should succeed */
> +	rte_rcu_qsbr_quiescent(t[0], 1);
> +	for (i = 0; i < max_entries; i++) {
> +		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
> +			"dq enqueue functional");
> +		for (j = 0; j < esize/8; j++)
> +			e[j] = sc++;
> +	}
> +
> +	/* Queue is full */
> +	ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
> +
> +	/* Report quiescent state, delete should succeed */
> +	rte_rcu_qsbr_quiescent(t[0], 1);
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
> +
> +	/* Validate that call back function did not return any error */
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((cb_failed == 1), "CB failed");
> +
> +	rte_free(e);
> +	return 0;
> +}
> +
>  /*
>   * rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
>   */
> @@ -1025,6 +1290,18 @@ test_rcu_qsbr_main(void)
>  	if (test_rcu_qsbr_thread_offline() < 0)
>  		goto test_fail;
>
> +	if (test_rcu_qsbr_dq_create() < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_reclaim() < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_delete() < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_enqueue() < 0)
> +		goto test_fail;
> +
>  	printf("\nFunctional tests\n");
>
>  	if (test_rcu_qsbr_sw_sv_3qs() < 0)
> @@ -1033,6 +1310,18 @@ test_rcu_qsbr_main(void)
>  	if (test_rcu_qsbr_mw_mv_mqs() < 0)
>  		goto test_fail;
>
> +	if (test_rcu_qsbr_dq_functional(1, 8) < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_functional(2, 8) < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_functional(303, 16) < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_functional(7, 128) < 0)
> +		goto test_fail;
> +
>  	free_rcu();
>
>  	printf("\n");
> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> index 62920ba02..e280b29c1 100644
> --- a/lib/librte_rcu/meson.build
> +++ b/lib/librte_rcu/meson.build
> @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
>  if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
>  	ext_deps += cc.find_library('atomic')
>  endif
> +
> +deps += ['ring']
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> index ce7f93dd3..76814f50b 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.c
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -21,6 +21,7 @@
>  #include
>
>  #include "rte_rcu_qsbr.h"
> +#include "rte_rcu_qsbr_pvt.h"
>
>  /* Get the memory size of QSBR variable */
>  size_t
> @@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
>  	return 0;
>  }
>
> +/* Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + */
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
> +{
> +	struct rte_rcu_qsbr_dq *dq;
> +	uint32_t qs_fifo_size;
> +
> +	if (params == NULL || params->f == NULL ||
> +		params->v == NULL || params->name == NULL ||
> +		params->size == 0 || params->esize == 0 ||
> +		(params->esize % 8 != 0)) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return NULL;
> +	}
> +
> +	dq = rte_zmalloc(NULL,
> +		(sizeof(struct rte_rcu_qsbr_dq) + params->esize),
> +		RTE_CACHE_LINE_SIZE);
> +	if (dq == NULL) {
> +		rte_errno = ENOMEM;
> +
> +		return NULL;
> +	}
> +
> +	/* round up qs_fifo_size to next power of two that is not less than
> +	 * max_size.
> +	 */
> +	qs_fifo_size = rte_align32pow2((((params->esize/8) + 1)
> +					* params->size) + 1);
> +	dq->r = rte_ring_create(params->name, qs_fifo_size,
> +					SOCKET_ID_ANY, 0);
> +	if (dq->r == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): defer queue create failed\n", __func__);
> +		rte_free(dq);
> +		return NULL;
> +	}
> +
> +	dq->v = params->v;
> +	dq->size = params->size;
> +	dq->esize = params->esize;
> +	dq->f = params->f;
> +	dq->p = params->p;
> +
> +	return dq;
> +}
> +
> +/* Enqueue one resource to the defer queue to free after the grace
> + * period is over.
> + */
> +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
> +{
> +	uint64_t token;
> +	uint64_t *tmp;
> +	uint32_t i;
> +	uint32_t cur_size, free_size;
> +
> +	if (dq == NULL || e == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Start the grace period */
> +	token = rte_rcu_qsbr_start(dq->v);
> +
> +	/* Reclaim resources if the queue is 1/8th full. This helps
> +	 * the queue from growing too large and allows time for reader
> +	 * threads to report their quiescent state.
> +	 */
> +	cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1);
> +	if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) {
> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +			"%s(): Triggering reclamation\n", __func__);
> +		rte_rcu_qsbr_dq_reclaim(dq);
> +	}
> +
> +	/* Check if there is space for atleast for 1 resource */
> +	free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1);
> +	if (!free_size) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Defer queue is full\n", __func__);
> +		rte_errno = ENOSPC;
> +		return 1;
> +	}
> +
> +	/* Enqueue the resource */
> +	rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token);
> +
> +	/* The resource to enqueue needs to be a multiple of 64b
> +	 * due to the limitation of the rte_ring implementation.
> +	 */
> +	for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++)
> +		rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp);
> +
> +	return 0;
> +}
> +
> +/* Reclaim resources from the defer queue. */
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq)
> +{
> +	uint32_t max_cnt;
> +	uint32_t cnt;
> +	void *token;
> +	uint64_t *tmp;
> +	uint32_t i;
> +
> +	if (dq == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Anything to reclaim? */
> +	if (rte_ring_count(dq->r) == 0)
> +		return 0;
> +
> +	/* Reclaim at the max 1/16th the total number of entries. */
> +	max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT;
> +	max_cnt = (max_cnt == 0) ? dq->size : max_cnt;
> +	cnt = 0;
> +
> +	/* Check reader threads quiescent state and reclaim resources */
> +	while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) &&
> +		(rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false)
> +			== 1)) {
> +		(void)rte_ring_sc_dequeue(dq->r, &token);
> +		/* The resource to dequeue needs to be a multiple of 64b
> +		 * due to the limitation of the rte_ring implementation.
> +		 */
> +		for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8;
> +			i++, tmp++)
> +			(void)rte_ring_sc_dequeue(dq->r,
> +					(void *)(uintptr_t)tmp);
> +		dq->f(dq->p, dq->e);
> +
> +		cnt++;
> +	}
> +
> +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +		"%s(): Reclaimed %u resources\n", __func__, cnt);
> +
> +	if (cnt == 0) {
> +		/* No resources were reclaimed */
> +		rte_errno = EAGAIN;
> +		return 1;
> +	}
> +
> +	return 0;
> +}
> +
> +/* Delete a defer queue. */
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
> +{
> +	if (dq == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Reclaim all the resources */
> +	if (rte_rcu_qsbr_dq_reclaim(dq) != 0)
> +		/* Error number is already set by the reclaim API */
> +		return 1;

There is a potential problem here. rte_rcu_qsbr_dq_reclaim() reclaims at
most max_cnt entries per call, which is 1/16 of the possible enqueued
entries, so the rest won't be reclaimed before the queue is freed.

> +
> +	rte_ring_free(dq->r);
> +	rte_free(dq);
> +
> +	return 0;
> +}
> +
>  int rte_rcu_log_type;
>
>  RTE_INIT(rte_rcu_register)
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> index c80f15c00..185d4b50a 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.h
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -34,6 +34,7 @@ extern "C" {
>  #include
>  #include
>  #include
> +#include

I think it's better to move this include into rte_rcu_qsbr.c.

>
>  extern int rte_rcu_log_type;
>
> @@ -109,6 +110,67 @@ struct rte_rcu_qsbr {
>  	 */
>  } __rte_cache_aligned;
>
> +/**
> + * Call back function called to free the resources.
> + *
> + * @param p
> + *   Pointer provided while creating the defer queue
> + * @param e
> + *   Pointer to the resource data stored on the defer queue
> + *
> + * @return
> + *   None
> + */
> +typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e);
> +
> +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

I don't see this macro used anywhere in the rcu library (I see you are
using it in LPM). The tests use
char rcu_dq_name[RTE_RING_NAMESIZE];
instead. Also see my comments on [PATCH v3 1/3] lib/lpm: integrate RCU QSBR.

> +
> +/**
> + * Trigger automatic reclamation after 1/8th the defer queue is full.
> + */
> +#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3
> +
> +/**
> + * Reclaim at the max 1/16th the total number of resources.
> + */
> +#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4

These two defines could be moved into the .c file.

> +
> +/**
> + * Parameters used when creating the defer queue.
> + */
> +struct rte_rcu_qsbr_dq_parameters {
> +	const char *name;
> +	/**< Name of the queue. */
> +	uint32_t size;
> +	/**< Number of entries in queue. Typically, this will be
> +	 *   the same as the maximum number of entries supported in the
> +	 *   lock free data structure.
> +	 *   Data structures with unbounded number of entries is not
> +	 *   supported currently.
> +	 */
> +	uint32_t esize;
> +	/**< Size (in bytes) of each element in the defer queue.
> +	 *   This has to be multiple of 8B as the rte_ring APIs
> +	 *   support 8B element sizes only.
> +	 */
> +	rte_rcu_qsbr_free_resource f;
> +	/**< Function to call to free the resource. */
> +	void *p;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 *   pointer to the data structure to which the resource to free
> +	 *   belongs. This can be NULL.
> +	 */
> +	struct rte_rcu_qsbr *v;
> +	/**< RCU QSBR variable to use for this defer queue */
> +};
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq;
> +
>  /**
>   * @warning
>   * @b EXPERIMENTAL: this API may change without prior notice
> @@ -648,6 +710,113 @@ __rte_experimental
>  int
>  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
>
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + *
> + * @param params
> + *   Parameters to create a defer queue.
> + * @return
> + *   On success - Valid pointer to defer queue
> + *   On error - NULL
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOMEM - Not enough memory
> + */
> +__rte_experimental
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue one resource to the defer queue and start the grace period.
> + * The resource will be freed later after at least one grace period
> + * is over.
> + *
> + * If the defer queue is full, it will attempt to reclaim resources.
> + * It will also reclaim resources at regular intervals to avoid
> + * the defer queue from growing too big.
> + *
> + * This API is not multi-thread safe. It is expected that the caller
> + * provides multi-thread safety by locking a mutex or some other means.
> + *
> + * A lock free multi-thread writer algorithm could achieve multi-thread
> + * safety by creating and using one defer queue per thread.
> + *
> + * @param dq
> + *   Defer queue to allocate an entry from.
> + * @param e
> + *   Pointer to resource data to copy to the defer queue. The size of
> + *   the data to copy is equal to the element size provided when the
> + *   defer queue was created.
> + * @return
> + *   On success - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOSPC - Defer queue is full. This condition can not happen
> + *     if the defer queue size is equal (or larger) than the
> + *     number of elements in the data structure.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Reclaim resources from the defer queue.
> + *
> + * This API is not multi-thread safe. It is expected that the caller
> + * provides multi-thread safety by locking a mutex or some other means.
> + *
> + * A lock free multi-thread writer algorithm could achieve multi-thread
> + * safety by creating and using one defer queue per thread.
> + *
> + * @param dq
> + *   Defer queue to reclaim an entry from.
> + * @return
> + *   On successful reclamation of at least 1 resource - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - None of the resources have completed at least 1 grace period,
> + *     try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Delete a defer queue.
> + *
> + * It tries to reclaim all the resources on the defer queue.
> + * If any of the resources have not completed the grace period
> + * the reclamation stops and returns immediately. The rest of
> + * the resources are not reclaimed and the defer queue is not
> + * freed.
> + *
> + * @param dq
> + *   Defer queue to delete.
> + * @return
> + *   On success - 0
> + *   On error - 1
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - Some of the resources have not completed at least 1 grace
> + *     period, try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
> new file mode 100644
> index 000000000..2122bc36a
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
> @@ -0,0 +1,46 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2019 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_PVT_H_
> +#define _RTE_RCU_QSBR_PVT_H_
> +
> +/**
> + * This file is private to the RCU library. It should not be included
> + * by the user of this library.
> + */

Why is this struct definition separated out into a private .h? Maybe just
define it in rte_rcu_qsbr.c instead?

> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq {
> +	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> +	struct rte_ring *r; /**< RCU QSBR defer queue. */
> +	uint32_t size;
> +	/**< Number of elements in the defer queue */
> +	uint32_t esize;
> +	/**< Size (in bytes) of data stored on the defer queue */
> +	rte_rcu_qsbr_free_resource f;
> +	/**< Function to call to free the resource. */
> +	void *p;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 *   pointer to the data structure to which the resource to free
> +	 *   belongs.
> +	 */
> +	char e[0];
> +	/**< Temporary storage to copy the defer queue element. */
> +};
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
> index f8b9ef2ab..dfac88a37 100644
> --- a/lib/librte_rcu/rte_rcu_version.map
> +++ b/lib/librte_rcu/rte_rcu_version.map
> @@ -8,6 +8,10 @@ EXPERIMENTAL {
>  	rte_rcu_qsbr_synchronize;
>  	rte_rcu_qsbr_thread_register;
>  	rte_rcu_qsbr_thread_unregister;
> +	rte_rcu_qsbr_dq_create;
> +	rte_rcu_qsbr_dq_enqueue;
> +	rte_rcu_qsbr_dq_reclaim;
> +	rte_rcu_qsbr_dq_delete;
>
>  	local: *;
>  };
> diff --git a/lib/meson.build b/lib/meson.build
> index e5ff83893..0e1be8407 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -11,7 +11,9 @@
>  libraries = [
>  	'kvargs', # eal depends on kvargs
>  	'eal', # everything depends on eal
> -	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> +	'ring',
> +	'rcu', # rcu depends on ring
> +	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
>  	'cmdline',
>  	'metrics', # bitrate/latency stats depends on this
>  	'hash', # efd depends on this
> @@ -22,7 +24,7 @@ libraries = [
>  	'gro', 'gso', 'ip_frag', 'jobstats',
>  	'kni', 'latencystats', 'lpm', 'member',
>  	'power', 'pdump', 'rawdev',
> -	'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
> +	'reorder', 'sched', 'security', 'stack', 'vhost',
>  	# ipsec lib depends on net, crypto and security
>  	'ipsec',
>  	# add pkt framework libs which use other libs from above

-- 
Regards,
Vladimir
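
Note on the rte_rcu_qsbr_dq_delete() comment above: the following is a
minimal standalone sketch of the concern, not DPDK code. The names toy_dq,
toy_reclaim(), toy_delete_once() and toy_delete_drain() are hypothetical,
the model only tracks entry counts, and it assumes every pending entry has
already completed its grace period. toy_reclaim() mirrors the max_cnt logic
of the quoted patch; toy_delete_drain() is only one possible way to address
the issue, not necessarily the right fix for the library.

```c
#include <stdint.h>

/* Same value as RTE_RCU_QSBR_MAX_RECLAIM_LIMIT in the quoted patch. */
#define TOY_MAX_RECLAIM_LIMIT 4

/* Hypothetical stand-in for the defer queue: it only counts entries. */
struct toy_dq {
	uint32_t size;    /* configured number of entries */
	uint32_t pending; /* entries waiting to be reclaimed */
};

/* Mirrors the bounded loop of rte_rcu_qsbr_dq_reclaim(), assuming all
 * pending entries are past their grace period.
 * Returns 0 if nothing was pending or at least one entry was reclaimed,
 * 1 if no progress was made.
 */
static int
toy_reclaim(struct toy_dq *dq)
{
	uint32_t max_cnt = dq->size >> TOY_MAX_RECLAIM_LIMIT;
	uint32_t cnt = 0;

	if (dq->pending == 0)
		return 0;

	max_cnt = (max_cnt == 0) ? dq->size : max_cnt;
	while (cnt < max_cnt && dq->pending > 0) {
		dq->pending--;
		cnt++;
	}

	return (cnt == 0) ? 1 : 0;
}

/* Delete as structured in the patch: a single reclaim call, then free.
 * dq->pending shows how many entries would be dropped unreclaimed.
 */
static int
toy_delete_once(struct toy_dq *dq)
{
	if (toy_reclaim(dq) != 0)
		return 1;
	/* The patch frees the ring and the queue at this point. */
	return 0;
}

/* One possible alternative: keep reclaiming until the queue is empty,
 * and give up as soon as a reclaim call makes no progress.
 */
static int
toy_delete_drain(struct toy_dq *dq)
{
	while (dq->pending > 0) {
		if (toy_reclaim(dq) != 0)
			return 1;
	}
	return 0;
}
```

With size = 303 and a full queue, max_cnt is 303 >> 4 = 18, so
toy_delete_once() reports success while 285 entries are still pending,
whereas toy_delete_drain() leaves none. Queues smaller than 16 entries fall
back to max_cnt = size, so they do not show the effect.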