Date: Fri, 20 Sep 2024 21:41:11 +0200
Subject: Re: [RFC PATCH v3] mempool: obey configured cache size
From: Mattias Rönnblom
To: Morten Brørup, dev@dpdk.org
In-Reply-To: <20240920171350.841532-1-mb@smartsharesystems.com>

On 2024-09-20 19:13, Morten Brørup wrote:
> Seeking feedback on the concept.
> I have not yet benchmarked performance.
>
> The mempool cache size is configurable, but it could hold 1.5 times the
> configured size.
> This was confusing for developers, and added complexity when configuring
> mempools with caches.
>

Is there an upside to the current semantics? Are non-power-of-2
rings/stacks more costly to operate against?

> This patch modifies the mempool cache to obey the configured size, and
> removes the cache flush threshold.
>

Maybe it would be worth mentioning what the original purpose of the
threshold was (if known), and why it's no longer a good idea (if it ever
was).

> Furthermore, the mempool caches are now completely flushed/filled to/from
> the backend, so backend accesses are CPU cache line aligned.
>
> Finally, the mempool get and put functions are optimized to only
> inline the likely scenarios, and call a non-inline static helper function
> in other cases.

What do you mean by "inline" here? Not in the header file, not marked
"inline", or something else? (I've read the code so I know what you mean,
but it should preferably be clear already in the commit message.)

Being in the header file does not force the compiler to inline functions,
and being in a .c file does not prevent inlining, in case LTO is used.

>
> Various drivers accessing the mempool directly have been updated
> accordingly.
>
> Signed-off-by: Morten Brørup
> ---
> v3:
> * Removed __attribute__(assume).
> v2:
> * Removed mempool perf test; not part of patch set.
> --- > drivers/common/idpf/idpf_common_rxtx_avx512.c | 54 +-- > drivers/mempool/dpaa/dpaa_mempool.c | 16 +- > drivers/mempool/dpaa2/dpaa2_hw_mempool.c | 14 - > drivers/net/i40e/i40e_rxtx_vec_avx512.c | 17 +- > drivers/net/iavf/iavf_rxtx_vec_avx512.c | 27 +- > drivers/net/ice/ice_rxtx_vec_avx512.c | 27 +- > lib/mempool/mempool_trace.h | 1 - > lib/mempool/rte_mempool.c | 12 +- > lib/mempool/rte_mempool.h | 321 +++++++++++------- > 9 files changed, 240 insertions(+), 249 deletions(-) > > diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c > index 3b5e124ec8..98535a48f3 100644 > --- a/drivers/common/idpf/idpf_common_rxtx_avx512.c > +++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c > @@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq) > rte_lcore_id()); > void **cache_objs; > > - if (cache == NULL || cache->len == 0) > - goto normal; > - > - cache_objs = &cache->objs[cache->len]; > - > - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { > - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); > + if (!cache || unlikely(n + cache->len > cache->size)) { > + rte_mempool_generic_put(mp, (void *)txep, n, cache); > goto done; > } > > - /* The cache follows the following algorithm > - * 1. Add the objects to the cache > - * 2. Anything greater than the cache min value (if it crosses the > - * cache flush threshold) is flushed to the ring. > - */ > + cache_objs = &cache->objs[cache->len]; > + > /* Add elements back into the cache */ > uint32_t copied = 0; > /* n is multiple of 32 */ > @@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq) > } > cache->len += n; > > - if (cache->len >= cache->flushthresh) { > - rte_mempool_ops_enqueue_bulk(mp, > - &cache->objs[cache->size], > - cache->len - cache->size); > - cache->len = cache->size; > - } > + /* Increment stat. 
*/ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > + > goto done; > } > > -normal: > m = rte_pktmbuf_prefree_seg(txep[0].mbuf); > if (likely(m != NULL)) { > free[0] = m; > @@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq) > rte_lcore_id()); > void **cache_objs; > > - if (!cache || cache->len == 0) > - goto normal; > - > - cache_objs = &cache->objs[cache->len]; > - > - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { > - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); > + if (!cache || unlikely(n + cache->len > cache->size)) { > + rte_mempool_generic_put(mp, (void *)txep, n, cache); > goto done; > } > > - /* The cache follows the following algorithm > - * 1. Add the objects to the cache > - * 2. Anything greater than the cache min value (if it crosses the > - * cache flush threshold) is flushed to the ring. > - */ > + cache_objs = &cache->objs[cache->len]; > + > /* Add elements back into the cache */ > uint32_t copied = 0; > /* n is multiple of 32 */ > @@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq) > } > cache->len += n; > > - if (cache->len >= cache->flushthresh) { > - rte_mempool_ops_enqueue_bulk(mp, > - &cache->objs[cache->size], > - cache->len - cache->size); > - cache->len = cache->size; > - } > + /* Increment stat. 
*/ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > + > goto done; > } > > -normal: > m = rte_pktmbuf_prefree_seg(txep[0].mbuf); > if (likely(m)) { > free[0] = m; > diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c > index 74bfcab509..3a936826c8 100644 > --- a/drivers/mempool/dpaa/dpaa_mempool.c > +++ b/drivers/mempool/dpaa/dpaa_mempool.c > @@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp) > struct bman_pool_params params = { > .flags = BMAN_POOL_FLAG_DYNAMIC_BPID > }; > - unsigned int lcore_id; > - struct rte_mempool_cache *cache; > > MEMPOOL_INIT_FUNC_TRACE(); > > @@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp) > rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid], > sizeof(struct dpaa_bp_info)); > mp->pool_data = (void *)bp_info; > - /* Update per core mempool cache threshold to optimal value which is > - * number of buffers that can be released to HW buffer pool in > - * a single API call. 
> - */ > - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { > - cache = &mp->local_cache[lcore_id]; > - DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d", > - lcore_id, cache->flushthresh, > - (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL)); > - if (cache->flushthresh) > - cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL; > - } > > DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid); > return 0; > @@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool, > DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d", > count, bp_info->bpid); > > - if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) { > + if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) { > DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers", > count); > return -1; > diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c > index 42e17d984c..a44f3cf616 100644 > --- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c > +++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c > @@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp) > struct dpaa2_bp_info *bp_info; > struct dpbp_attr dpbp_attr; > uint32_t bpid; > - unsigned int lcore_id; > - struct rte_mempool_cache *cache; > int ret; > > avail_dpbp = dpaa2_alloc_dpbp_dev(); > @@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp) > DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid); > > h_bp_list = bp_list; > - /* Update per core mempool cache threshold to optimal value which is > - * number of buffers that can be released to HW buffer pool in > - * a single API call. 
> - */ > - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { > - cache = &mp->local_cache[lcore_id]; > - DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d", > - lcore_id, cache->flushthresh, > - (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL)); > - if (cache->flushthresh) > - cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL; > - } > > return 0; > err3: > diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c > index 0238b03f8a..712ab1726f 100644 > --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c > +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c > @@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq) > struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, > rte_lcore_id()); > > - if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) { > + if (!cache || unlikely(n + cache->len > cache->size)) { > rte_mempool_generic_put(mp, (void *)txep, n, cache); > goto done; > } > > cache_objs = &cache->objs[cache->len]; > > - /* The cache follows the following algorithm > - * 1. Add the objects to the cache > - * 2. Anything greater than the cache min value (if it > - * crosses the cache flush threshold) is flushed to the ring. > - */ > /* Add elements back into the cache */ > uint32_t copied = 0; > /* n is multiple of 32 */ > @@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq) > } > cache->len += n; > > - if (cache->len >= cache->flushthresh) { > - rte_mempool_ops_enqueue_bulk > - (mp, &cache->objs[cache->size], > - cache->len - cache->size); > - cache->len = cache->size; > - } > + /* Increment stat. 
*/ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > + > goto done; > } > > diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c > index 3bb6f305df..307bb8556a 100644 > --- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c > +++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c > @@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq) > rte_lcore_id()); > void **cache_objs; > > - if (!cache || cache->len == 0) > - goto normal; > - > - cache_objs = &cache->objs[cache->len]; > - > - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { > - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); > + if (!cache || unlikely(n + cache->len > cache->size)) { > + rte_mempool_generic_put(mp, (void *)txep, n, cache); > goto done; > } > > - /* The cache follows the following algorithm > - * 1. Add the objects to the cache > - * 2. Anything greater than the cache min value (if it crosses the > - * cache flush threshold) is flushed to the ring. > - */ > + cache_objs = &cache->objs[cache->len]; > + > /* Add elements back into the cache */ > uint32_t copied = 0; > /* n is multiple of 32 */ > @@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq) > } > cache->len += n; > > - if (cache->len >= cache->flushthresh) { > - rte_mempool_ops_enqueue_bulk(mp, > - &cache->objs[cache->size], > - cache->len - cache->size); > - cache->len = cache->size; > - } > + /* Increment stat. 
*/ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > + > goto done; > } > > -normal: > m = rte_pktmbuf_prefree_seg(txep[0].mbuf); > if (likely(m)) { > free[0] = m; > diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c > index 04148e8ea2..4ea1db734e 100644 > --- a/drivers/net/ice/ice_rxtx_vec_avx512.c > +++ b/drivers/net/ice/ice_rxtx_vec_avx512.c > @@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq) > struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, > rte_lcore_id()); > > - if (!cache || cache->len == 0) > - goto normal; > - > - cache_objs = &cache->objs[cache->len]; > - > - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { > - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); > + if (!cache || unlikely(n + cache->len > cache->size)) { > + rte_mempool_generic_put(mp, (void *)txep, n, cache); > goto done; > } > > - /* The cache follows the following algorithm > - * 1. Add the objects to the cache > - * 2. Anything greater than the cache min value (if it > - * crosses the cache flush threshold) is flushed to the ring. > - */ > + cache_objs = &cache->objs[cache->len]; > + > /* Add elements back into the cache */ > uint32_t copied = 0; > /* n is multiple of 32 */ > @@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq) > } > cache->len += n; > > - if (cache->len >= cache->flushthresh) { > - rte_mempool_ops_enqueue_bulk > - (mp, &cache->objs[cache->size], > - cache->len - cache->size); > - cache->len = cache->size; > - } > + /* Increment stat. 
*/ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > + > goto done; > } > > -normal: > m = rte_pktmbuf_prefree_seg(txep[0].mbuf); > if (likely(m)) { > free[0] = m; > diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h > index dffef062e4..3c49b41a6d 100644 > --- a/lib/mempool/mempool_trace.h > +++ b/lib/mempool/mempool_trace.h > @@ -112,7 +112,6 @@ RTE_TRACE_POINT( > rte_trace_point_emit_i32(socket_id); > rte_trace_point_emit_ptr(cache); > rte_trace_point_emit_u32(cache->len); > - rte_trace_point_emit_u32(cache->flushthresh); > ) > > RTE_TRACE_POINT( > diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c > index d8e39e5c20..40fb13239a 100644 > --- a/lib/mempool/rte_mempool.c > +++ b/lib/mempool/rte_mempool.c > @@ -50,11 +50,6 @@ static void > mempool_event_callback_invoke(enum rte_mempool_event event, > struct rte_mempool *mp); > > -/* Note: avoid using floating point since that compiler > - * may not think that is constant. 
> - */ > -#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2) > - > #if defined(RTE_ARCH_X86) > /* > * return the greatest common divisor between a and b (fast algorithm) > @@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp) > static void > mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size) > { > - /* Check that cache have enough space for flush threshold */ > - RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) > > + /* Check that cache have enough space for size */ > + RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE > > RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) / > RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0])); > > cache->size = size; > - cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size); > cache->len = 0; > } > > @@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size, > > /* asked cache too big */ > if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE || > - CALC_CACHE_FLUSHTHRESH(cache_size) > n) { > + cache_size > n) { > rte_errno = EINVAL; > return NULL; > } > diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h > index 7bdc92b812..580c655eb3 100644 > --- a/lib/mempool/rte_mempool.h > +++ b/lib/mempool/rte_mempool.h > @@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats { > */ > struct __rte_cache_aligned rte_mempool_cache { > uint32_t size; /**< Size of the cache */ > - uint32_t flushthresh; /**< Threshold before we flush excess elements */ > uint32_t len; /**< Current cache count */ > #ifdef RTE_LIBRTE_MEMPOOL_STATS > - uint32_t unused; > /* > * Alternative location for the most frequently updated mempool statistics (per-lcore), > * providing faster update access when using a mempool cache. > @@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache { > * Cache is allocated to this size to allow it to overflow in certain > * cases to avoid needless emptying of cache. 
> */ > - alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2]; > + alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE]; > }; > > /** > @@ -1363,7 +1361,8 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache, > } > > /** > - * @internal Put several objects back in the mempool; used internally. > + * @internal Put several objects back in the mempool; used internally when > + * the number of objects exceeds the remaining space in the mempool cache. > * @param mp > * A pointer to the mempool structure. > * @param obj_table > @@ -1371,58 +1370,86 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache, > * @param n > * The number of objects to store back in the mempool, must be strictly > * positive. > + * Must be more than the remaining space in the mempool cache, i.e.: > + * cache->len + n > cache->size > * @param cache > - * A pointer to a mempool cache structure. May be NULL if not needed. > + * A pointer to a mempool cache structure. Not NULL. > */ > -static __rte_always_inline void > -rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table, > - unsigned int n, struct rte_mempool_cache *cache) > +static __rte_noinline void In a sense this is the slow path, but it will still be run quite often, will it not? You could just mark it __rte_unused and let the compiler decide what to do, or maybe better, move it to the .c file if you really don't think a function call proper is a performance issue. With __rte_noinline you will prevent the compiler from inlining (i.e., it's not a hint). Even though it may *know* (i.e., from PGO) that this is a common path (e.g., for cache-less memory pools). 
> +rte_mempool_do_generic_put_many(struct rte_mempool *mp, void * const *obj_table, > + unsigned int n, struct rte_mempool_cache *cache) > { > void **cache_objs; > + unsigned int len; > + const uint32_t cache_size = cache->size; > > - /* No cache provided */ > - if (unlikely(cache == NULL)) > - goto driver_enqueue; > - > - /* increment stat now, adding in mempool always success */ > + /* Increment stat now, adding in mempool always succeeds. */ > RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > > - /* The request itself is too big for the cache */ > - if (unlikely(n > cache->flushthresh)) > - goto driver_enqueue_stats_incremented; > - > - /* > - * The cache follows the following algorithm: > - * 1. If the objects cannot be added to the cache without crossing > - * the flush threshold, flush the cache to the backend. > - * 2. Add the objects to the cache. > - */ > - > - if (cache->len + n <= cache->flushthresh) { > - cache_objs = &cache->objs[cache->len]; > - cache->len += n; > - } else { > - cache_objs = &cache->objs[0]; > - rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len); > - cache->len = n; > + /* Fill the cache with the first objects. */ > + cache_objs = &cache->objs[cache->len]; > + len = (cache_size - cache->len); > + rte_memcpy(cache_objs, obj_table, sizeof(void *) * len); > + obj_table += len; > + n -= len; > + > + /* Flush the entire cache to the backend. */ > + cache_objs = &cache->objs[0]; > + rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size); > + > + if (unlikely(n > cache_size)) { > + /* Push following objects, in entire cache sizes, directly to the backend. */ > + len = n - n % cache_size; > + rte_mempool_ops_enqueue_bulk(mp, obj_table, len); > + obj_table += len; > + n -= len; > } > > - /* Add the objects to the cache. */ > + /* Add the remaining objects to the cache. 
*/ > + cache->len = n; > rte_memcpy(cache_objs, obj_table, sizeof(void *) * n); > +} > > - return; > - > -driver_enqueue: > - > - /* increment stat now, adding in mempool always success */ > - RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1); > - RTE_MEMPOOL_STAT_ADD(mp, put_objs, n); > - > -driver_enqueue_stats_incremented: > +/** > + * @internal Put several objects back in the mempool; used internally. > + * @param mp > + * A pointer to the mempool structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to store back in the mempool, must be strictly > + * positive. > + * @param cache > + * A pointer to a mempool cache structure. May be NULL if not needed. > + */ > +static __rte_always_inline void > +rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table, > + unsigned int n, struct rte_mempool_cache *cache) > +{ > + if (likely(cache != NULL)) { > + /* Enough remaining space in the cache? */ > + if (likely(cache->len + n <= cache->size)) { > + void **cache_objs; > + > + /* Increment stat now, adding in mempool always succeeds. */ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); > + > + /* Add the objects to the cache. */ > + cache_objs = &cache->objs[cache->len]; > + cache->len += n; > + rte_memcpy(cache_objs, obj_table, sizeof(void *) * n); > + } else > + rte_mempool_do_generic_put_many(mp, obj_table, n, cache); > + } else { > + /* Increment stat now, adding in mempool always succeeds. */ > + RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1); > + RTE_MEMPOOL_STAT_ADD(mp, put_objs, n); > > - /* push objects to the backend */ > - rte_mempool_ops_enqueue_bulk(mp, obj_table, n); > + /* push objects to the backend */ > + rte_mempool_ops_enqueue_bulk(mp, obj_table, n); > + } > } > > > @@ -1490,135 +1517,185 @@ rte_mempool_put(struct rte_mempool *mp, void *obj) > } > > /** > - * @internal Get several objects from the mempool; used internally. 
> + * @internal Get several objects from the mempool; used internally when > + * the number of objects exceeds what is available in the mempool cache. > * @param mp > * A pointer to the mempool structure. > * @param obj_table > * A pointer to a table of void * pointers (objects). > * @param n > * The number of objects to get, must be strictly positive. > + * Must be more than available in the mempool cache, i.e.: > + * n > cache->len > * @param cache > - * A pointer to a mempool cache structure. May be NULL if not needed. > + * A pointer to a mempool cache structure. Not NULL. > * @return > * - 0: Success. > * - <0: Error; code of driver dequeue function. > */ > -static __rte_always_inline int > -rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table, > - unsigned int n, struct rte_mempool_cache *cache) > +static __rte_noinline int > +rte_mempool_do_generic_get_many(struct rte_mempool *mp, void **obj_table, > + unsigned int n, struct rte_mempool_cache *cache) > { > int ret; > unsigned int remaining; > uint32_t index, len; > void **cache_objs; > + const uint32_t cache_size = cache->size; > > - /* No cache provided */ > - if (unlikely(cache == NULL)) { > - remaining = n; > - goto driver_dequeue; > - } > - > - /* The cache is a stack, so copy will be in reverse order. */ > + /* Serve the first part of the request from the cache to return hot objects first. */ > cache_objs = &cache->objs[cache->len]; > + len = cache->len; > + remaining = n - len; > + for (index = 0; index < len; index++) > + *obj_table++ = *--cache_objs; > > - if (__rte_constant(n) && n <= cache->len) { > + /* At this point, the cache is empty. */ > + > + /* More than can be served from a full cache? */ > + if (unlikely(remaining >= cache_size)) { > /* > - * The request size is known at build time, and > - * the entire request can be satisfied from the cache, > - * so let the compiler unroll the fixed length copy loop. 
> + * Serve the following part of the request directly from the backend > + * in multiples of the cache size. > */ > - cache->len -= n; > - for (index = 0; index < n; index++) > - *obj_table++ = *--cache_objs; > + len = remaining - remaining % cache_size; > + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len); > + if (unlikely(ret < 0)) { > + /* > + * No further action is required to roll back the request, > + * as objects in the cache are intact, and no objects have > + * been dequeued from the backend. > + */ > > - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); > + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); > > - return 0; > - } > + return ret; > + } > > - /* > - * Use the cache as much as we have to return hot objects first. > - * If the request size 'n' is known at build time, the above comparison > - * ensures that n > cache->len here, so omit RTE_MIN(). > - */ > - len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len); > - cache->len -= len; > - remaining = n - len; > - for (index = 0; index < len; index++) > - *obj_table++ = *--cache_objs; > + remaining -= len; > + obj_table += len; > > - /* > - * If the request size 'n' is known at build time, the case > - * where the entire request can be satisfied from the cache > - * has already been handled above, so omit handling it here. > - */ > - if (!__rte_constant(n) && remaining == 0) { > - /* The entire request is satisfied from the cache. 
*/ > + if (unlikely(remaining == 0)) { > + cache->len = 0; > > - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > > - return 0; > + return 0; > + } > } > > - /* if dequeue below would overflow mem allocated for cache */ > - if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE)) > - goto driver_dequeue; > - > - /* Fill the cache from the backend; fetch size + remaining objects. */ > - ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, > - cache->size + remaining); > + /* Fill the entire cache from the backend. */ > + ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size); > if (unlikely(ret < 0)) { > /* > - * We are buffer constrained, and not able to allocate > - * cache + remaining. > - * Do not fill the cache, just satisfy the remaining part of > - * the request directly from the backend. > + * Unable to fill the cache. > + * Last resort: Try only the remaining part of the request, > + * served directly from the backend. > */ > - goto driver_dequeue; > - } > + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining); > + if (unlikely(ret == 0)) { > + cache->len = 0; > > - /* Satisfy the remaining part of the request from the filled cache. */ > - cache_objs = &cache->objs[cache->size + remaining]; > - for (index = 0; index < remaining; index++) > - *obj_table++ = *--cache_objs; > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > + > + return 0; > + } > + > + /* Roll back. */ > + if (cache->len + remaining == n) { > + /* > + * No further action is required to roll back the request, > + * as objects in the cache are intact, and no objects have > + * been dequeued from the backend. > + */ > + } else { > + /* Update the state of the cache before putting back the objects. 
*/ > + cache->len = 0; > + > + len = n - remaining; > + obj_table -= len; > + rte_mempool_do_generic_put(mp, obj_table, len, cache); > + } > > - cache->len = cache->size; > + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); > + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); > + > + return ret; > + } > > + /* Increment stat now, this always succeeds. */ > RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > > + /* Serve the remaining part of the request from the filled cache. */ > + cache_objs = &cache->objs[cache_size]; > + for (index = 0; index < remaining; index++) > + *obj_table++ = *--cache_objs; > + > + cache->len = cache_size - remaining; > + > return 0; > +} > > -driver_dequeue: > +/** > + * @internal Get several objects from the mempool; used internally. > + * @param mp > + * A pointer to the mempool structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to get, must be strictly positive. > + * @param cache > + * A pointer to a mempool cache structure. May be NULL if not needed. > + * @return > + * - 0: Success. > + * - <0: Error; code of driver dequeue function. > + */ > +static __rte_always_inline int > +rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table, > + unsigned int n, struct rte_mempool_cache *cache) > +{ > + if (likely(cache != NULL)) { > + /* Enough objects in the cache? */ > + if (n <= cache->len) { > + unsigned int index; > + void **cache_objs; > > - /* Get remaining objects directly from the backend. */ > - ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining); > + /* Increment stat now, this always succeeds. 
*/ > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > > - if (ret < 0) { > - if (likely(cache != NULL)) { > - cache->len = n - remaining; > /* > - * No further action is required to roll the first part > - * of the request back into the cache, as objects in > - * the cache are intact. > + * The cache is a stack, so copy will be in reverse order. > + * If the request size is known at build time, > + * the compiler will unroll the fixed length copy loop. > */ > - } > - > - RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); > - RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); > + cache_objs = &cache->objs[cache->len]; > + cache->len -= n; > + for (index = 0; index < n; index++) > + *obj_table++ = *--cache_objs; > + > + return 0; > + } else > + return rte_mempool_do_generic_get_many(mp, obj_table, n, cache); > } else { > - if (likely(cache != NULL)) { > - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); > - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); > + int ret; > + > + /* Get the objects directly from the backend. */ > + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n); > + if (unlikely(ret < 0)) { > + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); > + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); > } else { > RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1); > RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n); > } > - } > > - return ret; > + return ret; > + } > } > > /**
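
For reference, the put semantics described in the commit message (top up the cache, flush it whole, push whole multiples of the cache size straight to the backend, keep the remainder in the cache) can be modelled in a few lines. This is a simplified stand-alone sketch, not the patch's code: it stores ints instead of object pointers, the backend is just a counter, and every sketch_* name is hypothetical.

```c
#include <string.h>

#define SKETCH_CACHE_SIZE 4

/* Mock backend: counts objects and calls, and flags unaligned bursts. */
struct sketch_backend {
	unsigned int count;     /* objects currently held */
	unsigned int calls;     /* number of enqueue calls */
	unsigned int unaligned; /* calls not a multiple of the cache size */
};

struct sketch_cache {
	unsigned int size; /* configured size, never exceeded */
	unsigned int len;  /* current number of cached objects */
	int objs[SKETCH_CACHE_SIZE];
};

static void
sketch_backend_enqueue(struct sketch_backend *be, const int *objs,
		unsigned int n)
{
	(void)objs; /* the mock only counts */
	be->count += n;
	be->calls++;
	if (n % SKETCH_CACHE_SIZE != 0)
		be->unaligned++;
}

static void
sketch_put(struct sketch_backend *be, struct sketch_cache *c,
		const int *objs, unsigned int n)
{
	unsigned int len;

	/* Likely case: everything fits within the configured size. */
	if (c->len + n <= c->size) {
		memcpy(&c->objs[c->len], objs, sizeof(objs[0]) * n);
		c->len += n;
		return;
	}

	/* Top up the cache with the first objects. */
	len = c->size - c->len;
	memcpy(&c->objs[c->len], objs, sizeof(objs[0]) * len);
	objs += len;
	n -= len;

	/* Flush the entire cache to the backend. */
	sketch_backend_enqueue(be, c->objs, c->size);

	/* Push whole multiples of the cache size directly to the backend. */
	if (n > c->size) {
		len = n - n % c->size;
		sketch_backend_enqueue(be, objs, len);
		objs += len;
		n -= len;
	}

	/* Keep the remainder in the (now empty) cache. */
	memcpy(c->objs, objs, sizeof(objs[0]) * n);
	c->len = n;
}
```

In this model every backend enqueue is a whole multiple of the cache size, which is the property the commit message relies on for the cache line alignment argument; whether that pays off in practice is of course what the benchmarks would have to show.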