From: "Mattias Rönnblom" <hofors@lysator.liu.se>
To: "Morten Brørup" <mb@smartsharesystems.com>, dev@dpdk.org
Subject: Re: [RFC PATCH v3] mempool: obey configured cache size
Date: Fri, 20 Sep 2024 21:41:11 +0200 [thread overview]
Message-ID: <acf249cd-3aea-4520-bad8-11e565da3199@lysator.liu.se> (raw)
In-Reply-To: <20240920171350.841532-1-mb@smartsharesystems.com>
On 2024-09-20 19:13, Morten Brørup wrote:
> Seeking feedback on the concept.
> I have not yet benchmarked performance.
>
> The mempool cache size is configurable, but it could hold 1.5 times the
> configured size.
> This was confusing for developers, and added complexity when configuring
> mempools with caches.
>
Is there an upside to the current semantics? Are non-power-of-two
rings/stacks more costly to operate against?
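For anyone skimming, here is the arithmetic behind the "1.5 times the
configured size" claim, as a standalone sketch (not DPDK code); the
CALC_CACHE_FLUSHTHRESH() macro and the objs[] dimensioning are taken from
the hunks quoted below, and RTE_MEMPOOL_CACHE_MAX_SIZE = 512 is an
assumption on my part:

#include <stdio.h>

#define RTE_MEMPOOL_CACHE_MAX_SIZE 512              /* assumed value */
#define CALC_CACHE_FLUSHTHRESH(c)  (((c) * 3) / 2)  /* removed by this patch */

int main(void)
{
	unsigned int sizes[] = { 32, 250, 512 };

	for (unsigned int i = 0; i < 3; i++) {
		unsigned int s = sizes[i];

		/* Pre-patch, a cache configured to size 's' may grow up to
		 * the flush threshold before being flushed back down to 's';
		 * objs[] is dimensioned RTE_MEMPOOL_CACHE_MAX_SIZE * 2 to
		 * leave room for that overflow. */
		printf("size %4u -> flush threshold %4u (%.2fx), objs[] slots %d\n",
		       s, CALC_CACHE_FLUSHTHRESH(s),
		       (double)CALC_CACHE_FLUSHTHRESH(s) / s,
		       RTE_MEMPOOL_CACHE_MAX_SIZE * 2);
	}
	return 0;
}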
> This patch modifies the mempool cache to obey the configured size, and
> removes the cache flush threshold.
>
Maybe it would be worth mentioning what the original purpose of the
threshold was (if known), and why it's no longer a good idea (if it ever was).
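My guess at the threshold's purpose, for what it is worth: the existing
comment above objs[] ("to allow it to overflow in certain cases to avoid
needless emptying of cache") suggests it acts as a high watermark, so a
put arriving while the cache is already at its configured size can be
absorbed without a backend enqueue. A toy illustration of just that
effect (not the actual mempool code; the constants are made up):

#include <stdbool.h>
#include <stdio.h>

#define CACHE_SIZE      256
#define FLUSH_THRESHOLD ((CACHE_SIZE * 3) / 2)  /* pre-patch headroom */

/* A put needs a backend enqueue only if it would push the cache past 'limit'. */
static bool
put_needs_backend(unsigned int len, unsigned int n, unsigned int limit)
{
	return len + n > limit;
}

int main(void)
{
	unsigned int len = CACHE_SIZE;  /* cache already at its configured size */
	unsigned int burst = 32;        /* e.g., one TX completion burst */

	printf("with 1.5x headroom: backend touched = %s\n",
	       put_needs_backend(len, burst, FLUSH_THRESHOLD) ? "yes" : "no");
	printf("strict size:        backend touched = %s\n",
	       put_needs_backend(len, burst, CACHE_SIZE) ? "yes" : "no");
	return 0;
}

Whether that headroom matters in practice is exactly what a benchmark
should tell us.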
> Furthermore, the mempool caches are now completely flushed/filled to/from
> the backend, so backend accesses are CPU cache line aligned.
>
> Finally, the mempool get and put functions are optimized to only
> inline the likely scenarios, and call a non-inline static helper function
> in other cases.
What do you mean by "inline" here? Not in the header file, not marked
"inline", or something else? (I've read the code, so I know what you
mean, but it should preferably be clear from the commit message alone.)
Being in the header file does not force the compiler to inline a
function, and being in a .c file does not prevent inlining, in case LTO
is used.
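To illustrate what I mean: a single-file sketch of the fast/slow path
split, not the actual DPDK layout (in real code the "header part" would
sit in rte_mempool.h and the slow path could sit in rte_mempool.c):

#include <stdio.h>

/* Slow path: could live in a .c file; an ordinary call unless the
 * compiler (e.g., under LTO) decides to inline it anyway. */
static int
slow_path_get(void **objs, unsigned int n)
{
	(void)objs;
	printf("slow path for %u objects\n", n);
	return 0;
}

/* Fast path: would live in the header, in the hope of being inlined
 * into every caller. */
static inline int
fast_path_get(void **cache, unsigned int *len, void **objs, unsigned int n)
{
	if (n <= *len) {                      /* common case: serve from the cache */
		*len -= n;
		for (unsigned int i = 0; i < n; i++)
			objs[i] = cache[*len + i];
		return 0;
	}
	return slow_path_get(objs, n);        /* uncommon case */
}

int main(void)
{
	void *cache[8] = { 0 };
	void *out[8];
	unsigned int len = 4;

	fast_path_get(cache, &len, out, 2);   /* served from the cache */
	fast_path_get(cache, &len, out, 8);   /* falls through to the slow path */
	return 0;
}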
>
> Various drivers accessing the mempool directly have been updated
> accordingly.
>
> Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
> ---
> v3:
> * Removed __attribute__(assume).
> v2:
> * Removed mempool perf test; not part of patch set.
> ---
> drivers/common/idpf/idpf_common_rxtx_avx512.c | 54 +--
> drivers/mempool/dpaa/dpaa_mempool.c | 16 +-
> drivers/mempool/dpaa2/dpaa2_hw_mempool.c | 14 -
> drivers/net/i40e/i40e_rxtx_vec_avx512.c | 17 +-
> drivers/net/iavf/iavf_rxtx_vec_avx512.c | 27 +-
> drivers/net/ice/ice_rxtx_vec_avx512.c | 27 +-
> lib/mempool/mempool_trace.h | 1 -
> lib/mempool/rte_mempool.c | 12 +-
> lib/mempool/rte_mempool.h | 321 +++++++++++-------
> 9 files changed, 240 insertions(+), 249 deletions(-)
>
> diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> index 3b5e124ec8..98535a48f3 100644
> --- a/drivers/common/idpf/idpf_common_rxtx_avx512.c
> +++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> @@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
> rte_lcore_id());
> void **cache_objs;
>
> - if (cache == NULL || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it crosses the
> - * cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk(mp,
> - &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m != NULL)) {
> free[0] = m;
> @@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
> rte_lcore_id());
> void **cache_objs;
>
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it crosses the
> - * cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk(mp,
> - &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m)) {
> free[0] = m;
> diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c
> index 74bfcab509..3a936826c8 100644
> --- a/drivers/mempool/dpaa/dpaa_mempool.c
> +++ b/drivers/mempool/dpaa/dpaa_mempool.c
> @@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
> struct bman_pool_params params = {
> .flags = BMAN_POOL_FLAG_DYNAMIC_BPID
> };
> - unsigned int lcore_id;
> - struct rte_mempool_cache *cache;
>
> MEMPOOL_INIT_FUNC_TRACE();
>
> @@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
> rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],
> sizeof(struct dpaa_bp_info));
> mp->pool_data = (void *)bp_info;
> - /* Update per core mempool cache threshold to optimal value which is
> - * number of buffers that can be released to HW buffer pool in
> - * a single API call.
> - */
> - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> - cache = &mp->local_cache[lcore_id];
> - DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> - lcore_id, cache->flushthresh,
> - (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));
> - if (cache->flushthresh)
> - cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;
> - }
>
> DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);
> return 0;
> @@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,
> DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",
> count, bp_info->bpid);
>
> - if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {
> + if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {
> DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers",
> count);
> return -1;
> diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> index 42e17d984c..a44f3cf616 100644
> --- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> +++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> @@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
> struct dpaa2_bp_info *bp_info;
> struct dpbp_attr dpbp_attr;
> uint32_t bpid;
> - unsigned int lcore_id;
> - struct rte_mempool_cache *cache;
> int ret;
>
> avail_dpbp = dpaa2_alloc_dpbp_dev();
> @@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
> DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid);
>
> h_bp_list = bp_list;
> - /* Update per core mempool cache threshold to optimal value which is
> - * number of buffers that can be released to HW buffer pool in
> - * a single API call.
> - */
> - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> - cache = &mp->local_cache[lcore_id];
> - DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> - lcore_id, cache->flushthresh,
> - (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));
> - if (cache->flushthresh)
> - cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;
> - }
>
> return 0;
> err3:
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> index 0238b03f8a..712ab1726f 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> @@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
> struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
> rte_lcore_id());
>
> - if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> cache_objs = &cache->objs[cache->len];
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it
> - * crosses the cache flush threshold) is flushed to the ring.
> - */
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk
> - (mp, &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> index 3bb6f305df..307bb8556a 100644
> --- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> +++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> @@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
> rte_lcore_id());
> void **cache_objs;
>
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it crosses the
> - * cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk(mp,
> - &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m)) {
> free[0] = m;
> diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c
> index 04148e8ea2..4ea1db734e 100644
> --- a/drivers/net/ice/ice_rxtx_vec_avx512.c
> +++ b/drivers/net/ice/ice_rxtx_vec_avx512.c
> @@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
> struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
> rte_lcore_id());
>
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it
> - * crosses the cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk
> - (mp, &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m)) {
> free[0] = m;
> diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h
> index dffef062e4..3c49b41a6d 100644
> --- a/lib/mempool/mempool_trace.h
> +++ b/lib/mempool/mempool_trace.h
> @@ -112,7 +112,6 @@ RTE_TRACE_POINT(
> rte_trace_point_emit_i32(socket_id);
> rte_trace_point_emit_ptr(cache);
> rte_trace_point_emit_u32(cache->len);
> - rte_trace_point_emit_u32(cache->flushthresh);
> )
>
> RTE_TRACE_POINT(
> diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
> index d8e39e5c20..40fb13239a 100644
> --- a/lib/mempool/rte_mempool.c
> +++ b/lib/mempool/rte_mempool.c
> @@ -50,11 +50,6 @@ static void
> mempool_event_callback_invoke(enum rte_mempool_event event,
> struct rte_mempool *mp);
>
> -/* Note: avoid using floating point since that compiler
> - * may not think that is constant.
> - */
> -#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)
> -
> #if defined(RTE_ARCH_X86)
> /*
> * return the greatest common divisor between a and b (fast algorithm)
> @@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)
> static void
> mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
> {
> - /* Check that cache have enough space for flush threshold */
> - RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) >
> + /* Check that cache have enough space for size */
> + RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE >
> RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /
> RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));
>
> cache->size = size;
> - cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
> cache->len = 0;
> }
>
> @@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
>
> /* asked cache too big */
> if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> - CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> + cache_size > n) {
> rte_errno = EINVAL;
> return NULL;
> }
> diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> index 7bdc92b812..580c655eb3 100644
> --- a/lib/mempool/rte_mempool.h
> +++ b/lib/mempool/rte_mempool.h
> @@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {
> */
> struct __rte_cache_aligned rte_mempool_cache {
> uint32_t size; /**< Size of the cache */
> - uint32_t flushthresh; /**< Threshold before we flush excess elements */
> uint32_t len; /**< Current cache count */
> #ifdef RTE_LIBRTE_MEMPOOL_STATS
> - uint32_t unused;
> /*
> * Alternative location for the most frequently updated mempool statistics (per-lcore),
> * providing faster update access when using a mempool cache.
> @@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {
> * Cache is allocated to this size to allow it to overflow in certain
> * cases to avoid needless emptying of cache.
> */
> - alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2];
> + alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];
> };
>
> /**
> @@ -1363,7 +1361,8 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> }
>
> /**
> - * @internal Put several objects back in the mempool; used internally.
> + * @internal Put several objects back in the mempool; used internally when
> + * the number of objects exceeds the remaining space in the mempool cache.
> * @param mp
> * A pointer to the mempool structure.
> * @param obj_table
> @@ -1371,58 +1370,86 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> * @param n
> * The number of objects to store back in the mempool, must be strictly
> * positive.
> + * Must be more than the remaining space in the mempool cache, i.e.:
> + * cache->len + n > cache->size
> * @param cache
> - * A pointer to a mempool cache structure. May be NULL if not needed.
> + * A pointer to a mempool cache structure. Not NULL.
> */
> -static __rte_always_inline void
> -rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
> - unsigned int n, struct rte_mempool_cache *cache)
> +static __rte_noinline void
In a sense this is the slow path, but it will still run quite often,
will it not? You could just mark it __rte_unused and let the compiler
decide what to do, or, maybe better, move it to the .c file if you
really don't think a function call proper is a performance issue.
With __rte_noinline you prevent the compiler from inlining (it is a
prohibition, not a hint), even in cases where it may *know* (e.g., from
PGO) that this is a common path (e.g., for cache-less memory pools).
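To make the distinction concrete, a minimal standalone contrast of the
two attributes, using the plain GCC spellings (the __rte_noinline and
__rte_unused wrappers live in rte_common.h, as far as I recall); the
function names are made up and this is not the mempool code:

#include <stdio.h>

/* As in the patch: inlining is forbidden outright, even if the compiler
 * (e.g., via PGO) believes the call site is hot. */
static __attribute__((noinline)) int
put_many_noinline(int n)
{
	return n * 2;
}

/* Alternative: no inlining attribute at all. "unused" merely silences
 * the -Wunused-function warning a static header function can trigger in
 * translation units that never call it; the inlining decision is left
 * to the compiler (and to LTO/PGO, where available). */
static __attribute__((unused)) int
put_many_plain(int n)
{
	return n * 2;
}

int main(void)
{
	printf("%d %d\n", put_many_noinline(21), put_many_plain(21));
	return 0;
}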
> +rte_mempool_do_generic_put_many(struct rte_mempool *mp, void * const *obj_table,
> + unsigned int n, struct rte_mempool_cache *cache)
> {
> void **cache_objs;
> + unsigned int len;
> + const uint32_t cache_size = cache->size;
>
> - /* No cache provided */
> - if (unlikely(cache == NULL))
> - goto driver_enqueue;
> -
> - /* increment stat now, adding in mempool always success */
> + /* Increment stat now, adding in mempool always succeeds. */
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
>
> - /* The request itself is too big for the cache */
> - if (unlikely(n > cache->flushthresh))
> - goto driver_enqueue_stats_incremented;
> -
> - /*
> - * The cache follows the following algorithm:
> - * 1. If the objects cannot be added to the cache without crossing
> - * the flush threshold, flush the cache to the backend.
> - * 2. Add the objects to the cache.
> - */
> -
> - if (cache->len + n <= cache->flushthresh) {
> - cache_objs = &cache->objs[cache->len];
> - cache->len += n;
> - } else {
> - cache_objs = &cache->objs[0];
> - rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
> - cache->len = n;
> + /* Fill the cache with the first objects. */
> + cache_objs = &cache->objs[cache->len];
> + len = (cache_size - cache->len);
> + rte_memcpy(cache_objs, obj_table, sizeof(void *) * len);
> + obj_table += len;
> + n -= len;
> +
> + /* Flush the entire cache to the backend. */
> + cache_objs = &cache->objs[0];
> + rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);
> +
> + if (unlikely(n > cache_size)) {
> + /* Push following objects, in entire cache sizes, directly to the backend. */
> + len = n - n % cache_size;
> + rte_mempool_ops_enqueue_bulk(mp, obj_table, len);
> + obj_table += len;
> + n -= len;
> }
>
> - /* Add the objects to the cache. */
> + /* Add the remaining objects to the cache. */
> + cache->len = n;
> rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +}
>
> - return;
> -
> -driver_enqueue:
> -
> - /* increment stat now, adding in mempool always success */
> - RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> - RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
> -
> -driver_enqueue_stats_incremented:
> +/**
> + * @internal Put several objects back in the mempool; used internally.
> + * @param mp
> + * A pointer to the mempool structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to store back in the mempool, must be strictly
> + * positive.
> + * @param cache
> + * A pointer to a mempool cache structure. May be NULL if not needed.
> + */
> +static __rte_always_inline void
> +rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
> + unsigned int n, struct rte_mempool_cache *cache)
> +{
> + if (likely(cache != NULL)) {
> + /* Enough remaining space in the cache? */
> + if (likely(cache->len + n <= cache->size)) {
> + void **cache_objs;
> +
> + /* Increment stat now, adding in mempool always succeeds. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> + /* Add the objects to the cache. */
> + cache_objs = &cache->objs[cache->len];
> + cache->len += n;
> + rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> + } else
> + rte_mempool_do_generic_put_many(mp, obj_table, n, cache);
> + } else {
> + /* Increment stat now, adding in mempool always succeeds. */
> + RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
>
> - /* push objects to the backend */
> - rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
> + /* push objects to the backend */
> + rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
> + }
> }
>
>
> @@ -1490,135 +1517,185 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
> }
>
> /**
> - * @internal Get several objects from the mempool; used internally.
> + * @internal Get several objects from the mempool; used internally when
> + * the number of objects exceeds what is available in the mempool cache.
> * @param mp
> * A pointer to the mempool structure.
> * @param obj_table
> * A pointer to a table of void * pointers (objects).
> * @param n
> * The number of objects to get, must be strictly positive.
> + * Must be more than available in the mempool cache, i.e.:
> + * n > cache->len
> * @param cache
> - * A pointer to a mempool cache structure. May be NULL if not needed.
> + * A pointer to a mempool cache structure. Not NULL.
> * @return
> * - 0: Success.
> * - <0: Error; code of driver dequeue function.
> */
> -static __rte_always_inline int
> -rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> - unsigned int n, struct rte_mempool_cache *cache)
> +static __rte_noinline int
> +rte_mempool_do_generic_get_many(struct rte_mempool *mp, void **obj_table,
> + unsigned int n, struct rte_mempool_cache *cache)
> {
> int ret;
> unsigned int remaining;
> uint32_t index, len;
> void **cache_objs;
> + const uint32_t cache_size = cache->size;
>
> - /* No cache provided */
> - if (unlikely(cache == NULL)) {
> - remaining = n;
> - goto driver_dequeue;
> - }
> -
> - /* The cache is a stack, so copy will be in reverse order. */
> + /* Serve the first part of the request from the cache to return hot objects first. */
> cache_objs = &cache->objs[cache->len];
> + len = cache->len;
> + remaining = n - len;
> + for (index = 0; index < len; index++)
> + *obj_table++ = *--cache_objs;
>
> - if (__rte_constant(n) && n <= cache->len) {
> + /* At this point, the cache is empty. */
> +
> + /* More than can be served from a full cache? */
> + if (unlikely(remaining >= cache_size)) {
> /*
> - * The request size is known at build time, and
> - * the entire request can be satisfied from the cache,
> - * so let the compiler unroll the fixed length copy loop.
> + * Serve the following part of the request directly from the backend
> + * in multipla of the cache size.
> */
> - cache->len -= n;
> - for (index = 0; index < n; index++)
> - *obj_table++ = *--cache_objs;
> + len = remaining - remaining % cache_size;
> + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);
> + if (unlikely(ret < 0)) {
> + /*
> + * No further action is required to roll back the request,
> + * as objects in the cache are intact, and no objects have
> + * been dequeued from the backend.
> + */
>
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
>
> - return 0;
> - }
> + return ret;
> + }
>
> - /*
> - * Use the cache as much as we have to return hot objects first.
> - * If the request size 'n' is known at build time, the above comparison
> - * ensures that n > cache->len here, so omit RTE_MIN().
> - */
> - len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);
> - cache->len -= len;
> - remaining = n - len;
> - for (index = 0; index < len; index++)
> - *obj_table++ = *--cache_objs;
> + remaining -= len;
> + obj_table += len;
>
> - /*
> - * If the request size 'n' is known at build time, the case
> - * where the entire request can be satisfied from the cache
> - * has already been handled above, so omit handling it here.
> - */
> - if (!__rte_constant(n) && remaining == 0) {
> - /* The entire request is satisfied from the cache. */
> + if (unlikely(remaining == 0)) {
> + cache->len = 0;
>
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>
> - return 0;
> + return 0;
> + }
> }
>
> - /* if dequeue below would overflow mem allocated for cache */
> - if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))
> - goto driver_dequeue;
> -
> - /* Fill the cache from the backend; fetch size + remaining objects. */
> - ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,
> - cache->size + remaining);
> + /* Fill the entire cache from the backend. */
> + ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);
> if (unlikely(ret < 0)) {
> /*
> - * We are buffer constrained, and not able to allocate
> - * cache + remaining.
> - * Do not fill the cache, just satisfy the remaining part of
> - * the request directly from the backend.
> + * Unable to fill the cache.
> + * Last resort: Try only the remaining part of the request,
> + * served directly from the backend.
> */
> - goto driver_dequeue;
> - }
> + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> + if (unlikely(ret == 0)) {
> + cache->len = 0;
>
> - /* Satisfy the remaining part of the request from the filled cache. */
> - cache_objs = &cache->objs[cache->size + remaining];
> - for (index = 0; index < remaining; index++)
> - *obj_table++ = *--cache_objs;
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +
> + return 0;
> + }
> +
> + /* Roll back. */
> + if (cache->len + remaining == n) {
> + /*
> + * No further action is required to roll back the request,
> + * as objects in the cache are intact, and no objects have
> + * been dequeued from the backend.
> + */
> + } else {
> + /* Update the state of the cache before putting back the objects. */
> + cache->len = 0;
> +
> + len = n - remaining;
> + obj_table -= len;
> + rte_mempool_do_generic_put(mp, obj_table, len, cache);
> + }
>
> - cache->len = cache->size;
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> +
> + return ret;
> + }
>
> + /* Increment stat now, this always succeeds. */
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>
> + /* Serve the remaining part of the request from the filled cache. */
> + cache_objs = &cache->objs[cache_size];
> + for (index = 0; index < remaining; index++)
> + *obj_table++ = *--cache_objs;
> +
> + cache->len = cache_size - remaining;
> +
> return 0;
> +}
>
> -driver_dequeue:
> +/**
> + * @internal Get several objects from the mempool; used internally.
> + * @param mp
> + * A pointer to the mempool structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to get, must be strictly positive.
> + * @param cache
> + * A pointer to a mempool cache structure. May be NULL if not needed.
> + * @return
> + * - 0: Success.
> + * - <0: Error; code of driver dequeue function.
> + */
> +static __rte_always_inline int
> +rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> + unsigned int n, struct rte_mempool_cache *cache)
> +{
> + if (likely(cache != NULL)) {
> + /* Enough objects in the cache? */
> + if (n <= cache->len) {
> + unsigned int index;
> + void **cache_objs;
>
> - /* Get remaining objects directly from the backend. */
> - ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> + /* Increment stat now, this always succeeds. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>
> - if (ret < 0) {
> - if (likely(cache != NULL)) {
> - cache->len = n - remaining;
> /*
> - * No further action is required to roll the first part
> - * of the request back into the cache, as objects in
> - * the cache are intact.
> + * The cache is a stack, so copy will be in reverse order.
> + * If the request size is known at build time,
> + * the compiler will unroll the fixed length copy loop.
> */
> - }
> -
> - RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> - RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> + cache_objs = &cache->objs[cache->len];
> + cache->len -= n;
> + for (index = 0; index < n; index++)
> + *obj_table++ = *--cache_objs;
> +
> + return 0;
> + } else
> + return rte_mempool_do_generic_get_many(mp, obj_table, n, cache);
> } else {
> - if (likely(cache != NULL)) {
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> + int ret;
> +
> + /* Get the objects directly from the backend. */
> + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
> + if (unlikely(ret < 0)) {
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> } else {
> RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
> RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);
> }
> - }
>
> - return ret;
> + return ret;
> + }
> }
>
> /**
Thread overview: 19+ messages
2024-09-20 16:32 [RFC PATCH] " Morten Brørup
2024-09-20 16:37 ` [RFC PATCH v2] " Morten Brørup
2024-09-20 17:13 ` [RFC PATCH v3] " Morten Brørup
2024-09-20 19:41 ` Mattias Rönnblom [this message]
2024-09-22 10:50 ` [RFC PATCH v4] mempool: fix mempool " Morten Brørup
2024-09-24 3:58 ` [RFC PATCH v5] " Morten Brørup
2024-09-24 11:58 ` [RFC PATCH v6] " Morten Brørup
2024-09-24 18:12 ` [RFC PATCH v7] " Morten Brørup
2024-09-24 20:44 ` Patrick Robb
2024-09-25 21:33 ` [RFC PATCH v8] " Morten Brørup
2024-09-26 18:24 ` [RFC PATCH v9] " Morten Brørup
2024-09-26 20:53 ` [RFC PATCH v10] " Morten Brørup
2024-09-28 17:32 ` [RFC PATCH v11] " Morten Brørup
2024-09-28 19:38 ` [RFC PATCH v12] " Morten Brørup
2024-10-01 11:33 ` [RFC PATCH v13] " Morten Brørup
2024-10-01 13:41 ` [RFC PATCH v14] " Morten Brørup
2024-10-02 11:25 ` [RFC PATCH v15] " Morten Brørup
2024-10-02 14:35 ` [RFC PATCH v16] " Morten Brørup
2024-10-02 15:00 ` [RFC PATCH v17] " Morten Brørup