From mboxrd@z Thu Jan 1 00:00:00 1970
From: Lazaros Koromilas
To: dev@dpdk.org
Date: Mon, 4 Apr 2016 18:43:29 +0300
Message-Id: <1459784610-14008-1-git-send-email-l@nofutznetworks.com>
X-Mailer: git-send-email 1.9.1
Subject: [dpdk-dev] [PATCH v2 1/2] mempool: allow for user-owned mempool caches
List-Id: patches and discussions about DPDK

The mempool cache is only available to EAL threads as a per-lcore
resource. Change this so that the user can create and provide their own
cache on mempool get and put operations. This works with non-EAL
threads too.
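To make the intended workflow concrete, here is a minimal usage sketch
(not taken from the patch itself; the helper name, burst size and error
handling are illustrative, only the rte_mempool_* calls come from this
series) of how a non-EAL thread could drive a pool through its own cache:

    #include <rte_mempool.h>
    #include <rte_errno.h>

    /* Illustrative helper: burst get/put through a user-owned cache. */
    static int
    worker_do_burst(struct rte_mempool *mp)
    {
            struct rte_mempool_cache *cache;
            void *objs[32];
            int ret;

            /* Create a cache owned by this thread, up to
             * RTE_MEMPOOL_CACHE_MAX_SIZE, on any NUMA socket. */
            cache = rte_mempool_cache_create(RTE_MEMPOOL_CACHE_MAX_SIZE,
                                             SOCKET_ID_ANY);
            if (cache == NULL)
                    return -rte_errno;

            /* Get and put go through the user-owned cache; the last
             * argument selects the multi-consumer/multi-producer paths. */
            ret = rte_mempool_generic_get(mp, objs, 32, cache, 1);
            if (ret == 0)
                    rte_mempool_generic_put(mp, objs, 32, cache, 1);

            /* Return any objects still held in the cache before freeing it. */
            rte_mempool_cache_flush(cache, mp);
            rte_mempool_cache_free(cache);

            return ret;
    }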
This commit introduces the new API calls:

    rte_mempool_cache_create(size, socket_id)
    rte_mempool_cache_flush(cache, mp)
    rte_mempool_cache_free(cache)
    rte_mempool_default_cache(mp, lcore_id)
    rte_mempool_generic_put(mp, obj_table, n, cache, is_mp)
    rte_mempool_generic_get(mp, obj_table, n, cache, is_mc)

Removes the API calls:

    rte_mempool_sp_put_bulk(mp, obj_table, n)
    rte_mempool_sc_get_bulk(mp, obj_table, n)
    rte_mempool_sp_put(mp, obj)
    rte_mempool_sc_get(mp, obj)

And the remaining API calls use the per-lcore default local cache:

    rte_mempool_put_bulk(mp, obj_table, n)
    rte_mempool_get_bulk(mp, obj_table, n)
    rte_mempool_put(mp, obj)
    rte_mempool_get(mp, obj)

Signed-off-by: Lazaros Koromilas
---
 app/test/test_mempool.c                |  58 +++++--
 app/test/test_mempool_perf.c           |  46 +++++-
 lib/librte_eal/common/eal_common_log.c |   8 +-
 lib/librte_mempool/rte_mempool.c       |  76 ++++++++-
 lib/librte_mempool/rte_mempool.h       | 291 +++++++++++++--------------------
 5 files changed, 275 insertions(+), 204 deletions(-)

diff --git a/app/test/test_mempool.c b/app/test/test_mempool.c
index 10e1fa4..2dc0cf2 100644
--- a/app/test/test_mempool.c
+++ b/app/test/test_mempool.c
@@ -79,6 +79,7 @@
 static struct rte_mempool *mp;
 static struct rte_mempool *mp_cache, *mp_nocache;
+static int use_external_cache;
 
 static rte_atomic32_t synchro;
 
@@ -107,19 +108,33 @@ test_mempool_basic(void)
 	char *obj_data;
 	int ret = 0;
 	unsigned i, j;
+	struct rte_mempool_cache *cache;
+
+	if (use_external_cache)
+		/* Create a user-owned mempool cache. */
+		cache = rte_mempool_cache_create(RTE_MEMPOOL_CACHE_MAX_SIZE,
+						 SOCKET_ID_ANY);
+	else
+		cache = rte_mempool_default_cache(mp, rte_lcore_id());
 
 	/* dump the mempool status */
 	rte_mempool_dump(stdout, mp);
 
 	printf("get an object\n");
-	if (rte_mempool_get(mp, &obj) < 0)
+	if (rte_mempool_generic_get(mp, &obj, 1, cache, 1) < 0)
 		return -1;
 	rte_mempool_dump(stdout, mp);
 
 	/* tests that improve coverage */
 	printf("get object count\n");
-	if (rte_mempool_count(mp) != MEMPOOL_SIZE - 1)
-		return -1;
+	if (use_external_cache) {
+		/* We have to count the extra caches, one in this case.
*/ + if (rte_mempool_count(mp) + cache->len != MEMPOOL_SIZE - 1) + return -1; + } else { + if (rte_mempool_count(mp) != MEMPOOL_SIZE - 1) + return -1; + } printf("get private data\n"); if (rte_mempool_get_priv(mp) != (char *)mp + @@ -134,21 +149,21 @@ test_mempool_basic(void) return -1; printf("put the object back\n"); - rte_mempool_put(mp, obj); + rte_mempool_generic_put(mp, &obj, 1, cache, 1); rte_mempool_dump(stdout, mp); printf("get 2 objects\n"); - if (rte_mempool_get(mp, &obj) < 0) + if (rte_mempool_generic_get(mp, &obj, 1, cache, 1) < 0) return -1; - if (rte_mempool_get(mp, &obj2) < 0) { - rte_mempool_put(mp, obj); + if (rte_mempool_generic_get(mp, &obj2, 1, cache, 1) < 0) { + rte_mempool_generic_put(mp, &obj, 1, cache, 1); return -1; } rte_mempool_dump(stdout, mp); printf("put the objects back\n"); - rte_mempool_put(mp, obj); - rte_mempool_put(mp, obj2); + rte_mempool_generic_put(mp, &obj, 1, cache, 1); + rte_mempool_generic_put(mp, &obj2, 1, cache, 1); rte_mempool_dump(stdout, mp); /* @@ -161,7 +176,7 @@ test_mempool_basic(void) } for (i=0; i= MAX_KEEP) continue; - if (rte_mempool_sc_get(mp_spsc, &obj) < 0) + if (rte_mempool_get(mp_spsc, &obj) < 0) break; rte_spinlock_lock(&scsp_spinlock); scsp_obj_table[i] = obj; @@ -371,12 +391,12 @@ test_mempool_basic_ex(struct rte_mempool * mp) } for (i = 0; i < MEMPOOL_SIZE; i ++) { - if (rte_mempool_mc_get(mp, &obj[i]) < 0) { + if (rte_mempool_get(mp, &obj[i]) < 0) { printf("fail_mp_basic_ex fail to get mempool object for [%u]\n", i); goto fail_mp_basic_ex; } } - if (rte_mempool_mc_get(mp, &err_obj) == 0) { + if (rte_mempool_get(mp, &err_obj) == 0) { printf("test_mempool_basic_ex get an impossible obj from mempool\n"); goto fail_mp_basic_ex; } @@ -387,7 +407,7 @@ test_mempool_basic_ex(struct rte_mempool * mp) } for (i = 0; i < MEMPOOL_SIZE; i ++) { - rte_mempool_mp_put(mp, obj[i]); + rte_mempool_put(mp, obj[i]); } if (rte_mempool_full(mp) != 1) { printf("test_mempool_basic_ex the mempool is not full but it should be\n"); @@ -499,6 +519,12 @@ test_mempool(void) if (test_mempool_basic() < 0) return -1; + /* basic tests with user-owned cache */ + mp = mp_nocache; + use_external_cache = 1; + if (test_mempool_basic() < 0) + return -1; + /* more basic tests without cache */ if (test_mempool_basic_ex(mp_nocache) < 0) return -1; diff --git a/app/test/test_mempool_perf.c b/app/test/test_mempool_perf.c index cdc02a0..e917f4d 100644 --- a/app/test/test_mempool_perf.c +++ b/app/test/test_mempool_perf.c @@ -78,6 +78,9 @@ * - One core without cache * - Two cores without cache * - Max. cores without cache + * - One core with user-owned cache + * - Two cores with user-owned cache + * - Max. cores with user-owned cache * * - Bulk size (*n_get_bulk*, *n_put_bulk*) * @@ -98,6 +101,8 @@ static struct rte_mempool *mp; static struct rte_mempool *mp_cache, *mp_nocache; +static int use_external_cache; +static unsigned external_cache_size = RTE_MEMPOOL_CACHE_MAX_SIZE; static rte_atomic32_t synchro; @@ -137,6 +142,14 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg) int ret; uint64_t start_cycles, end_cycles; uint64_t time_diff = 0, hz = rte_get_timer_hz(); + struct rte_mempool_cache *cache; + + if (use_external_cache) + /* Create a user-owned mempool cache. 
*/ + cache = rte_mempool_cache_create(external_cache_size, + SOCKET_ID_ANY); + else + cache = rte_mempool_default_cache(mp, lcore_id); /* n_get_bulk and n_put_bulk must be divisors of n_keep */ if (((n_keep / n_get_bulk) * n_get_bulk) != n_keep) @@ -157,8 +170,9 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg) /* get n_keep objects by bulk of n_bulk */ idx = 0; while (idx < n_keep) { - ret = rte_mempool_get_bulk(mp, &obj_table[idx], - n_get_bulk); + ret = rte_mempool_generic_get(mp, &obj_table[idx], + n_get_bulk, + cache, 1); if (unlikely(ret < 0)) { rte_mempool_dump(stdout, mp); rte_ring_dump(stdout, mp->ring); @@ -171,8 +185,9 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg) /* put the objects back */ idx = 0; while (idx < n_keep) { - rte_mempool_put_bulk(mp, &obj_table[idx], - n_put_bulk); + rte_mempool_generic_put(mp, &obj_table[idx], + n_put_bulk, + cache, 1); idx += n_put_bulk; } } @@ -181,6 +196,11 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg) stats[lcore_id].enq_count += N; } + if (use_external_cache) { + rte_mempool_cache_flush(cache, mp); + rte_mempool_cache_free(cache); + } + return 0; } @@ -200,7 +220,9 @@ launch_cores(unsigned cores) printf("mempool_autotest cache=%u cores=%u n_get_bulk=%u " "n_put_bulk=%u n_keep=%u ", - (unsigned) mp->cache_size, cores, n_get_bulk, n_put_bulk, n_keep); + use_external_cache ? + external_cache_size : (unsigned) mp->cache_size, + cores, n_get_bulk, n_put_bulk, n_keep); if (rte_mempool_count(mp) != MEMPOOL_SIZE) { printf("mempool is not full\n"); @@ -324,6 +346,20 @@ test_mempool_perf(void) if (do_one_mempool_test(rte_lcore_count()) < 0) return -1; + /* performance test with 1, 2 and max cores */ + printf("start performance test (with user-owned cache)\n"); + mp = mp_nocache; + use_external_cache = 1; + + if (do_one_mempool_test(1) < 0) + return -1; + + if (do_one_mempool_test(2) < 0) + return -1; + + if (do_one_mempool_test(rte_lcore_count()) < 0) + return -1; + rte_mempool_list_dump(stdout); return 0; diff --git a/lib/librte_eal/common/eal_common_log.c b/lib/librte_eal/common/eal_common_log.c index 1ae8de7..07aee22 100644 --- a/lib/librte_eal/common/eal_common_log.c +++ b/lib/librte_eal/common/eal_common_log.c @@ -125,7 +125,7 @@ rte_log_add_in_history(const char *buf, size_t size) } } else { - if (rte_mempool_mc_get(log_history_mp, &obj) < 0) + if (rte_mempool_get(log_history_mp, &obj) < 0) obj = NULL; hist_buf = obj; } @@ -138,7 +138,7 @@ rte_log_add_in_history(const char *buf, size_t size) /* not enough room for msg, buffer go back in mempool */ if (size >= hist_buf_size) { - rte_mempool_mp_put(log_history_mp, hist_buf); + rte_mempool_put(log_history_mp, hist_buf); rte_spinlock_unlock(&log_list_lock); return -ENOBUFS; } @@ -252,12 +252,12 @@ rte_log_dump_history(FILE *out) /* write on stdout */ if (fwrite(hist_buf->buf, hist_buf->size, 1, out) == 0) { - rte_mempool_mp_put(log_history_mp, hist_buf); + rte_mempool_put(log_history_mp, hist_buf); break; } /* put back message structure in pool */ - rte_mempool_mp_put(log_history_mp, hist_buf); + rte_mempool_put(log_history_mp, hist_buf); } fflush(out); diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c index 73ca770..4d977c1 100644 --- a/lib/librte_mempool/rte_mempool.c +++ b/lib/librte_mempool/rte_mempool.c @@ -375,6 +375,63 @@ rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz, return usz; } +static void +mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size) +{ + cache->size = size; + 
cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size); + cache->len = 0; +} + +/* + * Create and initialize a cache for objects that are retrieved from and + * returned to an underlying mempool. This structure is identical to the + * local_cache[lcore_id] pointed to by the mempool structure. + */ +struct rte_mempool_cache * +rte_mempool_cache_create(uint32_t size, int socket_id) +{ + struct rte_mempool_cache *cache; + + if (size > RTE_MEMPOOL_CACHE_MAX_SIZE) { + rte_errno = EINVAL; + return NULL; + } + + cache = rte_zmalloc_socket("MEMPOOL_CACHE", sizeof(*cache), + RTE_CACHE_LINE_SIZE, socket_id); + if (cache == NULL) { + RTE_LOG(ERR, MEMPOOL, "Cannot allocate mempool cache!\n"); + rte_errno = ENOMEM; + return NULL; + } + + mempool_cache_init(cache, size); + + return cache; +} + +/* + * Free a cache. It's the responsibility of the user to make sure that any + * remaining objects in the cache are flushed to the corresponding + * mempool. + */ +void +rte_mempool_cache_free(struct rte_mempool_cache *cache) +{ + rte_free(cache); +} + +/* + * Put all objects in the cache to the specified mempool's ring. + */ +void +rte_mempool_cache_flush(struct rte_mempool_cache *cache, + struct rte_mempool *mp) +{ + rte_ring_enqueue_bulk(mp->ring, cache->objs, cache->len); +} + #ifndef RTE_LIBRTE_XEN_DOM0 /* stub if DOM0 support not configured */ struct rte_mempool * @@ -448,6 +505,7 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size, struct rte_mempool_objsz objsz; void *startaddr; int page_size = getpagesize(); + unsigned lcore_id; /* compilation-time checks */ RTE_BUILD_BUG_ON((sizeof(struct rte_mempool) & @@ -583,8 +641,8 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size, mp->elt_size = objsz.elt_size; mp->header_size = objsz.header_size; mp->trailer_size = objsz.trailer_size; + /* Size of default caches, zero means disabled. */ mp->cache_size = cache_size; - mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size); mp->private_data_size = private_data_size; /* @@ -594,6 +652,13 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size, mp->local_cache = (struct rte_mempool_cache *) ((char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num, 0)); + /* Init all default caches. */ + if (cache_size != 0) { + for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) + mempool_cache_init(&mp->local_cache[lcore_id], + cache_size); + } + /* calculate address of the first element for continuous mempool. 
*/ obj = (char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num, cache_size) + private_data_size; @@ -672,7 +737,7 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp) unsigned count = 0; unsigned cache_count; - fprintf(f, " cache infos:\n"); + fprintf(f, " internal cache infos:\n"); fprintf(f, " cache_size=%"PRIu32"\n", mp->cache_size); if (mp->cache_size == 0) @@ -680,7 +745,8 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp) for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { cache_count = mp->local_cache[lcore_id].len; - fprintf(f, " cache_count[%u]=%u\n", lcore_id, cache_count); + fprintf(f, " cache_count[%u]=%"PRIu32"\n", + lcore_id, cache_count); count += cache_count; } fprintf(f, " total_cache_count=%u\n", count); @@ -760,7 +826,9 @@ mempool_audit_cache(const struct rte_mempool *mp) return; for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { - if (mp->local_cache[lcore_id].len > mp->cache_flushthresh) { + const struct rte_mempool_cache *cache; + cache = &mp->local_cache[lcore_id]; + if (cache->len > cache->flushthresh) { RTE_LOG(CRIT, MEMPOOL, "badness on cache[%u]\n", lcore_id); rte_panic("MEMPOOL: invalid cache len\n"); diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h index 8595e77..21d43e2 100644 --- a/lib/librte_mempool/rte_mempool.h +++ b/lib/librte_mempool/rte_mempool.h @@ -99,7 +99,9 @@ struct rte_mempool_debug_stats { * A structure that stores a per-core object cache. */ struct rte_mempool_cache { - unsigned len; /**< Cache len */ + uint32_t size; /**< Size of the cache */ + uint32_t flushthresh; /**< Threshold before we flush excess elements */ + uint32_t len; /**< Current cache count */ /* * Cache is allocated to this size to allow it to overflow in certain * cases to avoid needless emptying of cache. @@ -182,9 +184,7 @@ struct rte_mempool { phys_addr_t phys_addr; /**< Phys. addr. of mempool struct. */ int flags; /**< Flags of the mempool. */ uint32_t size; /**< Size of the mempool. */ - uint32_t cache_size; /**< Size of per-lcore local cache. */ - uint32_t cache_flushthresh; - /**< Threshold before we flush excess elements. */ + uint32_t cache_size; /**< Size of per-lcore default local cache. */ uint32_t elt_size; /**< Size of an element. */ uint32_t header_size; /**< Size of header (before elt). */ @@ -742,6 +742,66 @@ rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size, void rte_mempool_dump(FILE *f, const struct rte_mempool *mp); /** + * Create a user-owned mempool cache. + * + * This can be used by non-EAL threads to enable caching when they + * interact with a mempool. + * + * @param size + * The size of the mempool cache. See rte_mempool_create()'s cache_size + * parameter description for more information. The same limits and + * considerations apply here too. + * @param socket_id + * The socket identifier in the case of NUMA. The value can be + * SOCKET_ID_ANY if there is no NUMA constraint for the reserved zone. + */ +struct rte_mempool_cache * +rte_mempool_cache_create(uint32_t size, int socket_id); + +/** + * Free a user-owned mempool cache. + * + * @param cache + * A pointer to the mempool cache. + */ +void +rte_mempool_cache_free(struct rte_mempool_cache *cache); + +/** + * Flush a user-owned mempool cache to the specified mempool. + * + * @param cache + * A pointer to the mempool cache. + * @param mp + * A pointer to the mempool. 
+ */ +void +rte_mempool_cache_flush(struct rte_mempool_cache *cache, + struct rte_mempool *mp); + +/** + * Get a pointer to the per-lcore default mempool cache. + * + * @param mp + * A pointer to the mempool structure. + * @param lcore_id + * The logical core id. + * @return + * A pointer to the mempool cache or NULL if disabled or non-EAL thread. + */ +static inline struct rte_mempool_cache * +rte_mempool_default_cache(struct rte_mempool *mp, unsigned lcore_id) +{ + if (mp->cache_size == 0) + return NULL; + + if (lcore_id >= RTE_MAX_LCORE) + return NULL; + + return &mp->local_cache[lcore_id]; +} + +/** * @internal Put several objects back in the mempool; used internally. * @param mp * A pointer to the mempool structure. @@ -750,33 +810,29 @@ void rte_mempool_dump(FILE *f, const struct rte_mempool *mp); * @param n * The number of objects to store back in the mempool, must be strictly * positive. + * @param cache + * A pointer to a mempool cache structure. May be NULL if not needed. * @param is_mp * Mono-producer (0) or multi-producers (1). */ static inline void __attribute__((always_inline)) -__mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, - unsigned n, int is_mp) +__mempool_generic_put(struct rte_mempool *mp, void * const *obj_table, + unsigned n, struct rte_mempool_cache *cache, int is_mp) { - struct rte_mempool_cache *cache; uint32_t index; void **cache_objs; - unsigned lcore_id = rte_lcore_id(); - uint32_t cache_size = mp->cache_size; - uint32_t flushthresh = mp->cache_flushthresh; /* increment stat now, adding in mempool always success */ __MEMPOOL_STAT_ADD(mp, put, n); - /* cache is not enabled or single producer or non-EAL thread */ - if (unlikely(cache_size == 0 || is_mp == 0 || - lcore_id >= RTE_MAX_LCORE)) + /* No cache provided or cache is not enabled or single producer */ + if (unlikely(cache == NULL || cache->size == 0 || is_mp == 0)) goto ring_enqueue; /* Go straight to ring if put would overflow mem allocated for cache */ if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) goto ring_enqueue; - cache = &mp->local_cache[lcore_id]; cache_objs = &cache->objs[cache->len]; /* @@ -792,10 +848,10 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, cache->len += n; - if (cache->len >= flushthresh) { - rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size], - cache->len - cache_size); - cache->len = cache_size; + if (cache->len >= cache->flushthresh) { + rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache->size], + cache->len - cache->size); + cache->len = cache->size; } return; @@ -820,41 +876,27 @@ ring_enqueue: #endif } - /** - * Put several objects back in the mempool (multi-producers safe). + * Put several objects back in the mempool. * * @param mp * A pointer to the mempool structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the mempool from the obj_table. + * The number of objects to store back in the mempool, must be strictly + * positive. + * @param cache + * A pointer to a mempool cache structure. May be NULL if not needed. + * @param is_mp + * Mono-producer (0) or multi-producers (1). */ static inline void __attribute__((always_inline)) -rte_mempool_mp_put_bulk(struct rte_mempool *mp, void * const *obj_table, - unsigned n) -{ - __mempool_check_cookies(mp, obj_table, n, 0); - __mempool_put_bulk(mp, obj_table, n, 1); -} - -/** - * Put several objects back in the mempool (NOT multi-producers safe). - * - * @param mp - * A pointer to the mempool structure. 
- * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to add in the mempool from obj_table. - */ -static inline void -rte_mempool_sp_put_bulk(struct rte_mempool *mp, void * const *obj_table, - unsigned n) +rte_mempool_generic_put(struct rte_mempool *mp, void * const *obj_table, + unsigned n, struct rte_mempool_cache *cache, int is_mp) { __mempool_check_cookies(mp, obj_table, n, 0); - __mempool_put_bulk(mp, obj_table, n, 0); + __mempool_generic_put(mp, obj_table, n, cache, is_mp); } /** @@ -875,36 +917,11 @@ static inline void __attribute__((always_inline)) rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, unsigned n) { - __mempool_check_cookies(mp, obj_table, n, 0); - __mempool_put_bulk(mp, obj_table, n, !(mp->flags & MEMPOOL_F_SP_PUT)); -} - -/** - * Put one object in the mempool (multi-producers safe). - * - * @param mp - * A pointer to the mempool structure. - * @param obj - * A pointer to the object to be added. - */ -static inline void __attribute__((always_inline)) -rte_mempool_mp_put(struct rte_mempool *mp, void *obj) -{ - rte_mempool_mp_put_bulk(mp, &obj, 1); -} + struct rte_mempool_cache *cache; -/** - * Put one object back in the mempool (NOT multi-producers safe). - * - * @param mp - * A pointer to the mempool structure. - * @param obj - * A pointer to the object to be added. - */ -static inline void __attribute__((always_inline)) -rte_mempool_sp_put(struct rte_mempool *mp, void *obj) -{ - rte_mempool_sp_put_bulk(mp, &obj, 1); + cache = rte_mempool_default_cache(mp, rte_lcore_id()); + rte_mempool_generic_put(mp, obj_table, n, cache, + !(mp->flags & MEMPOOL_F_SP_PUT)); } /** @@ -933,6 +950,8 @@ rte_mempool_put(struct rte_mempool *mp, void *obj) * A pointer to a table of void * pointers (objects). * @param n * The number of objects to get, must be strictly positive. + * @param cache + * A pointer to a mempool cache structure. May be NULL if not needed. * @param is_mc * Mono-consumer (0) or multi-consumers (1). * @return @@ -940,28 +959,24 @@ rte_mempool_put(struct rte_mempool *mp, void *obj) * - <0: Error; code of ring dequeue function. */ static inline int __attribute__((always_inline)) -__mempool_get_bulk(struct rte_mempool *mp, void **obj_table, - unsigned n, int is_mc) +__mempool_generic_get(struct rte_mempool *mp, void **obj_table, + unsigned n, struct rte_mempool_cache *cache, int is_mc) { int ret; - struct rte_mempool_cache *cache; uint32_t index, len; void **cache_objs; - unsigned lcore_id = rte_lcore_id(); - uint32_t cache_size = mp->cache_size; - /* cache is not enabled or single consumer */ - if (unlikely(cache_size == 0 || is_mc == 0 || - n >= cache_size || lcore_id >= RTE_MAX_LCORE)) + /* No cache provided or cache is not enabled or single consumer */ + if (unlikely(cache == NULL || cache->size == 0 || is_mc == 0 || + n >= cache->size)) goto ring_dequeue; - cache = &mp->local_cache[lcore_id]; cache_objs = cache->objs; /* Can this be satisfied from the cache? */ if (cache->len < n) { /* No. Backfill the cache first, and then fill from it */ - uint32_t req = n + (cache_size - cache->len); + uint32_t req = n + (cache->size - cache->len); /* How many do we require i.e. number to fill the cache + the request */ ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req); @@ -1005,57 +1020,28 @@ ring_dequeue: } /** - * Get several objects from the mempool (multi-consumers safe). 
- * - * If cache is enabled, objects will be retrieved first from cache, - * subsequently from the common pool. Note that it can return -ENOENT when - * the local cache and common pool are empty, even if cache from other - * lcores are full. - * - * @param mp - * A pointer to the mempool structure. - * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. - * @param n - * The number of objects to get from mempool to obj_table. - * @return - * - 0: Success; objects taken. - * - -ENOENT: Not enough entries in the mempool; no object is retrieved. - */ -static inline int __attribute__((always_inline)) -rte_mempool_mc_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n) -{ - int ret; - ret = __mempool_get_bulk(mp, obj_table, n, 1); - if (ret == 0) - __mempool_check_cookies(mp, obj_table, n, 1); - return ret; -} - -/** - * Get several objects from the mempool (NOT multi-consumers safe). - * - * If cache is enabled, objects will be retrieved first from cache, - * subsequently from the common pool. Note that it can return -ENOENT when - * the local cache and common pool are empty, even if cache from other - * lcores are full. + * Get several objects from the mempool. * * @param mp * A pointer to the mempool structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to get from the mempool to obj_table. + * The number of objects to get, must be strictly positive. + * @param cache + * A pointer to a mempool cache structure. May be NULL if not needed. + * @param is_mc + * Mono-consumer (0) or multi-consumers (1). * @return - * - 0: Success; objects taken. - * - -ENOENT: Not enough entries in the mempool; no object is - * retrieved. + * - >=0: Success; number of objects supplied. + * - <0: Error; code of ring dequeue function. */ static inline int __attribute__((always_inline)) -rte_mempool_sc_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n) +rte_mempool_generic_get(struct rte_mempool *mp, void **obj_table, + unsigned n, struct rte_mempool_cache *cache, int is_mc) { int ret; - ret = __mempool_get_bulk(mp, obj_table, n, 0); + ret = __mempool_generic_get(mp, obj_table, n, cache, is_mc); if (ret == 0) __mempool_check_cookies(mp, obj_table, n, 1); return ret; @@ -1086,56 +1072,11 @@ rte_mempool_sc_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n) static inline int __attribute__((always_inline)) rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n) { - int ret; - ret = __mempool_get_bulk(mp, obj_table, n, - !(mp->flags & MEMPOOL_F_SC_GET)); - if (ret == 0) - __mempool_check_cookies(mp, obj_table, n, 1); - return ret; -} - -/** - * Get one object from the mempool (multi-consumers safe). - * - * If cache is enabled, objects will be retrieved first from cache, - * subsequently from the common pool. Note that it can return -ENOENT when - * the local cache and common pool are empty, even if cache from other - * lcores are full. - * - * @param mp - * A pointer to the mempool structure. - * @param obj_p - * A pointer to a void * pointer (object) that will be filled. - * @return - * - 0: Success; objects taken. - * - -ENOENT: Not enough entries in the mempool; no object is retrieved. 
- */ -static inline int __attribute__((always_inline)) -rte_mempool_mc_get(struct rte_mempool *mp, void **obj_p) -{ - return rte_mempool_mc_get_bulk(mp, obj_p, 1); -} + struct rte_mempool_cache *cache; -/** - * Get one object from the mempool (NOT multi-consumers safe). - * - * If cache is enabled, objects will be retrieved first from cache, - * subsequently from the common pool. Note that it can return -ENOENT when - * the local cache and common pool are empty, even if cache from other - * lcores are full. - * - * @param mp - * A pointer to the mempool structure. - * @param obj_p - * A pointer to a void * pointer (object) that will be filled. - * @return - * - 0: Success; objects taken. - * - -ENOENT: Not enough entries in the mempool; no object is retrieved. - */ -static inline int __attribute__((always_inline)) -rte_mempool_sc_get(struct rte_mempool *mp, void **obj_p) -{ - return rte_mempool_sc_get_bulk(mp, obj_p, 1); + cache = rte_mempool_default_cache(mp, rte_lcore_id()); + return rte_mempool_generic_get(mp, obj_table, n, cache, + !(mp->flags & MEMPOOL_F_SC_GET)); } /** @@ -1169,7 +1110,7 @@ rte_mempool_get(struct rte_mempool *mp, void **obj_p) * * When cache is enabled, this function has to browse the length of * all lcores, so it should not be used in a data path, but only for - * debug purposes. + * debug purposes. User-owned mempool caches are not accounted for. * * @param mp * A pointer to the mempool structure. @@ -1188,7 +1129,7 @@ unsigned rte_mempool_count(const struct rte_mempool *mp); * * When cache is enabled, this function has to browse the length of * all lcores, so it should not be used in a data path, but only for - * debug purposes. + * debug purposes. User-owned mempool caches are not accounted for. * * @param mp * A pointer to the mempool structure. @@ -1206,7 +1147,7 @@ rte_mempool_free_count(const struct rte_mempool *mp) * * When cache is enabled, this function has to browse the length of all * lcores, so it should not be used in a data path, but only for debug - * purposes. + * purposes. User-owned mempool caches are not accounted for. * * @param mp * A pointer to the mempool structure. @@ -1225,7 +1166,7 @@ rte_mempool_full(const struct rte_mempool *mp) * * When cache is enabled, this function has to browse the length of all * lcores, so it should not be used in a data path, but only for debug - * purposes. + * purposes. User-owned mempool caches are not accounted for. * * @param mp * A pointer to the mempool structure. -- 1.9.1
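One practical consequence of the doc-comment updates above, shown as a
short sketch (assuming a pool mp and a user-owned cache obtained as in
the example near the top of this mail): rte_mempool_count() only browses
the per-lcore default caches, so a caller that wants to account for
objects parked in its own cache has to add the cache length itself,
exactly as the updated test_mempool.c does.

    /* Objects available to this thread: what the pool reports plus
     * whatever is sitting in the thread's user-owned cache. */
    unsigned avail = rte_mempool_count(mp) + cache->len;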