From: Patrick Robb <probb@iol.unh.edu>
To: "Morten Brørup" <mb@smartsharesystems.com>
Cc: dev@dpdk.org, "Mattias Rönnblom" <hofors@lysator.liu.se>
Subject: Re: [RFC PATCH v7] mempool: fix mempool cache size
Date: Tue, 24 Sep 2024 16:44:29 -0400 [thread overview]
Message-ID: <CAJvnSUBncAuQN25DAuVWqJhN_55im6U7bXthM89TaOOb-su0=w@mail.gmail.com> (raw)
In-Reply-To: <20240924181224.1562346-1-mb@smartsharesystems.com>
Recheck-request: iol-intel-Performance
On Tue, Sep 24, 2024 at 2:12 PM Morten Brørup <mb@smartsharesystems.com> wrote:
> This patch refactors the mempool cache to fix two bugs:
> 1. When a mempool is created with a cache size of N objects, the cache was
> actually created with a size of 1.5 * N objects.
> 2. The mempool cache field names did not reflect their purpose:
> the "flushthresh" field held the actual cache size, and the "size" field
> held the number of objects remaining in the cache after a get operation
> had refilled it from the backend.
>
> Especially the first item could be fatal:
> When the mempool's caches associated with other lcores hold more objects
> than the mempool's configured cache size, a right-sized mempool may
> unexpectedly run out of objects, causing the application to fail.
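For context, the overshoot follows from the CALC_CACHE_FLUSHTHRESH macro that this
patch removes further down; a minimal sketch with illustrative numbers (not code
from the patch):

#include <stdio.h>

/* Pre-patch threshold math, copied from the macro removed below. */
#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)

int main(void)
{
    unsigned int cache_size = 512; /* size requested at mempool creation */
    unsigned int flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);

    /* Before this patch, a put only flushed the cache once its length would
     * exceed flushthresh, so each lcore cache could hold up to 768 objects
     * instead of the requested 512. */
    printf("requested %u, actual per-lcore upper bound %u\n",
           cache_size, flushthresh);
    return 0;
}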
>
> Furthermore, this patch introduces two optimizations:
> 1. The mempool caches are flushed to/filled from the backend in their
> entirety, so backend accesses are CPU cache line aligned. (Assuming the
> mempool cache size is a multiple of the CPU cache line size divided by the
> size of a pointer; a sizing sketch follows after this list.)
> 2. The unlikely paths in the get and put functions, where the cache is
> flushed to/filled from the backend, are moved from the inline functions to
> separate helper functions, thereby reducing the code size of the inline
> functions.
> Note: Accessing the backend for cacheless mempools remains inline.
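Regarding optimization 1, the alignment assumption amounts to picking the cache
size as a multiple of (CPU cache line size / pointer size). A minimal sketch,
assuming 64-byte cache lines; the helper name is made up for illustration and is
not part of the patch:

#include <stddef.h>

/* Round a requested per-lcore cache size up to a whole number of CPU cache
 * lines worth of pointers, so whole-cache flushes/refills stay line aligned. */
static unsigned int
align_cache_size(unsigned int requested)
{
    const unsigned int ptrs_per_line = 64 / sizeof(void *); /* 8 on 64-bit targets */

    return (requested + ptrs_per_line - 1) / ptrs_per_line * ptrs_per_line;
}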
>
> Various drivers accessing the mempool directly have been updated
> accordingly.
> These drivers did not update mempool statistics when accessing the mempool
> directly, so that is fixed too.
>
> Note: Performance not yet benchmarked.
>
> Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
> ---
> v7:
> * Increased max mempool cache size from 512 to 1024 objects.
> Mainly for CI performance test purposes.
> Originally, the max mempool cache size was 768 objects, backed by a
> fixed-size array of 1024 objects in the mempool cache structure.
> v6:
> * Fix v5's incomplete implementation of passing large requests directly to
> the backend.
> * Use memcpy instead of rte_memcpy where compiler complains about it.
> * Added const to some function parameters.
> v5:
> * Moved helper functions back into the header file, for improved
> performance.
> * Pass large requests directly to the backend. This also simplifies the
> code.
> v4:
> * Updated subject to reflect that misleading names are considered bugs.
> * Rewrote patch description to provide more details about the bugs fixed.
> (Mattias Rönnblom)
> * Moved helper functions, not to be inlined, to mempool C file.
> (Mattias Rönnblom)
> * Pass requests for n >= RTE_MEMPOOL_CACHE_MAX_SIZE objects known at build
> time directly to the backend driver, to avoid calling the helper functions.
> This also fixes the compiler warnings about out-of-bounds array access.
> v3:
> * Removed __attribute__(assume).
> v2:
> * Removed mempool perf test; not part of patch set.
> ---
> config/rte_config.h | 2 +-
> drivers/common/idpf/idpf_common_rxtx_avx512.c | 54 +---
> drivers/mempool/dpaa/dpaa_mempool.c | 16 +-
> drivers/mempool/dpaa2/dpaa2_hw_mempool.c | 14 -
> drivers/net/i40e/i40e_rxtx_vec_avx512.c | 17 +-
> drivers/net/iavf/iavf_rxtx_vec_avx512.c | 27 +-
> drivers/net/ice/ice_rxtx_vec_avx512.c | 27 +-
> lib/mempool/mempool_trace.h | 1 -
> lib/mempool/rte_mempool.c | 12 +-
> lib/mempool/rte_mempool.h | 287 ++++++++++++------
> 10 files changed, 232 insertions(+), 225 deletions(-)
>
> diff --git a/config/rte_config.h b/config/rte_config.h
> index dd7bb0d35b..2488ff167d 100644
> --- a/config/rte_config.h
> +++ b/config/rte_config.h
> @@ -56,7 +56,7 @@
> #define RTE_CONTIGMEM_DEFAULT_BUF_SIZE (512*1024*1024)
>
> /* mempool defines */
> -#define RTE_MEMPOOL_CACHE_MAX_SIZE 512
> +#define RTE_MEMPOOL_CACHE_MAX_SIZE 1024
> /* RTE_LIBRTE_MEMPOOL_STATS is not set */
> /* RTE_LIBRTE_MEMPOOL_DEBUG is not set */
>
> diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> index 3b5e124ec8..98535a48f3 100644
> --- a/drivers/common/idpf/idpf_common_rxtx_avx512.c
> +++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> @@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
>
> rte_lcore_id());
> void **cache_objs;
>
> - if (cache == NULL || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it crosses the
> - * cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk(mp,
> - &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m != NULL)) {
> free[0] = m;
> @@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
>
> rte_lcore_id());
> void **cache_objs;
>
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it crosses the
> - * cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk(mp,
> - &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m)) {
> free[0] = m;
> diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c
> index 74bfcab509..3a936826c8 100644
> --- a/drivers/mempool/dpaa/dpaa_mempool.c
> +++ b/drivers/mempool/dpaa/dpaa_mempool.c
> @@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
> struct bman_pool_params params = {
> .flags = BMAN_POOL_FLAG_DYNAMIC_BPID
> };
> - unsigned int lcore_id;
> - struct rte_mempool_cache *cache;
>
> MEMPOOL_INIT_FUNC_TRACE();
>
> @@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
> rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],
> sizeof(struct dpaa_bp_info));
> mp->pool_data = (void *)bp_info;
> - /* Update per core mempool cache threshold to optimal value which is
> - * number of buffers that can be released to HW buffer pool in
> - * a single API call.
> - */
> - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> - cache = &mp->local_cache[lcore_id];
> - DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> - lcore_id, cache->flushthresh,
> - (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));
> - if (cache->flushthresh)
> - cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;
> - }
>
> DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);
> return 0;
> @@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,
> DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",
> count, bp_info->bpid);
>
> - if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {
> + if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {
> DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers",
> count);
> return -1;
> diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> index 42e17d984c..a44f3cf616 100644
> --- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> +++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> @@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
> struct dpaa2_bp_info *bp_info;
> struct dpbp_attr dpbp_attr;
> uint32_t bpid;
> - unsigned int lcore_id;
> - struct rte_mempool_cache *cache;
> int ret;
>
> avail_dpbp = dpaa2_alloc_dpbp_dev();
> @@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
> DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d",
> dpbp_attr.bpid);
>
> h_bp_list = bp_list;
> - /* Update per core mempool cache threshold to optimal value which is
> - * number of buffers that can be released to HW buffer pool in
> - * a single API call.
> - */
> - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> - cache = &mp->local_cache[lcore_id];
> - DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> - lcore_id, cache->flushthresh,
> - (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));
> - if (cache->flushthresh)
> - cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;
> - }
>
> return 0;
> err3:
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> index 0238b03f8a..712ab1726f 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> @@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
> struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
> rte_lcore_id());
>
> - if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> cache_objs = &cache->objs[cache->len];
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it
> - * crosses the cache flush threshold) is flushed to the ring.
> - */
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk
> - (mp, &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> index 3bb6f305df..307bb8556a 100644
> --- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> +++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> @@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
>
> rte_lcore_id());
> void **cache_objs;
>
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it crosses the
> - * cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk(mp,
> - &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m)) {
> free[0] = m;
> diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c
> index 04148e8ea2..4ea1db734e 100644
> --- a/drivers/net/ice/ice_rxtx_vec_avx512.c
> +++ b/drivers/net/ice/ice_rxtx_vec_avx512.c
> @@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
> struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
> rte_lcore_id());
>
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> -
> - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> + if (!cache || unlikely(n + cache->len > cache->size)) {
> + rte_mempool_generic_put(mp, (void *)txep, n, cache);
> goto done;
> }
>
> - /* The cache follows the following algorithm
> - * 1. Add the objects to the cache
> - * 2. Anything greater than the cache min value (if it
> - * crosses the cache flush threshold) is flushed to the ring.
> - */
> + cache_objs = &cache->objs[cache->len];
> +
> /* Add elements back into the cache */
> uint32_t copied = 0;
> /* n is multiple of 32 */
> @@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
> }
> cache->len += n;
>
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk
> - (mp, &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> - }
> + /* Increment stat. */
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> goto done;
> }
>
> -normal:
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> if (likely(m)) {
> free[0] = m;
> diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h
> index dffef062e4..3c49b41a6d 100644
> --- a/lib/mempool/mempool_trace.h
> +++ b/lib/mempool/mempool_trace.h
> @@ -112,7 +112,6 @@ RTE_TRACE_POINT(
> rte_trace_point_emit_i32(socket_id);
> rte_trace_point_emit_ptr(cache);
> rte_trace_point_emit_u32(cache->len);
> - rte_trace_point_emit_u32(cache->flushthresh);
> )
>
> RTE_TRACE_POINT(
> diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
> index d8e39e5c20..40fb13239a 100644
> --- a/lib/mempool/rte_mempool.c
> +++ b/lib/mempool/rte_mempool.c
> @@ -50,11 +50,6 @@ static void
> mempool_event_callback_invoke(enum rte_mempool_event event,
> struct rte_mempool *mp);
>
> -/* Note: avoid using floating point since that compiler
> - * may not think that is constant.
> - */
> -#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)
> -
> #if defined(RTE_ARCH_X86)
> /*
> * return the greatest common divisor between a and b (fast algorithm)
> @@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)
> static void
> mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
> {
> - /* Check that cache have enough space for flush threshold */
> - RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) >
> + /* Check that cache has enough space for size */
> + RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE >
> RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /
> RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));
>
> cache->size = size;
> - cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
> cache->len = 0;
> }
>
> @@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
>
> /* asked cache too big */
> if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> - CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> + cache_size > n) {
> rte_errno = EINVAL;
> return NULL;
> }
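With the check above, cache_size must not exceed RTE_MEMPOOL_CACHE_MAX_SIZE or
the element count n. A hedged usage sketch (pool name and numbers are
illustrative only):

#include <rte_mempool.h>

/* Create an empty mempool whose cache size satisfies the revised check:
 * cache_size <= RTE_MEMPOOL_CACHE_MAX_SIZE and cache_size <= n.
 * On a violation, rte_mempool_create_empty() returns NULL with rte_errno = EINVAL. */
static struct rte_mempool *
create_example_pool(int socket_id)
{
    return rte_mempool_create_empty("example_pool",
            4096,  /* n: total number of elements */
            2048,  /* elt_size */
            256,   /* cache_size: now also the real per-lcore upper bound */
            0,     /* private_data_size */
            socket_id, 0);
}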
> diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> index 7bdc92b812..0801cec24a 100644
> --- a/lib/mempool/rte_mempool.h
> +++ b/lib/mempool/rte_mempool.h
> @@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {
> */
> struct __rte_cache_aligned rte_mempool_cache {
> uint32_t size; /**< Size of the cache */
> - uint32_t flushthresh; /**< Threshold before we flush excess elements */
> uint32_t len; /**< Current cache count */
> #ifdef RTE_LIBRTE_MEMPOOL_STATS
> - uint32_t unused;
> /*
> * Alternative location for the most frequently updated mempool statistics (per-lcore),
> * providing faster update access when using a mempool cache.
> @@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {
> * Cache is allocated to this size to allow it to overflow in certain
> * cases to avoid needless emptying of cache.
> */
> - alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2];
> + alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];
> };
>
> /**
> @@ -1362,6 +1360,48 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> cache->len = 0;
> }
>
> +/**
> + * @internal Put several objects back in the mempool; used internally when
> + * the number of objects exceeds the remaining space in the mempool cache.
> + * @param mp
> + * A pointer to the mempool structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to store back in the mempool, must be strictly
> + * positive.
> + * Must be more than the remaining space in the mempool cache, i.e.:
> + * cache->len + n > cache->size
> + * Must be less than the size of the mempool cache, i.e.:
> + * n < cache->size
> + * @param cache
> + * A pointer to a mempool cache structure. Not NULL.
> + */
> +static void
> +rte_mempool_do_generic_put_split(struct rte_mempool *mp, void * const *obj_table,
> + unsigned int n, struct rte_mempool_cache * const cache)
> +{
> + void **cache_objs;
> + unsigned int len;
> + const uint32_t cache_size = cache->size;
> +
> + /* Fill the cache with the first objects. */
> + cache_objs = &cache->objs[cache->len];
> + len = (cache_size - cache->len);
> + cache->len = n - len; /* Moved to here (for performance). */
> + /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * len);
> + obj_table += len;
> + n -= len;
> +
> + /* Flush the entire cache to the backend. */
> + cache_objs = &cache->objs[0];
> + rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);
> +
> + /* Add the remaining objects to the cache. */
> + /* Moved from here (for performance): cache->len = n; */
> + /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +}
> +
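A standalone model of the bookkeeping above may help when reviewing; the function
and numbers below are illustrative only, not code from the patch:

#include <assert.h>

/* Model of the put-split arithmetic: preconditions mirror the doc comment,
 * i.e. cache_len + n > cache_size and n < cache_size. */
static void
put_split_model(unsigned int cache_size, unsigned int cache_len, unsigned int n)
{
    unsigned int len = cache_size - cache_len; /* objects that top the cache off */
    unsigned int kept = n - len;               /* objects left in the cache afterwards */

    assert(cache_len + n > cache_size && n < cache_size);
    /* The full cache (cache_size objects) is flushed to the backend in one
     * aligned bulk enqueue, and the cache ends up holding exactly 'kept'. */
    assert(len + kept == n);
}

/* Example: put_split_model(512, 500, 40): 12 objects top the cache off,
 * 512 are flushed, and 28 remain in the cache. */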
> /**
> * @internal Put several objects back in the mempool; used internally.
> * @param mp
> @@ -1376,52 +1416,44 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> */
> static __rte_always_inline void
> rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
> - unsigned int n, struct rte_mempool_cache *cache)
> + unsigned int n, struct rte_mempool_cache * const cache)
> {
> - void **cache_objs;
> -
> - /* No cache provided */
> + /* No cache provided? */
> if (unlikely(cache == NULL))
> goto driver_enqueue;
>
> - /* increment stat now, adding in mempool always success */
> + /* Increment stats now, adding in mempool always succeeds. */
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
>
> - /* The request itself is too big for the cache */
> - if (unlikely(n > cache->flushthresh))
> + /* The request itself is known to be too big for any cache? */
> + if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)
> goto driver_enqueue_stats_incremented;
>
> - /*
> - * The cache follows the following algorithm:
> - * 1. If the objects cannot be added to the cache without crossing
> - * the flush threshold, flush the cache to the backend.
> - * 2. Add the objects to the cache.
> - */
> + /* Enough remaining space in the cache? */
> + if (likely(cache->len + n <= cache->size)) {
> + void **cache_objs;
>
> - if (cache->len + n <= cache->flushthresh) {
> + /* Add the objects to the cache. */
> cache_objs = &cache->objs[cache->len];
> cache->len += n;
> - } else {
> - cache_objs = &cache->objs[0];
> - rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
> - cache->len = n;
> - }
> -
> - /* Add the objects to the cache. */
> - rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> + rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> + } else if (likely(n < cache->size))
> + rte_mempool_do_generic_put_split(mp, obj_table, n, cache);
> + else
> + goto driver_enqueue_stats_incremented;
>
> return;
>
> driver_enqueue:
>
> - /* increment stat now, adding in mempool always success */
> + /* Increment stats now, adding in mempool always succeeds. */
> RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
>
> driver_enqueue_stats_incremented:
>
> - /* push objects to the backend */
> + /* Push the objects directly to the backend. */
> rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
> }
>
> @@ -1490,122 +1522,183 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
> }
>
> /**
> - * @internal Get several objects from the mempool; used internally.
> + * @internal Get several objects from the mempool; used internally when
> + * the number of objects exceeds what is available in the mempool cache.
> * @param mp
> * A pointer to the mempool structure.
> * @param obj_table
> * A pointer to a table of void * pointers (objects).
> * @param n
> * The number of objects to get, must be strictly positive.
> + * Must be more than available in the mempool cache, i.e.:
> + * n > cache->len
> * @param cache
> - * A pointer to a mempool cache structure. May be NULL if not needed.
> + * A pointer to a mempool cache structure. Not NULL.
> * @return
> * - 0: Success.
> * - <0: Error; code of driver dequeue function.
> */
> -static __rte_always_inline int
> -rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> - unsigned int n, struct rte_mempool_cache *cache)
> +static int
> +rte_mempool_do_generic_get_split(struct rte_mempool *mp, void **obj_table,
> + unsigned int n, struct rte_mempool_cache * const cache)
> {
> int ret;
> unsigned int remaining;
> uint32_t index, len;
> void **cache_objs;
> + const uint32_t cache_size = cache->size;
>
> - /* No cache provided */
> - if (unlikely(cache == NULL)) {
> - remaining = n;
> - goto driver_dequeue;
> - }
> -
> - /* The cache is a stack, so copy will be in reverse order. */
> + /* Serve the first part of the request from the cache to return hot objects first. */
> cache_objs = &cache->objs[cache->len];
> + len = cache->len;
> + remaining = n - len;
> + for (index = 0; index < len; index++)
> + *obj_table++ = *--cache_objs;
>
> - if (__rte_constant(n) && n <= cache->len) {
> + /* At this point, the cache is empty. */
> +
> + /* More than can be served from a full cache? */
> + if (unlikely(remaining >= cache_size)) {
> /*
> - * The request size is known at build time, and
> - * the entire request can be satisfied from the cache,
> - * so let the compiler unroll the fixed length copy loop.
> + * Serve the following part of the request directly from the backend
> + * in multiples of the cache size.
> */
> - cache->len -= n;
> - for (index = 0; index < n; index++)
> - *obj_table++ = *--cache_objs;
> + len = remaining - remaining % cache_size;
> + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);
> + if (unlikely(ret < 0)) {
> + /*
> + * No further action is required to roll back the request,
> + * as objects in the cache are intact, and no objects have
> + * been dequeued from the backend.
> + */
>
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
>
> - return 0;
> - }
> + return ret;
> + }
>
> - /*
> - * Use the cache as much as we have to return hot objects first.
> - * If the request size 'n' is known at build time, the above comparison
> - * ensures that n > cache->len here, so omit RTE_MIN().
> - */
> - len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);
> - cache->len -= len;
> - remaining = n - len;
> - for (index = 0; index < len; index++)
> - *obj_table++ = *--cache_objs;
> + remaining -= len;
> + obj_table += len;
>
> - /*
> - * If the request size 'n' is known at build time, the case
> - * where the entire request can be satisfied from the cache
> - * has already been handled above, so omit handling it here.
> - */
> - if (!__rte_constant(n) && remaining == 0) {
> - /* The entire request is satisfied from the cache. */
> + if (unlikely(remaining == 0)) {
> + cache->len = 0;
>
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>
> - return 0;
> + return 0;
> + }
> }
>
> - /* if dequeue below would overflow mem allocated for cache */
> - if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))
> - goto driver_dequeue;
> -
> - /* Fill the cache from the backend; fetch size + remaining objects. */
> - ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,
> - cache->size + remaining);
> + /* Fill the entire cache from the backend. */
> + ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);
> if (unlikely(ret < 0)) {
> /*
> - * We are buffer constrained, and not able to allocate
> - * cache + remaining.
> - * Do not fill the cache, just satisfy the remaining part of
> - * the request directly from the backend.
> + * Unable to fill the cache.
> + * Last resort: Try only the remaining part of the request,
> + * served directly from the backend.
> */
> - goto driver_dequeue;
> + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> + if (unlikely(ret == 0)) {
> + cache->len = 0;
> +
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +
> + return 0;
> + }
> +
> + /* Roll back. */
> + if (cache->len + remaining == n) {
> + /*
> + * No further action is required to roll back the request,
> + * as objects in the cache are intact, and no objects have
> + * been dequeued from the backend.
> + */
> + } else {
> + /* Update the state of the cache before putting back the objects. */
> + cache->len = 0;
> +
> + len = n - remaining;
> + obj_table -= len;
> + rte_mempool_do_generic_put(mp, obj_table, len, cache);
> + }
> +
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> +
> + return ret;
> }
>
> - /* Satisfy the remaining part of the request from the filled cache. */
> - cache_objs = &cache->objs[cache->size + remaining];
> + /* Serve the remaining part of the request from the filled cache. */
> + cache_objs = &cache->objs[cache_size];
> for (index = 0; index < remaining; index++)
> *obj_table++ = *--cache_objs;
>
> - cache->len = cache->size;
> + cache->len = cache_size - remaining;
>
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>
> return 0;
> +}
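Similarly, a standalone model of the happy path above, assuming every backend
dequeue succeeds (illustrative only, not code from the patch):

#include <assert.h>

/* Model of the get-split bookkeeping; precondition: n > cache_len. */
static void
get_split_model(unsigned int cache_size, unsigned int cache_len, unsigned int n)
{
    unsigned int remaining = n - cache_len;                    /* left after emptying the cache */
    unsigned int direct = remaining - remaining % cache_size;  /* whole multiples served straight from the backend */

    remaining -= direct;
    if (remaining == 0) {
        /* Nothing left to serve; the cache stays empty (len = 0). */
        assert(cache_len + direct == n);
        return;
    }
    /* The cache is refilled with cache_size objects, the last 'remaining'
     * objects are served from its top, leaving len = cache_size - remaining. */
    assert(cache_len + direct + remaining == n);
    assert(cache_size - remaining < cache_size);
}

/* Example: get_split_model(512, 10, 1100): 10 from the cache, 1024 straight
 * from the backend, 66 from a fresh refill, leaving cache->len = 446. */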
>
> -driver_dequeue:
> +/**
> + * @internal Get several objects from the mempool; used internally.
> + * @param mp
> + * A pointer to the mempool structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to get, must be strictly positive.
> + * @param cache
> + * A pointer to a mempool cache structure. May be NULL if not needed.
> + * @return
> + * - 0: Success.
> + * - <0: Error; code of driver dequeue function.
> + */
> +static __rte_always_inline int
> +rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> + unsigned int n, struct rte_mempool_cache * const cache)
> +{
> + int ret;
>
> - /* Get remaining objects directly from the backend. */
> - ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> + /* No cache provided? */
> + if (unlikely(cache == NULL))
> + goto driver_dequeue;
>
> - if (ret < 0) {
> - if (likely(cache != NULL)) {
> - cache->len = n - remaining;
> - /*
> - * No further action is required to roll the first part
> - * of the request back into the cache, as objects in
> - * the cache are intact.
> - */
> - }
> + /* The request itself is known to be too big for any cache? */
> + if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)
> + goto driver_dequeue;
> +
> + /* The request can be served entirely from the cache? */
> + if (likely(n <= cache->len)) {
> + unsigned int index;
> + void **cache_objs;
>
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +
> + /*
> + * The cache is a stack, so copy will be in reverse order.
> + * If the request size is known at build time,
> + * the compiler will unroll the fixed length copy loop.
> + */
> + cache_objs = &cache->objs[cache->len];
> + cache->len -= n;
> + for (index = 0; index < n; index++)
> + *obj_table++ = *--cache_objs;
> +
> + return 0;
> + } else
> + return rte_mempool_do_generic_get_split(mp, obj_table, n, cache);
> +
> +driver_dequeue:
> +
> + /* Get the objects directly from the backend. */
> + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
> + if (unlikely(ret < 0)) {
> RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> } else {
> --
> 2.43.0
>
>
Thread overview: 19+ messages
2024-09-20 16:32 [RFC PATCH] mempool: obey configured " Morten Brørup
2024-09-20 16:37 ` [RFC PATCH v2] " Morten Brørup
2024-09-20 17:13 ` [RFC PATCH v3] " Morten Brørup
2024-09-20 19:41 ` Mattias Rönnblom
2024-09-22 10:50 ` [RFC PATCH v4] mempool: fix mempool " Morten Brørup
2024-09-24 3:58 ` [RFC PATCH v5] " Morten Brørup
2024-09-24 11:58 ` [RFC PATCH v6] " Morten Brørup
2024-09-24 18:12 ` [RFC PATCH v7] " Morten Brørup
2024-09-24 20:44 ` Patrick Robb [this message]
2024-09-25 21:33 ` [RFC PATCH v8] " Morten Brørup
2024-09-26 18:24 ` [RFC PATCH v9] " Morten Brørup
2024-09-26 20:53 ` [RFC PATCH v10] " Morten Brørup
2024-09-28 17:32 ` [RFC PATCH v11] " Morten Brørup
2024-09-28 19:38 ` [RFC PATCH v12] " Morten Brørup
2024-10-01 11:33 ` [RFC PATCH v13] " Morten Brørup
2024-10-01 13:41 ` [RFC PATCH v14] " Morten Brørup
2024-10-02 11:25 ` [RFC PATCH v15] " Morten Brørup
2024-10-02 14:35 ` [RFC PATCH v16] " Morten Brørup
2024-10-02 15:00 ` [RFC PATCH v17] " Morten Brørup