From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
To: konstantin.ananyev@intel.com, stephen@networkplumber.org,
	paulmck@linux.ibm.com, dev@dpdk.org, honnappa.nagarahalli@arm.com
Cc: gavin.hu@arm.com, dharmik.thakkar@arm.com, malvika.gupta@arm.com,
	nd@arm.com
Date: Fri, 22 Feb 2019 01:04:25 -0600
Message-Id: <20190222070427.22866-4-honnappa.nagarahalli@arm.com>
X-Mailer: git-send-email 2.17.1
In-Reply-To: <20190222070427.22866-1-honnappa.nagarahalli@arm.com>
References: <20181222021420.5114-1-honnappa.nagarahalli@arm.com>
	<20190222070427.22866-1-honnappa.nagarahalli@arm.com>
Subject: [dpdk-dev] [RFC v3 3/5] lib/rcu: add dynamic memory allocation capability

The rte_rcu_qsbr_get_memsize API is introduced. This allows the user to
control the amount of memory used, based on the maximum number of
threads present in the application.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
---
 lib/librte_rcu/rte_rcu_qsbr.c |  51 ++++++++++++---
 lib/librte_rcu/rte_rcu_qsbr.h | 118 +++++++++++++++++++++-------------
 2 files changed, 118 insertions(+), 51 deletions(-)

diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index 3c2577ee2..02464fdba 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -21,11 +21,39 @@
 #include "rte_rcu_qsbr.h"
 
+/* Get the memory size of QSBR variable */
+unsigned int __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads)
+{
+	int n;
+	ssize_t sz;
+
+	RTE_ASSERT(max_threads != 0);
+
+	sz = sizeof(struct rte_rcu_qsbr);
+
+	/* Add the size of the registered thread ID bitmap array */
+	n = RTE_ALIGN(max_threads, RTE_QSBR_THRID_ARRAY_ELM_SIZE);
+	sz += RTE_QSBR_THRID_ARRAY_SIZE(n);
+
+	/* Add the size of quiescent state counter array */
+	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
+
+	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+}
+
 /* Initialize a quiescent state variable */
 void __rte_experimental
-rte_rcu_qsbr_init(struct rte_rcu_qsbr *v)
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
 {
-	memset(v, 0, sizeof(struct rte_rcu_qsbr));
+	RTE_ASSERT(v != NULL);
+
+	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
+	v->m_threads = max_threads;
+	v->ma_threads = RTE_ALIGN(max_threads, RTE_QSBR_THRID_ARRAY_ELM_SIZE);
+
+	v->num_elems = v->ma_threads/RTE_QSBR_THRID_ARRAY_ELM_SIZE;
+	v->thrid_array_size = RTE_QSBR_THRID_ARRAY_SIZE(v->ma_threads);
 }
 
 /* Dump the details of a single quiescent state variable to a file.
  */
@@ -39,9 +67,15 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 
 	fprintf(f, "\nQuiescent State Variable @%p\n", v);
 
+	fprintf(f, "  QS variable memory size = %u\n",
+		rte_rcu_qsbr_get_memsize(v->m_threads));
+	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
+	fprintf(f, "  Adjusted # max threads = %u\n", v->ma_threads);
+
 	fprintf(f, "  Registered thread ID mask = 0x");
-	for (i = 0; i < RTE_QSBR_BIT_MAP_ELEMS; i++)
-		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
+	for (i = 0; i < v->num_elems; i++)
+		fprintf(f, "%lx", __atomic_load_n(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
 					__ATOMIC_ACQUIRE));
 	fprintf(f, "\n");
 
@@ -49,14 +83,15 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 		__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
 
 	fprintf(f, "Quiescent State Counts for readers:\n");
-	for (i = 0; i < RTE_QSBR_BIT_MAP_ELEMS; i++) {
-		bmap = __atomic_load_n(&v->reg_thread_id[i],
+	for (i = 0; i < v->num_elems; i++) {
+		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
 					__ATOMIC_ACQUIRE);
 		while (bmap) {
 			t = __builtin_ctzl(bmap);
 			fprintf(f, "thread ID = %d, count = %lu\n", t,
-				__atomic_load_n(&v->w[i].cnt,
-						__ATOMIC_RELAXED));
+				__atomic_load_n(
+					&RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
+					__ATOMIC_RELAXED));
 			bmap &= ~(1UL << t);
 		}
 	}
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index 53e00488b..21fa2c198 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -29,46 +29,71 @@ extern "C" {
 
 #include
 #include
 
-/**< Maximum number of reader threads supported. */
-#define RTE_RCU_MAX_THREADS 128
-
-#if !RTE_IS_POWER_OF_2(RTE_RCU_MAX_THREADS)
-#error RTE_RCU_MAX_THREADS must be a power of 2
-#endif
-
-/**< Number of array elements required for the bit-map */
-#define RTE_QSBR_BIT_MAP_ELEMS (RTE_RCU_MAX_THREADS/(sizeof(uint64_t) * 8))
-
-/* Thread IDs are stored as a bitmap of 64b element array. Given thread id
- * needs to be converted to index into the array and the id within
- * the array element.
+/* Registered thread IDs are stored as a bitmap in an array of 64b
+ * elements. A given thread id needs to be converted to an index into
+ * the array and the id within the array element.
+ */
+/* Thread ID array size
+ * @param ma_threads
+ *   num of threads aligned to 64
  */
-#define RTE_QSBR_THR_INDEX_SHIFT 6
-#define RTE_QSBR_THR_ID_MASK 0x3f
+#define RTE_QSBR_THRID_ARRAY_SIZE(ma_threads) \
+	RTE_ALIGN((ma_threads) >> 3, RTE_CACHE_LINE_SIZE)
+#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
+#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *)(v + 1) + i)
+#define RTE_QSBR_THRID_INDEX_SHIFT 6
+#define RTE_QSBR_THRID_MASK 0x3f
 
 /* Worker thread counter */
 struct rte_rcu_qsbr_cnt {
 	uint64_t cnt; /**< Quiescent state counter. */
 } __rte_cache_aligned;
 
+#define RTE_QSBR_CNT_ARRAY_ELM(v, i) ((struct rte_rcu_qsbr_cnt *) \
+	((uint8_t *)(v + 1) + v->thrid_array_size) + i)
+
 /**
  * RTE thread Quiescent State structure.
+ * The following data, which is dependent on the maximum number of
+ * threads using this variable, is stored in memory immediately
+ * following this structure.
+ *
+ * 1) registered thread ID bitmap array
+ *    This is a uint64_t array large enough to hold 'ma_threads'
+ *    thread IDs.
+ * 2) quiescent state counter array
+ *    This is an array of 'struct rte_rcu_qsbr_cnt' with
+ *    'm_threads' number of elements.
  */
 struct rte_rcu_qsbr {
-	uint64_t reg_thread_id[RTE_QSBR_BIT_MAP_ELEMS] __rte_cache_aligned;
-	/**< Registered reader thread IDs - reader threads reporting
-	 * on this QS variable represented in a bit map.
-	 */
-
 	uint64_t token __rte_cache_aligned;
 	/**< Counter to allow for multiple simultaneous QS queries */
 
-	struct rte_rcu_qsbr_cnt w[RTE_RCU_MAX_THREADS] __rte_cache_aligned;
-	/**< QS counter for each reader thread, counts upto
-	 * current value of token.
-	 */
+	uint32_t thrid_array_size __rte_cache_aligned;
+	/**< Registered thread ID bitmap array size in bytes */
+	uint32_t num_elems;
+	/**< Number of elements in the thread ID array */
+
+	uint32_t m_threads;
+	/**< Maximum number of threads this RCU variable will use */
+	uint32_t ma_threads;
+	/**< Maximum number of threads aligned to 64 */
 } __rte_cache_aligned;
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Return the size of the memory occupied by a Quiescent State (QS) variable.
+ *
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
+ * @return
+ *   Size of memory in bytes required for this QS variable.
+ */
+unsigned int __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads);
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -77,10 +102,12 @@ struct rte_rcu_qsbr {
  *
  * @param v
  *   QS variable
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
  *
  */
 void __rte_experimental
-rte_rcu_qsbr_init(struct rte_rcu_qsbr *v);
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
 
 /**
  * @warning
@@ -108,24 +135,25 @@ rte_rcu_qsbr_register_thread(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
 	unsigned int i, id;
 
-	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
 
-	id = thread_id & RTE_QSBR_THR_ID_MASK;
-	i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
 
 	/* Worker thread has to count the quiescent states
 	 * only from the current value of token.
 	 * __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
 	 * 'cnt' (64b) is accessed atomically.
 	 */
-	__atomic_store_n(&v->w[thread_id].cnt,
+	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
 		__atomic_load_n(&v->token, __ATOMIC_ACQUIRE),
 		__ATOMIC_RELAXED);
 
 	/* Release the store to initial TQS count so that readers
 	 * can use it immediately after this function returns.
 	 */
-	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
+	__atomic_fetch_or(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+		1UL << id, __ATOMIC_RELEASE);
 }
 
 /**
  * @warning
@@ -151,16 +179,16 @@ rte_rcu_qsbr_unregister_thread(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
 	unsigned int i, id;
 
-	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
 
-	id = thread_id & RTE_QSBR_THR_ID_MASK;
-	i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
 
 	/* Make sure the removal of the thread from the list of
 	 * reporting threads is visible before the thread
 	 * does anything else.
 	 */
-	__atomic_fetch_and(&v->reg_thread_id[i],
+	__atomic_fetch_and(RTE_QSBR_THRID_ARRAY_ELM(v, i),
 			~(1UL << id), __ATOMIC_RELEASE);
 }
 
@@ -212,7 +240,7 @@ rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
 	uint64_t t;
 
-	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
 
 	/* Load the token before the reader thread loads any other
 	 * (lock-free) data structure. This ensures that updates
@@ -228,8 +256,10 @@ rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id)
 	 * Copy the current token value. This will end grace period
 	 * of multiple concurrent writers.
 	 */
-	if (__atomic_load_n(&v->w[thread_id].cnt, __ATOMIC_RELAXED) != t)
-		__atomic_store_n(&v->w[thread_id].cnt, t, __ATOMIC_RELAXED);
+	if (__atomic_load_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+				__ATOMIC_RELAXED) != t)
+		__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+				t, __ATOMIC_RELAXED);
 }
 
 /**
  * @warning
@@ -268,18 +298,20 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 		/* Load the current registered thread bit map before
 		 * loading the reader thread quiescent state counters.
 		 */
-		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
-		id = i << RTE_QSBR_THR_INDEX_SHIFT;
+		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
 
 		while (bmap) {
 			j = __builtin_ctzl(bmap);
-/* printf ("Status check: token = %lu, wait = %d, Bit Map = 0x%x, Thread ID = %d\n", t, wait, bmap, id+j); */
+/* printf ("Status check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d\n", t, wait, bmap, id+j); */
 			/* __atomic_load_n(cnt, __ATOMIC_RELAXED)
 			 * is used to ensure 'cnt' (64b) is accessed
 			 * atomically.
 			 */
-			if (unlikely(__atomic_load_n(&v->w[id + j].cnt,
+			if (unlikely(__atomic_load_n(
+					&RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt,
 					__ATOMIC_RELAXED) < t)) {
-/* printf ("Status not in QS: token = %lu, Wait = %d, Thread QS cnt = %lu, Thread ID = %d\n", t, wait, v->w[id + j].cnt, id+j); */
+/* printf ("Status not in QS: token = %lu, Wait = %d, Thread QS cnt = %lu, Thread ID = %d\n", t, wait, RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt, id+j); */
@@ -292,7 +324,7 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 				 * Re-read the bitmap.
 				 */
 				bmap = __atomic_load_n(
-						&v->reg_thread_id[i],
+						RTE_QSBR_THRID_ARRAY_ELM(v, i),
 						__ATOMIC_ACQUIRE);
 
 				continue;
@@ -302,7 +334,7 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 		}
 
 		i++;
-	} while (i < RTE_QSBR_BIT_MAP_ELEMS);
+	} while (i < v->num_elems);
 
 	return 1;
 }
-- 
2.17.1
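
P.S. A minimal usage sketch of the new two-step allocate-then-init flow
(not part of the patch; the "rcu_qsbr" pool name, the reader count of 8,
and the use of rte_zmalloc()/rte_panic() are illustrative assumptions):

	#include <rte_malloc.h>
	#include <rte_debug.h>
	#include "rte_rcu_qsbr.h"

	static struct rte_rcu_qsbr *
	alloc_qsbr(uint32_t max_threads)
	{
		struct rte_rcu_qsbr *v;
		unsigned int sz;

		/* Ask the library how much memory 'max_threads' readers
		 * need; the thread ID bitmap and per-thread counter arrays
		 * are laid out immediately after struct rte_rcu_qsbr in
		 * the same block.
		 */
		sz = rte_rcu_qsbr_get_memsize(max_threads);

		/* One zeroed, cache-line aligned allocation for the
		 * whole block.
		 */
		v = rte_zmalloc("rcu_qsbr", sz, RTE_CACHE_LINE_SIZE);
		if (v == NULL)
			rte_panic("cannot allocate QS variable\n");

		rte_rcu_qsbr_init(v, max_threads);
		return v;
	}

	/* Each reader registers its own thread_id
	 * (0 <= thread_id < max_threads) before reporting QS:
	 *
	 *	struct rte_rcu_qsbr *v = alloc_qsbr(8);
	 *	rte_rcu_qsbr_register_thread(v, thread_id);
	 */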