DPDK patches and discussions
From: "Mattias Rönnblom" <hofors@lysator.liu.se>
To: "Morten Brørup" <mb@smartsharesystems.com>, dev@dpdk.org
Subject: Re: [RFC PATCH v3] mempool: obey configured cache size
Date: Fri, 20 Sep 2024 21:41:11 +0200
Message-ID: <acf249cd-3aea-4520-bad8-11e565da3199@lysator.liu.se>
In-Reply-To: <20240920171350.841532-1-mb@smartsharesystems.com>

On 2024-09-20 19:13, Morten Brørup wrote:
> Seeking feedback on the concept.
> I have not yet benchmarked performance.
> 
> The mempool cache size is configurable, but it could hold 1.5 times the
> configured size.
> This was confusing for developers, and added complexity when configuring
> mempools with caches.
> 

Is there an upside to the current semantics? Are non-power-of-2 
rings/stacks more costly to operate on?

> This patch modifies the mempool cache to obey the configured size, and
> removes the cache flush threshold.
> 

Maybe it would be worth mentioning what the original purpose of the 
threshold was (if known), and why it's no longer a good idea (if it ever 
was).
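
For reference, the semantics in question come from this macro, removed 
from rte_mempool.c further down in the patch:

  #define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)

So a cache configured with size 512 gets a flush threshold of 768, and 
is only flushed back down to its configured size once that threshold is 
crossed - which is where the 1.5x in the commit message, and the 2x 
sizing of the objs[] array, come from.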

> Furthermore, the mempool caches are now completely flushed/filled to/from
> the backend, so backend accesses are CPU cache line aligned.
> 
> Finally, the mempool get and put functions are optimized to only
> inline the likely scenarios, and call a non-inline static helper function
> in other cases.

What do you mean by "inline" here? Not in the header file, not marked 
"inline", or something else? (I've read the code, so I know what you 
mean, but it should preferably be clear already from the commit message.)

Being in the header file does not force the compiler to inline 
functions, and being in a .c file does not prevent inlining if LTO is 
used.
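
To make the question concrete, this is roughly the structure I read into 
the commit message - a self-contained sketch with made-up names, not the 
actual code:

  #include <stddef.h>
  #include <string.h>

  struct cache { size_t size, len; void *objs[512]; };

  /* Slow path, kept out of line (the patch marks it __rte_noinline). */
  static __attribute__((noinline)) void
  put_many(struct cache *c, void * const *objs, size_t n)
  {
      /* flush the cache and/or push the objects to the backend */
      (void)c; (void)objs; (void)n;
  }

  /* Fast path, expanded at every call site. */
  static inline __attribute__((always_inline)) void
  put(struct cache *c, void * const *objs, size_t n)
  {
      if (c != NULL && c->len + n <= c->size) {
          memcpy(&c->objs[c->len], objs, n * sizeof(*objs));
          c->len += n;
      } else {
          put_many(c, objs, n); /* out-of-line call */
      }
  }

Both functions still live in the header in this sketch; whether the slow 
path also stays in the header, moves to a .c file, or simply isn't 
marked inline is what the commit message should spell out.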

> 
> Various drivers accessing the mempool directly have been updated
> accordingly.
> 
> Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
> ---
> v3:
> * Removed __attribute__(assume).
> v2:
> * Removed mempool perf test; not part of patch set.
> ---
>   drivers/common/idpf/idpf_common_rxtx_avx512.c |  54 +--
>   drivers/mempool/dpaa/dpaa_mempool.c           |  16 +-
>   drivers/mempool/dpaa2/dpaa2_hw_mempool.c      |  14 -
>   drivers/net/i40e/i40e_rxtx_vec_avx512.c       |  17 +-
>   drivers/net/iavf/iavf_rxtx_vec_avx512.c       |  27 +-
>   drivers/net/ice/ice_rxtx_vec_avx512.c         |  27 +-
>   lib/mempool/mempool_trace.h                   |   1 -
>   lib/mempool/rte_mempool.c                     |  12 +-
>   lib/mempool/rte_mempool.h                     | 321 +++++++++++-------
>   9 files changed, 240 insertions(+), 249 deletions(-)
> 
> diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> index 3b5e124ec8..98535a48f3 100644
> --- a/drivers/common/idpf/idpf_common_rxtx_avx512.c
> +++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> @@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
>   								rte_lcore_id());
>   		void **cache_objs;
>   
> -		if (cache == NULL || cache->len == 0)
> -			goto normal;
> -
> -		cache_objs = &cache->objs[cache->len];
> -
> -		if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -			rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +		if (!cache || unlikely(n + cache->len > cache->size)) {
> +			rte_mempool_generic_put(mp, (void *)txep, n, cache);
>   			goto done;
>   		}
>   
> -		/* The cache follows the following algorithm
> -		 *   1. Add the objects to the cache
> -		 *   2. Anything greater than the cache min value (if it crosses the
> -		 *   cache flush threshold) is flushed to the ring.
> -		 */
> +		cache_objs = &cache->objs[cache->len];
> +
>   		/* Add elements back into the cache */
>   		uint32_t copied = 0;
>   		/* n is multiple of 32 */
> @@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
>   		}
>   		cache->len += n;
>   
> -		if (cache->len >= cache->flushthresh) {
> -			rte_mempool_ops_enqueue_bulk(mp,
> -						     &cache->objs[cache->size],
> -						     cache->len - cache->size);
> -			cache->len = cache->size;
> -		}
> +		/* Increment stat. */
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>   		goto done;
>   	}
>   
> -normal:
>   	m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>   	if (likely(m != NULL)) {
>   		free[0] = m;
> @@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
>   								rte_lcore_id());
>   		void **cache_objs;
>   
> -		if (!cache || cache->len == 0)
> -			goto normal;
> -
> -		cache_objs = &cache->objs[cache->len];
> -
> -		if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -			rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +		if (!cache || unlikely(n + cache->len > cache->size)) {
> +			rte_mempool_generic_put(mp, (void *)txep, n, cache);
>   			goto done;
>   		}
>   
> -		/* The cache follows the following algorithm
> -		 *   1. Add the objects to the cache
> -		 *   2. Anything greater than the cache min value (if it crosses the
> -		 *   cache flush threshold) is flushed to the ring.
> -		 */
> +		cache_objs = &cache->objs[cache->len];
> +
>   		/* Add elements back into the cache */
>   		uint32_t copied = 0;
>   		/* n is multiple of 32 */
> @@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
>   		}
>   		cache->len += n;
>   
> -		if (cache->len >= cache->flushthresh) {
> -			rte_mempool_ops_enqueue_bulk(mp,
> -						     &cache->objs[cache->size],
> -						     cache->len - cache->size);
> -			cache->len = cache->size;
> -		}
> +		/* Increment stat. */
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>   		goto done;
>   	}
>   
> -normal:
>   	m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>   	if (likely(m)) {
>   		free[0] = m;
> diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c
> index 74bfcab509..3a936826c8 100644
> --- a/drivers/mempool/dpaa/dpaa_mempool.c
> +++ b/drivers/mempool/dpaa/dpaa_mempool.c
> @@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
>   	struct bman_pool_params params = {
>   		.flags = BMAN_POOL_FLAG_DYNAMIC_BPID
>   	};
> -	unsigned int lcore_id;
> -	struct rte_mempool_cache *cache;
>   
>   	MEMPOOL_INIT_FUNC_TRACE();
>   
> @@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
>   	rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],
>   		   sizeof(struct dpaa_bp_info));
>   	mp->pool_data = (void *)bp_info;
> -	/* Update per core mempool cache threshold to optimal value which is
> -	 * number of buffers that can be released to HW buffer pool in
> -	 * a single API call.
> -	 */
> -	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> -		cache = &mp->local_cache[lcore_id];
> -		DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> -			lcore_id, cache->flushthresh,
> -			(uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));
> -		if (cache->flushthresh)
> -			cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;
> -	}
>   
>   	DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);
>   	return 0;
> @@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,
>   	DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",
>   			     count, bp_info->bpid);
>   
> -	if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {
> +	if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {
>   		DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers",
>   				 count);
>   		return -1;
> diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> index 42e17d984c..a44f3cf616 100644
> --- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> +++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> @@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
>   	struct dpaa2_bp_info *bp_info;
>   	struct dpbp_attr dpbp_attr;
>   	uint32_t bpid;
> -	unsigned int lcore_id;
> -	struct rte_mempool_cache *cache;
>   	int ret;
>   
>   	avail_dpbp = dpaa2_alloc_dpbp_dev();
> @@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
>   	DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid);
>   
>   	h_bp_list = bp_list;
> -	/* Update per core mempool cache threshold to optimal value which is
> -	 * number of buffers that can be released to HW buffer pool in
> -	 * a single API call.
> -	 */
> -	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> -		cache = &mp->local_cache[lcore_id];
> -		DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> -			lcore_id, cache->flushthresh,
> -			(uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));
> -		if (cache->flushthresh)
> -			cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;
> -	}
>   
>   	return 0;
>   err3:
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> index 0238b03f8a..712ab1726f 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> @@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
>   		struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
>   				rte_lcore_id());
>   
> -		if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> +		if (!cache || unlikely(n + cache->len > cache->size)) {
>   			rte_mempool_generic_put(mp, (void *)txep, n, cache);
>   			goto done;
>   		}
>   
>   		cache_objs = &cache->objs[cache->len];
>   
> -		/* The cache follows the following algorithm
> -		 *   1. Add the objects to the cache
> -		 *   2. Anything greater than the cache min value (if it
> -		 *   crosses the cache flush threshold) is flushed to the ring.
> -		 */
>   		/* Add elements back into the cache */
>   		uint32_t copied = 0;
>   		/* n is multiple of 32 */
> @@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
>   		}
>   		cache->len += n;
>   
> -		if (cache->len >= cache->flushthresh) {
> -			rte_mempool_ops_enqueue_bulk
> -				(mp, &cache->objs[cache->size],
> -				cache->len - cache->size);
> -			cache->len = cache->size;
> -		}
> +		/* Increment stat. */
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>   		goto done;
>   	}
>   
> diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> index 3bb6f305df..307bb8556a 100644
> --- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> +++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> @@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
>   								rte_lcore_id());
>   		void **cache_objs;
>   
> -		if (!cache || cache->len == 0)
> -			goto normal;
> -
> -		cache_objs = &cache->objs[cache->len];
> -
> -		if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -			rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +		if (!cache || unlikely(n + cache->len > cache->size)) {
> +			rte_mempool_generic_put(mp, (void *)txep, n, cache);
>   			goto done;
>   		}
>   
> -		/* The cache follows the following algorithm
> -		 *   1. Add the objects to the cache
> -		 *   2. Anything greater than the cache min value (if it crosses the
> -		 *   cache flush threshold) is flushed to the ring.
> -		 */
> +		cache_objs = &cache->objs[cache->len];
> +
>   		/* Add elements back into the cache */
>   		uint32_t copied = 0;
>   		/* n is multiple of 32 */
> @@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
>   		}
>   		cache->len += n;
>   
> -		if (cache->len >= cache->flushthresh) {
> -			rte_mempool_ops_enqueue_bulk(mp,
> -						     &cache->objs[cache->size],
> -						     cache->len - cache->size);
> -			cache->len = cache->size;
> -		}
> +		/* Increment stat. */
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>   		goto done;
>   	}
>   
> -normal:
>   	m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>   	if (likely(m)) {
>   		free[0] = m;
> diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c
> index 04148e8ea2..4ea1db734e 100644
> --- a/drivers/net/ice/ice_rxtx_vec_avx512.c
> +++ b/drivers/net/ice/ice_rxtx_vec_avx512.c
> @@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
>   		struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
>   				rte_lcore_id());
>   
> -		if (!cache || cache->len == 0)
> -			goto normal;
> -
> -		cache_objs = &cache->objs[cache->len];
> -
> -		if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -			rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +		if (!cache || unlikely(n + cache->len > cache->size)) {
> +			rte_mempool_generic_put(mp, (void *)txep, n, cache);
>   			goto done;
>   		}
>   
> -		/* The cache follows the following algorithm
> -		 *   1. Add the objects to the cache
> -		 *   2. Anything greater than the cache min value (if it
> -		 *   crosses the cache flush threshold) is flushed to the ring.
> -		 */
> +		cache_objs = &cache->objs[cache->len];
> +
>   		/* Add elements back into the cache */
>   		uint32_t copied = 0;
>   		/* n is multiple of 32 */
> @@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
>   		}
>   		cache->len += n;
>   
> -		if (cache->len >= cache->flushthresh) {
> -			rte_mempool_ops_enqueue_bulk
> -				(mp, &cache->objs[cache->size],
> -				 cache->len - cache->size);
> -			cache->len = cache->size;
> -		}
> +		/* Increment stat. */
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>   		goto done;
>   	}
>   
> -normal:
>   	m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>   	if (likely(m)) {
>   		free[0] = m;
> diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h
> index dffef062e4..3c49b41a6d 100644
> --- a/lib/mempool/mempool_trace.h
> +++ b/lib/mempool/mempool_trace.h
> @@ -112,7 +112,6 @@ RTE_TRACE_POINT(
>   	rte_trace_point_emit_i32(socket_id);
>   	rte_trace_point_emit_ptr(cache);
>   	rte_trace_point_emit_u32(cache->len);
> -	rte_trace_point_emit_u32(cache->flushthresh);
>   )
>   
>   RTE_TRACE_POINT(
> diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
> index d8e39e5c20..40fb13239a 100644
> --- a/lib/mempool/rte_mempool.c
> +++ b/lib/mempool/rte_mempool.c
> @@ -50,11 +50,6 @@ static void
>   mempool_event_callback_invoke(enum rte_mempool_event event,
>   			      struct rte_mempool *mp);
>   
> -/* Note: avoid using floating point since that compiler
> - * may not think that is constant.
> - */
> -#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)
> -
>   #if defined(RTE_ARCH_X86)
>   /*
>    * return the greatest common divisor between a and b (fast algorithm)
> @@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)
>   static void
>   mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
>   {
> -	/* Check that cache have enough space for flush threshold */
> -	RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) >
> +	/* Check that cache have enough space for size */
> +	RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE >
>   			 RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /
>   			 RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));
>   
>   	cache->size = size;
> -	cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
>   	cache->len = 0;
>   }
>   
> @@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
>   
>   	/* asked cache too big */
>   	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> -	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> +	    cache_size > n) {
>   		rte_errno = EINVAL;
>   		return NULL;
>   	}
> diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> index 7bdc92b812..580c655eb3 100644
> --- a/lib/mempool/rte_mempool.h
> +++ b/lib/mempool/rte_mempool.h
> @@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {
>    */
>   struct __rte_cache_aligned rte_mempool_cache {
>   	uint32_t size;	      /**< Size of the cache */
> -	uint32_t flushthresh; /**< Threshold before we flush excess elements */
>   	uint32_t len;	      /**< Current cache count */
>   #ifdef RTE_LIBRTE_MEMPOOL_STATS
> -	uint32_t unused;
>   	/*
>   	 * Alternative location for the most frequently updated mempool statistics (per-lcore),
>   	 * providing faster update access when using a mempool cache.
> @@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {
>   	 * Cache is allocated to this size to allow it to overflow in certain
>   	 * cases to avoid needless emptying of cache.
>   	 */
> -	alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2];
> +	alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];
>   };
>   
>   /**
> @@ -1363,7 +1361,8 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
>   }
>   
>   /**
> - * @internal Put several objects back in the mempool; used internally.
> + * @internal Put several objects back in the mempool; used internally when
> + *   the number of objects exceeds the remaining space in the mempool cache.
>    * @param mp
>    *   A pointer to the mempool structure.
>    * @param obj_table
> @@ -1371,58 +1370,86 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
>    * @param n
>    *   The number of objects to store back in the mempool, must be strictly
>    *   positive.
> + *   Must be more than the remaining space in the mempool cache, i.e.:
> + *   cache->len + n > cache->size
>    * @param cache
> - *   A pointer to a mempool cache structure. May be NULL if not needed.
> + *   A pointer to a mempool cache structure. Not NULL.
>    */
> -static __rte_always_inline void
> -rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
> -			   unsigned int n, struct rte_mempool_cache *cache)
> +static __rte_noinline void

In a sense this is the slow path, but it will still be run quite often, 
will it not? You could just mark it __rte_unused and let the compiler 
decide what to do, or, maybe better, move it to the .c file if you 
really don't think a proper function call is a performance issue.

With __rte_noinline you prevent the compiler from inlining (i.e., it's 
not a hint), even though it may *know* (e.g., from PGO) that this is a 
common path (e.g., for cache-less memory pools).
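
Spelled out, the two options would look something like this (signature 
taken from the patch; only the attribute differs):

  /* This patch: the compiler must emit an out-of-line call, even if
   * it knows that this path is hot: */
  static __rte_noinline void
  rte_mempool_do_generic_put_many(struct rte_mempool *mp,
          void * const *obj_table, unsigned int n,
          struct rte_mempool_cache *cache);

  /* Alternative: no inlining directive at all; __rte_unused merely
   * silences unused-function warnings in translation units that never
   * call it, and the compiler (or LTO/PGO) decides whether to inline:
   */
  static __rte_unused void
  rte_mempool_do_generic_put_many(struct rte_mempool *mp,
          void * const *obj_table, unsigned int n,
          struct rte_mempool_cache *cache);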

> +rte_mempool_do_generic_put_many(struct rte_mempool *mp, void * const *obj_table,
> +		unsigned int n, struct rte_mempool_cache *cache)
>   {
>   	void **cache_objs;
> +	unsigned int len;
> +	const uint32_t cache_size = cache->size;
>   
> -	/* No cache provided */
> -	if (unlikely(cache == NULL))
> -		goto driver_enqueue;
> -
> -	/* increment stat now, adding in mempool always success */
> +	/* Increment stat now, adding in mempool always succeeds. */
>   	RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
>   	RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
>   
> -	/* The request itself is too big for the cache */
> -	if (unlikely(n > cache->flushthresh))
> -		goto driver_enqueue_stats_incremented;
> -
> -	/*
> -	 * The cache follows the following algorithm:
> -	 *   1. If the objects cannot be added to the cache without crossing
> -	 *      the flush threshold, flush the cache to the backend.
> -	 *   2. Add the objects to the cache.
> -	 */
> -
> -	if (cache->len + n <= cache->flushthresh) {
> -		cache_objs = &cache->objs[cache->len];
> -		cache->len += n;
> -	} else {
> -		cache_objs = &cache->objs[0];
> -		rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
> -		cache->len = n;
> +	/* Fill the cache with the first objects. */
> +	cache_objs = &cache->objs[cache->len];
> +	len = (cache_size - cache->len);
> +	rte_memcpy(cache_objs, obj_table, sizeof(void *) * len);
> +	obj_table += len;
> +	n -= len;
> +
> +	/* Flush the entire cache to the backend. */
> +	cache_objs = &cache->objs[0];
> +	rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);
> +
> +	if (unlikely(n > cache_size)) {
> +		/* Push following objects, in entire cache sizes, directly to the backend. */
> +		len = n - n % cache_size;
> +		rte_mempool_ops_enqueue_bulk(mp, obj_table, len);
> +		obj_table += len;
> +		n -= len;
>   	}
>   
> -	/* Add the objects to the cache. */
> +	/* Add the remaining objects to the cache. */
> +	cache->len = n;
>   	rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +}
>   
> -	return;
> -
> -driver_enqueue:
> -
> -	/* increment stat now, adding in mempool always success */
> -	RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> -	RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
> -
> -driver_enqueue_stats_incremented:
> +/**
> + * @internal Put several objects back in the mempool; used internally.
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to store back in the mempool, must be strictly
> + *   positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
> + */
> +static __rte_always_inline void
> +rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
> +			   unsigned int n, struct rte_mempool_cache *cache)
> +{
> +	if (likely(cache != NULL)) {
> +		/* Enough remaining space in the cache? */
> +		if (likely(cache->len + n <= cache->size)) {
> +			void **cache_objs;
> +
> +			/* Increment stat now, adding in mempool always succeeds. */
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
> +			/* Add the objects to the cache. */
> +			cache_objs = &cache->objs[cache->len];
> +			cache->len += n;
> +			rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +		} else
> +			rte_mempool_do_generic_put_many(mp, obj_table, n, cache);
> +	} else {
> +		/* Increment stat now, adding in mempool always succeeds. */
> +		RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> +		RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
>   
> -	/* push objects to the backend */
> -	rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
> +		/* push objects to the backend */
> +		rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
> +	}
>   }
>   
>   
> @@ -1490,135 +1517,185 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>   }
>   
>   /**
> - * @internal Get several objects from the mempool; used internally.
> + * @internal Get several objects from the mempool; used internally when
> + *   the number of objects exceeds what is available in the mempool cache.
>    * @param mp
>    *   A pointer to the mempool structure.
>    * @param obj_table
>    *   A pointer to a table of void * pointers (objects).
>    * @param n
>    *   The number of objects to get, must be strictly positive.
> + *   Must be more than available in the mempool cache, i.e.:
> + *   n > cache->len
>    * @param cache
> - *   A pointer to a mempool cache structure. May be NULL if not needed.
> + *   A pointer to a mempool cache structure. Not NULL.
>    * @return
>    *   - 0: Success.
>    *   - <0: Error; code of driver dequeue function.
>    */
> -static __rte_always_inline int
> -rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> -			   unsigned int n, struct rte_mempool_cache *cache)
> +static __rte_noinline int
> +rte_mempool_do_generic_get_many(struct rte_mempool *mp, void **obj_table,
> +		unsigned int n, struct rte_mempool_cache *cache)
>   {
>   	int ret;
>   	unsigned int remaining;
>   	uint32_t index, len;
>   	void **cache_objs;
> +	const uint32_t cache_size = cache->size;
>   
> -	/* No cache provided */
> -	if (unlikely(cache == NULL)) {
> -		remaining = n;
> -		goto driver_dequeue;
> -	}
> -
> -	/* The cache is a stack, so copy will be in reverse order. */
> +	/* Serve the first part of the request from the cache to return hot objects first. */
>   	cache_objs = &cache->objs[cache->len];
> +	len = cache->len;
> +	remaining = n - len;
> +	for (index = 0; index < len; index++)
> +		*obj_table++ = *--cache_objs;
>   
> -	if (__rte_constant(n) && n <= cache->len) {
> +	/* At this point, the cache is empty. */
> +
> +	/* More than can be served from a full cache? */
> +	if (unlikely(remaining >= cache_size)) {
>   		/*
> -		 * The request size is known at build time, and
> -		 * the entire request can be satisfied from the cache,
> -		 * so let the compiler unroll the fixed length copy loop.
> +		 * Serve the following part of the request directly from the backend
> +		 * in multiples of the cache size.
>   		 */
> -		cache->len -= n;
> -		for (index = 0; index < n; index++)
> -			*obj_table++ = *--cache_objs;
> +		len = remaining - remaining % cache_size;
> +		ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);
> +		if (unlikely(ret < 0)) {
> +			/*
> +			 * No further action is required to roll back the request,
> +			 * as objects in the cache are intact, and no objects have
> +			 * been dequeued from the backend.
> +			 */
>   
> -		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> -		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +			RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> +			RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
>   
> -		return 0;
> -	}
> +			return ret;
> +		}
>   
> -	/*
> -	 * Use the cache as much as we have to return hot objects first.
> -	 * If the request size 'n' is known at build time, the above comparison
> -	 * ensures that n > cache->len here, so omit RTE_MIN().
> -	 */
> -	len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);
> -	cache->len -= len;
> -	remaining = n - len;
> -	for (index = 0; index < len; index++)
> -		*obj_table++ = *--cache_objs;
> +		remaining -= len;
> +		obj_table += len;
>   
> -	/*
> -	 * If the request size 'n' is known at build time, the case
> -	 * where the entire request can be satisfied from the cache
> -	 * has already been handled above, so omit handling it here.
> -	 */
> -	if (!__rte_constant(n) && remaining == 0) {
> -		/* The entire request is satisfied from the cache. */
> +		if (unlikely(remaining == 0)) {
> +			cache->len = 0;
>   
> -		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> -		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>   
> -		return 0;
> +			return 0;
> +		}
>   	}
>   
> -	/* if dequeue below would overflow mem allocated for cache */
> -	if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))
> -		goto driver_dequeue;
> -
> -	/* Fill the cache from the backend; fetch size + remaining objects. */
> -	ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,
> -			cache->size + remaining);
> +	/* Fill the entire cache from the backend. */
> +	ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);
>   	if (unlikely(ret < 0)) {
>   		/*
> -		 * We are buffer constrained, and not able to allocate
> -		 * cache + remaining.
> -		 * Do not fill the cache, just satisfy the remaining part of
> -		 * the request directly from the backend.
> +		 * Unable to fill the cache.
> +		 * Last resort: Try only the remaining part of the request,
> +		 * served directly from the backend.
>   		 */
> -		goto driver_dequeue;
> -	}
> +		ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> +		if (unlikely(ret == 0)) {
> +			cache->len = 0;
>   
> -	/* Satisfy the remaining part of the request from the filled cache. */
> -	cache_objs = &cache->objs[cache->size + remaining];
> -	for (index = 0; index < remaining; index++)
> -		*obj_table++ = *--cache_objs;
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +
> +			return 0;
> +		}
> +
> +		/* Roll back. */
> +		if (cache->len + remaining == n) {
> +			/*
> +			 * No further action is required to roll back the request,
> +			 * as objects in the cache are intact, and no objects have
> +			 * been dequeued from the backend.
> +			 */
> +		} else {
> +			/* Update the state of the cache before putting back the objects. */
> +			cache->len = 0;
> +
> +			len = n - remaining;
> +			obj_table -= len;
> +			rte_mempool_do_generic_put(mp, obj_table, len, cache);
> +		}
>   
> -	cache->len = cache->size;
> +		RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> +		RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> +
> +		return ret;
> +	}
>   
> +	/* Increment stat now, this always succeeds. */
>   	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
>   	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>   
> +	/* Serve the remaining part of the request from the filled cache. */
> +	cache_objs = &cache->objs[cache_size];
> +	for (index = 0; index < remaining; index++)
> +		*obj_table++ = *--cache_objs;
> +
> +	cache->len = cache_size - remaining;
> +
>   	return 0;
> +}
>   
> -driver_dequeue:
> +/**
> + * @internal Get several objects from the mempool; used internally.
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to get, must be strictly positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
> + * @return
> + *   - 0: Success.
> + *   - <0: Error; code of driver dequeue function.
> + */
> +static __rte_always_inline int
> +rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> +			   unsigned int n, struct rte_mempool_cache *cache)
> +{
> +	if (likely(cache != NULL)) {
> +		/* Enough objects in the cache? */
> +		if (n <= cache->len) {
> +			unsigned int index;
> +			void **cache_objs;
>   
> -	/* Get remaining objects directly from the backend. */
> -	ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> +			/* Increment stat now, this always succeeds. */
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> +			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>   
> -	if (ret < 0) {
> -		if (likely(cache != NULL)) {
> -			cache->len = n - remaining;
>   			/*
> -			 * No further action is required to roll the first part
> -			 * of the request back into the cache, as objects in
> -			 * the cache are intact.
> +			 * The cache is a stack, so copy will be in reverse order.
> +			 * If the request size is known at build time,
> +			 * the compiler will unroll the fixed length copy loop.
>   			 */
> -		}
> -
> -		RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> -		RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> +			cache_objs = &cache->objs[cache->len];
> +			cache->len -= n;
> +			for (index = 0; index < n; index++)
> +				*obj_table++ = *--cache_objs;
> +
> +			return 0;
> +		} else
> +			return rte_mempool_do_generic_get_many(mp, obj_table, n, cache);
>   	} else {
> -		if (likely(cache != NULL)) {
> -			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> -			RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +		int ret;
> +
> +		/* Get the objects directly from the backend. */
> +		ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
> +		if (unlikely(ret < 0)) {
> +			RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> +			RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
>   		} else {
>   			RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
>   			RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);
>   		}
> -	}
>   
> -	return ret;
> +		return ret;
> +	}
>   }
>   
>   /**


Thread overview: 4+ messages
2024-09-20 16:32 [RFC PATCH] " Morten Brørup
2024-09-20 16:37 ` [RFC PATCH v2] " Morten Brørup
2024-09-20 17:13 ` [RFC PATCH v3] " Morten Brørup
2024-09-20 19:41   ` Mattias Rönnblom [this message]
