From mboxrd@z Thu Jan 1 00:00:00 1970
From: "Ananyev, Konstantin"
To: Honnappa Nagarahalli, "stephen@networkplumber.org", "paulmck@linux.ibm.com", "dev@dpdk.org"
CC: "gavin.hu@arm.com", "dharmik.thakkar@arm.com", "malvika.gupta@arm.com"
Date: Fri, 22 Mar 2019 16:42:09 +0000
Message-ID: <2601191342CEEE43887BDE71AB977258013655ED5C@irsmsx105.ger.corp.intel.com>
References: <20181122033055.3431-1-honnappa.nagarahalli@arm.com> <20190319045228.46879-1-honnappa.nagarahalli@arm.com> <20190319045228.46879-2-honnappa.nagarahalli@arm.com>
In-Reply-To: <20190319045228.46879-2-honnappa.nagarahalli@arm.com>
Subject: Re: [dpdk-dev] [PATCH 1/3] rcu: add RCU library supporting QSBR mechanism
List-Id: DPDK patches and discussions

Hi Honnappa,

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> new file mode 100644
> index 000000000..0fc4515ea
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -0,0 +1,99 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#include
> +#include
> +#include
> +#include
> +
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* Get the memory size of QSBR variable */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)
> +{
> + size_t sz;
> +
> + RTE_ASSERT(max_threads == 0);

Here and in all similar places: assert() aborts when its condition evaluates to false, so this should be max_threads != 0.
Also, this is a public, non-datapath function. Calling assert() for an invalid input parameter seems way too extreme.
Why not just return an error to the caller?

> +
> + sz = sizeof(struct rte_rcu_qsbr);
> +
> + /* Add the size of quiescent state counter array */
> + sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> +
> + return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +}
> +
> +/* Initialize a quiescent state variable */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
> +{
> + RTE_ASSERT(v == NULL);
> +
> + memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
> + v->m_threads = max_threads;
> + v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> + RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> + RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> + v->token = RTE_QSBR_CNT_INIT;
> +}
> +
> +/* Dump the details of a single quiescent state variable to a file. */
> +void __rte_experimental
> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> +{
> + uint64_t bmap;
> + uint32_t i, t;
> +
> + RTE_ASSERT(v == NULL || f == NULL);
> +
> + fprintf(f, "\nQuiescent State Variable @%p\n", v);
> +
> + fprintf(f, " QS variable memory size = %lu\n",
> + rte_rcu_qsbr_get_memsize(v->m_threads));
> + fprintf(f, " Given # max threads = %u\n", v->m_threads);
> +
> + fprintf(f, " Registered thread ID mask = 0x");
> + for (i = 0; i < v->num_elems; i++)
> + fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
> + __ATOMIC_ACQUIRE));
> + fprintf(f, "\n");
> +
> + fprintf(f, " Token = %lu\n",
> + __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> +
> + fprintf(f, "Quiescent State Counts for readers:\n");
> + for (i = 0; i < v->num_elems; i++) {
> + bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
> + while (bmap) {
> + t = __builtin_ctzl(bmap);
> + fprintf(f, "thread ID = %d, count = %lu\n", t,
> + __atomic_load_n(
> + &RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
> + __ATOMIC_RELAXED));
> + bmap &= ~(1UL << t);
> + }
> + }
> +}
> +
> +int rcu_log_type;
> +
> +RTE_INIT(rte_rcu_register)
> +{
> + rcu_log_type = rte_log_register("lib.rcu");
> + if (rcu_log_type >= 0)
> + rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
> +}
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> new file mode 100644
> index 000000000..83943f751
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -0,0 +1,511 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_H_
> +#define _RTE_RCU_QSBR_H_
> +
> +/**
> + * @file
> + * RTE Quiescent State Based Reclamation (QSBR)
> + *
> + * Quiescent State (QS) is any point in the thread execution
> + * where the thread does not hold a reference to a data structure
> + * in shared memory. While using lock-less data structures, the writer
> + * can safely free memory once all the reader threads have entered
> + * quiescent state.
> + *
> + * This library provides the ability for the readers to report quiescent
> + * state and for the writers to identify when all the readers have
> + * entered quiescent state.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +
> +extern int rcu_log_type;
> +
> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
> +#define RCU_DP_LOG(level, fmt, args...) \
> + rte_log(RTE_LOG_ ## level, rcu_log_type, \
> + "%s(): " fmt "\n", __func__, ## args)
> +#else
> +#define RCU_DP_LOG(level, fmt, args...)
> +#endif

Why do you need that? Can't you use RTE_LOG_DP() instead?

> +
> +/* Registered thread IDs are stored as a bitmap of 64b element array.
> + * Given thread id needs to be converted to index into the array and
> + * the id within the array element.
> + */
> +#define RTE_RCU_MAX_THREADS 1024
> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
> +#define RTE_QSBR_THRID_ARRAY_ELEMS \
> + (RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
> + RTE_QSBR_THRID_ARRAY_ELM_SIZE) / RTE_QSBR_THRID_ARRAY_ELM_SIZE)
> +#define RTE_QSBR_THRID_INDEX_SHIFT 6
> +#define RTE_QSBR_THRID_MASK 0x3f
> +#define RTE_QSBR_THRID_INVALID 0xffffffff
> +
> +/* Worker thread counter */
> +struct rte_rcu_qsbr_cnt {
> + uint64_t cnt;
> + /**< Quiescent state counter. Value 0 indicates the thread is offline */
> +} __rte_cache_aligned;
> +
> +#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v + 1)) + i)

You could probably add
struct rte_rcu_qsbr_cnt cnt[0];
at the end of struct rte_rcu_qsbr; then you wouldn't need the macro above.

> +#define RTE_QSBR_CNT_THR_OFFLINE 0
> +#define RTE_QSBR_CNT_INIT 1
> +
> +/**
> + * RTE thread Quiescent State structure.
> + * Quiescent state counter array (array of 'struct rte_rcu_qsbr_cnt'),
> + * whose size is dependent on the maximum number of reader threads
> + * (m_threads) using this variable is stored immediately following
> + * this structure.
> + */
> +struct rte_rcu_qsbr {
> + uint64_t token __rte_cache_aligned;
> + /**< Counter to allow for multiple simultaneous QS queries */
> +
> + uint32_t num_elems __rte_cache_aligned;
> + /**< Number of elements in the thread ID array */
> + uint32_t m_threads;
> + /**< Maximum number of threads this RCU variable will use */
> +
> + uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
> + /**< Registered thread IDs are stored in a bitmap array */

As I understand it, you ended up with a fixed-size array to avoid having two variable-size arrays in this struct?
Is it that big a penalty for register/unregister() to either store a pointer to the bitmap, or calculate it based on the num_elems value?
As another thought: do we really need the bitmap at all?
Might it be possible to store the register value for each thread inside its rte_rcu_qsbr_cnt:
struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t register;} __rte_cache_aligned;
?
That would cause check() to walk through all elements in the rte_rcu_qsbr_cnt array, but on the other hand it would help to avoid cache conflicts for register/unregister.

> +} __rte_cache_aligned;
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Return the size of the memory occupied by a Quiescent State variable.
> + *
> + * @param max_threads
> + * Maximum number of threads reporting quiescent state on this variable.
> + * @return
> + * Size of memory in bytes required for this QS variable.
> + */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Initialize a Quiescent State (QS) variable.
> + *
> + * @param v
> + * QS variable
> + * @param max_threads
> + * Maximum number of threads reporting QS on this variable.
> + *
> + */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Register a reader thread to report its quiescent state
> + * on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + * Any reader thread that wants to report its quiescent state must
> + * call this API. This can be called during initialization or as part
> + * of the packet processing loop.
> + *
> + * Note that rte_rcu_qsbr_thread_online must be called before the
> + * thread updates its QS using rte_rcu_qsbr_update.
> + *
> + * @param v
> + * QS variable
> + * @param thread_id
> + * Reader thread with this thread ID will report its quiescent state on
> + * the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> + unsigned int i, id;
> +
> + RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> + id = thread_id & RTE_QSBR_THRID_MASK;
> + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> + /* Release the new register thread ID to other threads
> + * calling rte_rcu_qsbr_check.
> + */
> + __atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Remove a reader thread, from the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread safe.
> + * This API can be called from the reader threads during shutdown.
> + * Ongoing QS queries will stop waiting for the status from this
> + * unregistered reader thread.
> + *
> + * @param v
> + * QS variable
> + * @param thread_id
> + * Reader thread with this thread ID will stop reporting its quiescent
> + * state on the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> + unsigned int i, id;
> +
> + RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> + id = thread_id & RTE_QSBR_THRID_MASK;
> + i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> + /* Make sure the removal of the thread from the list of
> + * reporting threads is visible before the thread
> + * does anything else.
> + */
> + __atomic_fetch_and(&v->reg_thread_id[i],
> + ~(1UL << id), __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Add a registered reader thread, to the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + *
> + * Any registered reader thread that wants to report its quiescent state must
> + * call this API before calling rte_rcu_qsbr_update. This can be called
> + * during initialization or as part of the packet processing loop.
> + *
> + * The reader thread must call rte_rcu_thread_offline API, before
> + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> + * API does not wait indefinitely for the reader thread to update its QS.
> + *
> + * The reader thread must call rte_rcu_thread_online API, after the blocking
> + * function call returns, to ensure that rte_rcu_qsbr_check API
> + * waits for the reader thread to update its QS.
> + *
> + * @param v
> + * QS variable
> + * @param thread_id
> + * Reader thread with this thread ID will report its quiescent state on
> + * the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> + uint64_t t;
> +
> + RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> + /* Copy the current value of token.
> + * The fence at the end of the function will ensure that
> + * the following will not move down after the load of any shared
> + * data structure.
> + */
> + t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> +
> + /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> + * 'cnt' (64b) is accessed atomically.
> + */
> + __atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
> + t, __ATOMIC_RELAXED);
> +
> + /* The subsequent load of the data structure should not
> + * move above the store. Hence a store-load barrier
> + * is required.
> + * If the load of the data structure moves above the store,
> + * writer might not see that the reader is online, even though
> + * the reader is referencing the shared data structure.
> + */
> + __atomic_thread_fence(__ATOMIC_SEQ_CST);

If it has to generate a proper memory barrier here anyway, could it use rte_smp_mb() here?
At least for IA it would generate a more lightweight one.
Konstantin

> +}
> +
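
To make two of the suggestions above concrete (returning an error for invalid input instead of asserting, and a trailing array member in place of RTE_QSBR_CNT_ARRAY_ELM), here is a minimal standalone sketch. It is not the patch's code and does not use DPDK headers: every demo_/DEMO_ name is hypothetical, the 64-byte cache line is an assumption, the aligned attribute assumes GCC or Clang, and the negative-errno return convention is just one possible choice.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_CACHE_LINE 64 /* assumed cache line size */

/* Per-thread counter, padded to its own cache line (hypothetical type). */
struct demo_qsbr_cnt {
	uint64_t cnt;
} __attribute__((aligned(DEMO_CACHE_LINE)));

/* QS variable with the counter array as a C99 flexible array member,
 * so element i is simply v->qsbr_cnt[i] and no pointer-arithmetic
 * macro is needed.
 */
struct demo_qsbr {
	uint64_t token;
	uint32_t max_threads;
	struct demo_qsbr_cnt qsbr_cnt[];
} __attribute__((aligned(DEMO_CACHE_LINE)));

/* Return the size in bytes, or a negative errno value on invalid input. */
static int64_t
demo_qsbr_get_memsize(uint32_t max_threads)
{
	if (max_threads == 0)
		return -EINVAL;

	return (int64_t)sizeof(struct demo_qsbr) +
		(int64_t)sizeof(struct demo_qsbr_cnt) * max_threads;
}

/* Initialize a QS variable; return 0 on success, negative errno otherwise. */
static int
demo_qsbr_init(struct demo_qsbr *v, uint32_t max_threads)
{
	int64_t sz;

	if (v == NULL)
		return -EINVAL;

	sz = demo_qsbr_get_memsize(max_threads);
	if (sz < 0)
		return (int)sz;

	/* Internal invariant, not a check on caller input. */
	assert((size_t)sz >= sizeof(*v));

	memset(v, 0, (size_t)sz);
	v->max_threads = max_threads;
	v->token = 1; /* initial token value, mirroring RTE_QSBR_CNT_INIT */
	return 0;
}
```

With this shape a caller can test the return value (for example, a negative result from demo_qsbr_get_memsize(0)) rather than having the process abort, and assert() is left for conditions that indicate a bug inside the library itself.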