From: Gage Eads <gage.eads@intel.com>
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
bruce.richardson@intel.com, konstantin.ananyev@intel.com,
stephen@networkplumber.org, jerinj@marvell.com,
mczekaj@marvell.com, nd@arm.com, Ola.Liljedahl@arm.com
Subject: [dpdk-dev] [PATCH v6 3/6] ring: add a lock-free implementation
Date: Wed, 6 Mar 2019 09:03:39 -0600
Message-ID: <20190306150342.2894-4-gage.eads@intel.com>
In-Reply-To: <20190306150342.2894-1-gage.eads@intel.com>
This commit adds support for lock-free circular ring enqueue and dequeue
functions. The ring is supported on 32- and 64-bit architectures; however,
on 64-bit architectures it requires a 128-bit compare-and-swap instruction,
so 64-bit support is currently limited to x86_64.
The algorithm is based on Ola Liljedahl's lfring, modified to fit within
the rte ring API. With no contention, an enqueue of n pointers uses (1 + n)
CAS operations and a dequeue of n pointers uses 1. This algorithm has worse
average-case performance than the regular rte ring (particularly for a
highly contended ring with large bulk accesses), however:
- For applications with preemptible pthreads, the regular rte ring's
worst-case performance (i.e. one thread being preempted in the
update_tail() critical section) is much worse than the lock-free ring's.
- Software caching can mitigate the average-case performance cost of
ring-based algorithms; for example, a lock-free-ring-based mempool (a
likely use case for this ring) can use per-thread caching.
To avoid the ABA problem, each ring entry contains a modification counter.
On a 64-bit architecture, the chance of ABA occurring is effectively zero;
a 64-bit counter will take many years to wrap at current CPU frequencies.
On 32-bit architectures, a lock-free ring must be at least 1024 entries
deep; assuming 100 cycles per ring entry access, this guarantees that the
ring's modification counters take on the order of days to wrap.
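Concretely, each lock-free ring slot pairs the stored pointer with a lap
counter: slot (tail & mask) may only be claimed while its counter still
equals tail >> log2(size), so a recycled slot cannot be mistaken for the
expected one. A minimal sketch of that check (illustrative names only, not
the exact code in the patch below):

    #include <stdint.h>

    struct lf_entry {
        void *ptr;     /* data pointer */
        uintptr_t cnt; /* modification counter: lap number of the last write */
    };

    /* A producer holding free-running index 'tail' may attempt its CAS on
     * this slot only if the slot still belongs to the expected lap.
     */
    static inline int
    slot_is_claimable(const struct lf_entry *e, uintptr_t tail,
                      uint32_t log2_size)
    {
        return e->cnt == (tail >> log2_size);
    }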
The lock-free ring is enabled via a new flag, RING_F_LF. Because the ring's
memsize is now a function of its flags (the lock-free ring stores two
pointer-sized fields per entry), this commit adds a new argument ('flags')
to rte_ring_get_memsize(). An API deprecation notice will be sent in a
separate commit.
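As a sketch of the new signature (illustrative wrapper; error handling
omitted), a caller sizing memory for a lock-free ring passes the same flags
it will later give rte_ring_create():

    #include <rte_ring.h>

    static ssize_t
    lf_ring_footprint(unsigned int count)
    {
        /* Lock-free entries are two pointers wide, so the same count needs
         * roughly twice the ring storage of a regular ring.
         */
        return rte_ring_get_memsize(count, RING_F_LF);
    }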
For ease of use, the existing ring enqueue and dequeue functions work on
both regular and lock-free rings; the ring's flags select the
implementation. This introduces an additional branch in the datapath, but
it should be highly predictable.
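For example (a minimal sketch with illustrative names; most error handling
omitted), a lock-free multi-producer/multi-consumer ring is created with
RING_F_LF and then driven through the unchanged API:

    #include <rte_ring.h>

    static void
    lf_ring_example(void)
    {
        struct rte_ring *r;
        void *obj = NULL, *deq_obj;

        /* RING_F_LF selects the lock-free enqueue/dequeue paths; with no
         * SP/SC flags the ring stays multi-producer/multi-consumer.
         */
        r = rte_ring_create("lf_example", 1024, SOCKET_ID_ANY, RING_F_LF);
        if (r == NULL)
            return;

        /* Same calls as for a regular ring; the flags check inside them
         * dispatches to the lock-free implementation.
         */
        if (rte_ring_enqueue(r, obj) == 0)
            rte_ring_dequeue(r, &deq_obj);

        rte_ring_free(r);
    }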
ring_perf_autotest shows a negligible performance impact; it is hard to
distinguish a real difference from system noise. The table below shows the
per-test cycle delta (with the branch minus without):
Test                               | Cycles with branch -
                                   |   cycles without
-----------------------------------+----------------------
SP/SC single enq/dequeue           |  0.33
MP/MC single enq/dequeue           | -4.00
SP/SC burst enq/dequeue (size 8)   |  0.00
MP/MC burst enq/dequeue (size 8)   |  0.00
SP/SC burst enq/dequeue (size 32)  |  0.00
MP/MC burst enq/dequeue (size 32)  |  0.00
SC empty dequeue                   |  1.00
MC empty dequeue                   |  0.00

Single lcore:
SP/SC bulk enq/dequeue (size 8)    |  0.49
MP/MC bulk enq/dequeue (size 8)    |  0.08
SP/SC bulk enq/dequeue (size 32)   |  0.07
MP/MC bulk enq/dequeue (size 32)   |  0.09

Two physical cores:
SP/SC bulk enq/dequeue (size 8)    |  0.19
MP/MC bulk enq/dequeue (size 8)    | -0.37
SP/SC bulk enq/dequeue (size 32)   |  0.09
MP/MC bulk enq/dequeue (size 32)   | -0.05

Two NUMA nodes:
SP/SC bulk enq/dequeue (size 8)    | -1.96
MP/MC bulk enq/dequeue (size 8)    |  0.88
SP/SC bulk enq/dequeue (size 32)   |  0.10
MP/MC bulk enq/dequeue (size 32)   |  0.46
Test setup: x86_64 build with the default config, dual-socket Xeon E5-2699
v4, running on isolcpus cores with a tickless scheduler. Each test was run
three times and the results averaged.
Signed-off-by: Gage Eads <gage.eads@intel.com>
---
lib/librte_ring/rte_ring.c | 92 +++++++--
lib/librte_ring/rte_ring.h | 308 ++++++++++++++++++++++++++---
lib/librte_ring/rte_ring_c11_mem.h | 366 ++++++++++++++++++++++++++++++++++-
lib/librte_ring/rte_ring_generic.h | 354 +++++++++++++++++++++++++++++++++
lib/librte_ring/rte_ring_version.map | 7 +
5 files changed, 1080 insertions(+), 47 deletions(-)
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..d4a176f57 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
/* return the size of memory occupied by a ring */
ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
{
- ssize_t sz;
+ ssize_t sz, elt_sz;
/* count must be a power of 2 */
if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
return -EINVAL;
}
- sz = sizeof(struct rte_ring) + count * sizeof(void *);
+ elt_sz = (flags & RING_F_LF) ? 2 * sizeof(void *) : sizeof(void *);
+
+ sz = sizeof(struct rte_ring) + count * elt_sz;
sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
return sz;
}
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+ unsigned int flags),
+ rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+ return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
@@ -75,6 +88,8 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
RTE_CACHE_LINE_MASK) != 0);
RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
RTE_CACHE_LINE_MASK) != 0);
+ RTE_BUILD_BUG_ON(sizeof(struct rte_ring_lf_entry) !=
+ 2 * sizeof(void *));
/* init the ring structure */
memset(r, 0, sizeof(*r));
@@ -82,8 +97,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
if (ret < 0 || ret >= (int)sizeof(r->name))
return -ENAMETOOLONG;
r->flags = flags;
- r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
- r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
if (flags & RING_F_EXACT_SZ) {
r->size = rte_align32pow2(count + 1);
@@ -100,12 +113,46 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
r->mask = count - 1;
r->capacity = r->mask;
}
- r->prod.head = r->cons.head = 0;
- r->prod.tail = r->cons.tail = 0;
+
+ r->log2_size = rte_log2_u64(r->size);
+
+ if (flags & RING_F_LF) {
+ uint32_t i;
+
+ r->prod_ptr.single =
+ (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+ r->cons_ptr.single =
+ (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+ r->prod_ptr.head = r->cons_ptr.head = 0;
+ r->prod_ptr.tail = r->cons_ptr.tail = 0;
+
+ for (i = 0; i < r->size; i++) {
+ struct rte_ring_lf_entry *ring_ptr, *base;
+
+ base = (struct rte_ring_lf_entry *)&r->ring;
+
+ ring_ptr = &base[i & r->mask];
+
+ ring_ptr->cnt = 0;
+ }
+ } else {
+ r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+ r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+ r->prod.head = r->cons.head = 0;
+ r->prod.tail = r->cons.tail = 0;
+ }
return 0;
}
+/* If a ring entry is written on average every M cycles, then a ring entry is
+ * reused every M*count cycles, and a ring entry's counter repeats every
+ * M*count*2^32 cycles. If M=100 on a 2GHz system, then a 1024-entry ring's
+ * counters would repeat every 2.37 days. The likelihood of ABA occurring is
+ * considered sufficiently low for 1024-entry and larger rings.
+ */
+#define MIN_32_BIT_LF_RING_SIZE 1024
+
/* create the ring */
struct rte_ring *
rte_ring_create(const char *name, unsigned count, int socket_id,
@@ -123,11 +170,25 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+#ifdef RTE_ARCH_64
+#if !defined(RTE_ARCH_X86_64)
+ if (flags & RING_F_LF) {
+ printf("This platform does not support the atomic operation required for RING_F_LF\n");
+ rte_errno = EINVAL;
+ return NULL;
+ }
+#endif
+#else
+ if ((flags & RING_F_LF) && count < MIN_32_BIT_LF_RING_SIZE) {
+ printf("RING_F_LF is only supported on 32-bit platforms for rings with at least 1024 entries.\n");
+ rte_errno = EINVAL;
+ return NULL;
+ }
+#endif
+
/* for an exact size ring, round up from count to a power of two */
if (flags & RING_F_EXACT_SZ)
count = rte_align32pow2(count + 1);
- ring_size = rte_ring_get_memsize(count);
+ ring_size = rte_ring_get_memsize(count, flags);
if (ring_size < 0) {
rte_errno = ring_size;
return NULL;
@@ -227,10 +288,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
fprintf(f, " flags=%x\n", r->flags);
fprintf(f, " size=%"PRIu32"\n", r->size);
fprintf(f, " capacity=%"PRIu32"\n", r->capacity);
- fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
- fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
- fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
- fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
+ if (r->flags & RING_F_LF) {
+ fprintf(f, " ct=%"PRIuPTR"\n", r->cons_ptr.tail);
+ fprintf(f, " ch=%"PRIuPTR"\n", r->cons_ptr.head);
+ fprintf(f, " pt=%"PRIuPTR"\n", r->prod_ptr.tail);
+ fprintf(f, " ph=%"PRIuPTR"\n", r->prod_ptr.head);
+ } else {
+ fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
+ fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
+ fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
+ fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
+ }
fprintf(f, " used=%u\n", rte_ring_count(r));
fprintf(f, " avail=%u\n", rte_ring_free_count(r));
}
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index f16d77b8a..200d7b2a0 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -20,7 +20,7 @@
*
* - FIFO (First In First Out)
* - Maximum size is fixed; the pointers are stored in a table.
- * - Lockless implementation.
+ * - Lockless (and optionally, non-blocking/lock-free) implementation.
* - Multi- or single-consumer dequeue.
* - Multi- or single-producer enqueue.
* - Bulk dequeue.
@@ -98,6 +98,7 @@ struct rte_ring {
const struct rte_memzone *memzone;
/**< Memzone, if any, containing the rte_ring */
uint32_t size; /**< Size of ring. */
+ uint32_t log2_size; /**< log2(size of ring) */
uint32_t mask; /**< Mask (size-1) of ring. */
uint32_t capacity; /**< Usable size of ring */
@@ -133,6 +134,18 @@ struct rte_ring {
*/
#define RING_F_EXACT_SZ 0x0004
#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses lock-free enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * lock-free functions have worse average-case performance than their regular
+ * rte ring counterparts. When used as the handler for a mempool, per-thread
+ * caching can mitigate the performance difference by reducing the number (and
+ * contention) of ring accesses.
+ *
+ * This flag is only supported on 32-bit and x86_64 platforms.
+ */
+#define RING_F_LF 0x0008
/* @internal defines for passing to the enqueue dequeue worker functions */
#define __IS_SP 1
@@ -150,11 +163,15 @@ struct rte_ring {
*
* @param count
* The number of elements in the ring (must be a power of 2).
+ * @param flags
+ * The flags the ring will be created with.
* @return
* - The memory size needed for the ring on success.
* - -EINVAL if count is not a power of 2.
*/
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
/**
* Initialize a ring structure.
@@ -187,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
* - RING_F_SC_DEQ: If this flag is set, the default behavior when
* using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
* is "single-consumer". Otherwise, it is "multi-consumers".
+ * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ * number, but up to half the ring space may be wasted.
+ * - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ * dequeue and enqueue functions.
* @return
* 0 on success, or a negative value on error.
*/
@@ -222,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
* - RING_F_SC_DEQ: If this flag is set, the default behavior when
* using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
* is "single-consumer". Otherwise, it is "multi-consumers".
+ * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ * number, but up to half the ring space may be wasted.
+ * - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ * dequeue and enqueue functions.
* @return
* On success, the pointer to the new allocated ring. NULL on error with
* rte_errno set appropriately. Possible errno values include:
* - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
* - E_RTE_SECONDARY - function was called from a secondary process instance
- * - EINVAL - count provided is not a power of 2
+ * - EINVAL - count provided is not a power of 2, or RING_F_LF is used on an
+ * unsupported platform
* - ENOSPC - the maximum number of memzones has already been allocated
* - EEXIST - a memzone with the same name already exists
* - ENOMEM - no appropriate memory area found in which to create memzone
@@ -283,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
} \
} while (0)
+/* The actual enqueue of pointers on the lock-free ring, used by the
+ * single-producer lock-free enqueue function.
+ */
+#define ENQUEUE_PTRS_LF(r, base, prod_head, obj_table, n) do { \
+ unsigned int i; \
+ const uint32_t size = (r)->size; \
+ size_t idx = prod_head & (r)->mask; \
+ size_t new_cnt = prod_head + size; \
+ struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+ unsigned int mask = ~0x3; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & mask); i += 4, idx += 4) { \
+ ring[idx].ptr = obj_table[i]; \
+ ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+ ring[idx + 1].ptr = obj_table[i + 1]; \
+ ring[idx + 1].cnt = (new_cnt + i + 1) >> r->log2_size; \
+ ring[idx + 2].ptr = obj_table[i + 2]; \
+ ring[idx + 2].cnt = (new_cnt + i + 2) >> r->log2_size; \
+ ring[idx + 3].ptr = obj_table[i + 3]; \
+ ring[idx + 3].cnt = (new_cnt + i + 3) >> r->log2_size; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+ ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+ case 2: \
+ ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+ ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+ case 1: \
+ ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+ ring[idx++].ptr = obj_table[i++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) { \
+ ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+ ring[idx].ptr = obj_table[i]; \
+ } \
+ for (idx = 0; i < n; i++, idx++) { \
+ ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+ ring[idx].ptr = obj_table[i]; \
+ } \
+ } \
+} while (0)
+
/* the actual copy of pointers on the ring to obj_table.
* Placed here since identical code needed in both
* single and multi consumer dequeue functions */
@@ -314,6 +384,43 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
} \
} while (0)
+/* The actual copy of pointers on the lock-free ring to obj_table. */
+#define DEQUEUE_PTRS_LF(r, base, cons_head, obj_table, n) do { \
+ unsigned int i; \
+ size_t idx = cons_head & (r)->mask; \
+ const uint32_t size = (r)->size; \
+ struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+ unsigned int mask = ~0x3; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & mask); i += 4, idx += 4) {\
+ obj_table[i] = ring[idx].ptr; \
+ obj_table[i + 1] = ring[idx + 1].ptr; \
+ obj_table[i + 2] = ring[idx + 2].ptr; \
+ obj_table[i + 3] = ring[idx + 3].ptr; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+ case 2: \
+ obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+ case 1: \
+ obj_table[i++] = ring[idx++].ptr; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) \
+ obj_table[i] = ring[idx].ptr; \
+ for (idx = 0; i < n; i++, idx++) \
+ obj_table[i] = ring[idx].ptr; \
+ } \
+} while (0)
+
+
+/* @internal Two-pointer-wide entry used by the lock-free ring (128 bits on
+ * 64-bit platforms)
+ */
+struct rte_ring_lf_entry {
+ void *ptr; /**< Data pointer */
+ uintptr_t cnt; /**< Modification counter */
+};
+
/* Between load and load. there might be cpu reorder in weak model
* (powerpc/arm).
* There are 2 choices for the users
@@ -330,6 +437,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
#endif
/**
+ * @internal Enqueue several objects on the lock-free ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sp, unsigned int *free_space)
+{
+ if (is_sp)
+ return __rte_ring_do_lf_enqueue_sp(r, obj_table, n,
+ behavior, free_space);
+ else
+ return __rte_ring_do_lf_enqueue_mp(r, obj_table, n,
+ behavior, free_space);
+}
+
+/**
+ * @internal Dequeue several objects from the lock-free ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sc, unsigned int *available)
+{
+ if (is_sc)
+ return __rte_ring_do_lf_dequeue_sc(r, obj_table, n,
+ behavior, available);
+ else
+ return __rte_ring_do_lf_dequeue_mc(r, obj_table, n,
+ behavior, available);
+}
+
+/**
* @internal Enqueue several objects on the ring
*
* @param r
@@ -436,8 +607,14 @@ static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MP, free_space);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP,
+ free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP,
+ free_space);
}
/**
@@ -459,8 +636,14 @@ static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SP, free_space);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP,
+ free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP,
+ free_space);
}
/**
@@ -486,8 +669,14 @@ static __rte_always_inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->prod.single, free_space);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->prod_ptr.single, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->prod.single, free_space);
}
/**
@@ -570,8 +759,14 @@ static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MC, available);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC,
+ available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC,
+ available);
}
/**
@@ -594,8 +789,14 @@ static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SC, available);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC,
+ available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC,
+ available);
}
/**
@@ -621,8 +822,14 @@ static __rte_always_inline unsigned int
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->cons.single, available);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->cons_ptr.single, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->cons.single, available);
}
/**
@@ -697,9 +904,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
- uint32_t prod_tail = r->prod.tail;
- uint32_t cons_tail = r->cons.tail;
- uint32_t count = (prod_tail - cons_tail) & r->mask;
+ uint32_t count;
+
+ if (r->flags & RING_F_LF)
+ count = (r->prod_ptr.tail - r->cons_ptr.tail) & r->mask;
+ else
+ count = (r->prod.tail - r->cons.tail) & r->mask;
+
return (count > r->capacity) ? r->capacity : count;
}
@@ -819,8 +1030,14 @@ static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MP, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MP, free_space);
}
/**
@@ -842,8 +1059,14 @@ static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SP, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SP, free_space);
}
/**
@@ -869,8 +1092,14 @@ static __rte_always_inline unsigned
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
- r->prod.single, free_space);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->prod_ptr.single, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->prod.single, free_space);
}
/**
@@ -897,8 +1126,14 @@ static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MC, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MC, available);
}
/**
@@ -922,8 +1157,14 @@ static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SC, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SC, available);
}
/**
@@ -949,9 +1190,14 @@ static __rte_always_inline unsigned
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ if (r->flags & RING_F_LF)
+ return __rte_ring_do_lf_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons_ptr.single, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons.single, available);
}
#ifdef __cplusplus
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 545caf257..a672d161e 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -221,8 +221,8 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
/* Ensure the head is read before tail */
__atomic_thread_fence(__ATOMIC_ACQUIRE);
- /* load-acquire synchronize with store-release of ht->tail
- * in update_tail.
+ /* load-acquire synchronize with store-release of tail in
+ * __rte_ring_do_lf_dequeue_{sc, mc}.
*/
cons_tail = __atomic_load_n(&r->cons_ptr.tail,
__ATOMIC_ACQUIRE);
@@ -247,6 +247,7 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
0, __ATOMIC_RELAXED,
__ATOMIC_RELAXED);
} while (unlikely(success == 0));
+
return n;
}
@@ -293,8 +294,8 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
/* Ensure the head is read before tail */
__atomic_thread_fence(__ATOMIC_ACQUIRE);
- /* this load-acquire synchronize with store-release of ht->tail
- * in update_tail.
+ /* load-acquire synchronize with store-release of tail in
+ * __rte_ring_do_lf_enqueue_{sp, mp}.
*/
prod_tail = __atomic_load_n(&r->prod_ptr.tail,
__ATOMIC_ACQUIRE);
@@ -318,6 +319,363 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
0, __ATOMIC_RELAXED,
__ATOMIC_RELAXED);
} while (unlikely(success == 0));
+
+ return n;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+ uint32_t free_entries;
+ uintptr_t head, next;
+
+ n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+ __atomic_store_n(&r->prod_ptr.tail,
+ r->prod_ptr.tail + n,
+ __ATOMIC_RELEASE);
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ * Get the next producer tail index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param idx
+ * The local tail index
+ * @return
+ * If the ring's tail is ahead of the local tail, return the shared tail.
+ * Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx)
+{
+ uintptr_t fresh = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+
+ if ((intptr_t)(idx - fresh) < 0)
+ idx = fresh; /* fresh is after idx, use it instead */
+ else
+ idx++; /* Continue with next slot */
+
+ return idx;
+}
+
+/**
+ * @internal
+ * Update the ring's producer tail index. If another thread already updated
+ * the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param val
+ * The local tail index
+ * @return
+ * If the shared tail is ahead of the local tail, return the shared tail.
+ * Else, update the shared tail to the local tail and return the local tail.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+ volatile uintptr_t *loc = &r->prod_ptr.tail;
+ uintptr_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+
+ do {
+ /* Check if the tail has already been updated. */
+ if ((intptr_t)(val - old) < 0)
+ return old;
+
+ /* Else val >= old, need to update *loc */
+ } while (!__atomic_compare_exchange_n(loc, &old, val,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED));
+
+ return val;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+ RTE_SET_USED(r);
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(n);
+ RTE_SET_USED(behavior);
+ RTE_SET_USED(free_space);
+ printf("[%s()] RING_F_LF requires an experimental API."
+ " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+ , __func__);
+ return 0;
+#else
+ struct rte_ring_lf_entry *base;
+ uintptr_t head, next, tail;
+ unsigned int i;
+ uint32_t avail;
+
+ /* Atomically update the prod head to reserve n slots. The prod tail
+ * is modified at the end of the function.
+ */
+ n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+ &head, &next, &avail);
+
+ tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+ head = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE);
+
+ if (unlikely(n == 0))
+ goto end;
+
+ base = (struct rte_ring_lf_entry *)&r->ring;
+
+ for (i = 0; i < n; i++) {
+ unsigned int retries = 0;
+ int success = 0;
+
+ /* Enqueue to the tail entry. If another thread wins the race,
+ * retry with the new tail.
+ */
+ do {
+ struct rte_ring_lf_entry old_value, new_value;
+ struct rte_ring_lf_entry *ring_ptr;
+
+ ring_ptr = &base[tail & r->mask];
+
+ old_value = *ring_ptr;
+
+ if (old_value.cnt != (tail >> r->log2_size)) {
+ /* This slot has already been used. Depending
+ * on how far behind this thread is, either go
+ * to the next slot or reload the tail.
+ */
+ uintptr_t prev_tail;
+
+ prev_tail = (tail + r->size) >> r->log2_size;
+
+ if (old_value.cnt != prev_tail ||
+ ++retries == ENQ_RETRY_LIMIT) {
+ /* This thread either fell 2+ laps
+ * behind or hit the retry limit, so
+ * reload the tail index.
+ */
+ tail = __rte_ring_reload_tail(r, tail);
+ retries = 0;
+ } else {
+ /* Slot already used, try the next. */
+ tail++;
+
+ }
+
+ continue;
+ }
+
+ /* Found a free slot, try to enqueue next element. */
+ new_value.ptr = obj_table[i];
+ new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+ success = rte_atomic128_cmp_exchange(
+ (rte_int128_t *)ring_ptr,
+ (rte_int128_t *)&old_value,
+ (rte_int128_t *)&new_value,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+#else
+ success = __atomic_compare_exchange(
+ (uint64_t *)ring_ptr,
+ &old_value,
+ &new_value,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+#endif
+ } while (success == 0);
+
+ /* Only increment tail if the CAS succeeds, since it can
+ * spuriously fail on some architectures.
+ */
+ tail++;
+ }
+
+end:
+ tail = __rte_ring_lf_update_tail(r, tail);
+
+ if (free_space != NULL)
+ *free_space = avail - n;
+ return n;
+#endif
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uintptr_t cons_tail, prod_tail, avail;
+
+ cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+ prod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE);
+
+ avail = prod_tail - cons_tail;
+
+ /* Set the actual entries for dequeue */
+ if (unlikely(avail < n))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+ if (unlikely(n == 0))
+ goto end;
+
+ DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+ /* Use a read barrier and store-relaxed so we don't unnecessarily order
+ * writes.
+ */
+ rte_smp_rmb();
+
+ __atomic_store_n(&r->cons_ptr.tail, cons_tail + n, __ATOMIC_RELAXED);
+end:
+ if (available != NULL)
+ *available = avail - n;
+
+ return n;
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uintptr_t cons_tail, prod_tail, avail;
+
+ cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+
+ do {
+ /* Load tail on every iteration to avoid spurious queue empty
+ * situations.
+ */
+ prod_tail = __atomic_load_n(&r->prod_ptr.tail,
+ __ATOMIC_ACQUIRE);
+
+ avail = prod_tail - cons_tail;
+
+ /* Set the actual entries for dequeue */
+ if (unlikely(avail < n))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+ if (unlikely(n == 0))
+ goto end;
+
+ DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+ /* Use a read barrier and store-relaxed so we don't
+ * unnecessarily order writes.
+ */
+ rte_smp_rmb();
+
+ } while (!__atomic_compare_exchange_n(&r->cons_ptr.tail,
+ &cons_tail, cons_tail + n,
+ 0, __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+
+end:
+ if (available != NULL)
+ *available = avail - n;
+
return n;
}
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index 6a0e1bbfb..944b353f4 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -297,4 +297,358 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
return n;
}
+/**
+ * @internal
+ * Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+ uint32_t free_entries;
+ uintptr_t head, next;
+
+ n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+ rte_smp_wmb();
+
+ r->prod_ptr.tail += n;
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ * Get the next producer tail index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param idx
+ * The local tail index
+ * @return
+ * If the ring's tail is ahead of the local tail, return the shared tail.
+ * Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx)
+{
+ uintptr_t fresh = r->prod_ptr.tail;
+
+ if ((intptr_t)(idx - fresh) < 0)
+ /* fresh is after idx, use it instead */
+ idx = fresh;
+ else
+ /* Continue with next slot */
+ idx++;
+
+ return idx;
+}
+
+/**
+ * @internal
+ * Update the ring's producer tail index. If another thread already updated
+ * the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param val
+ * The local tail index
+ * @return
+ * If the shared tail is ahead of the local tail, return the shared tail.
+ * Else, update the shared tail to the local tail and return the local tail.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+ volatile uintptr_t *loc = &r->prod_ptr.tail;
+ uintptr_t old = *loc;
+
+ do {
+ /* Check if the tail has already been updated. */
+ if ((intptr_t)(val - old) < 0)
+ return old;
+
+ /* Else val >= old, need to update *loc */
+ } while (!__sync_bool_compare_and_swap(loc, old, val));
+
+ return val;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+ RTE_SET_USED(r);
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(n);
+ RTE_SET_USED(behavior);
+ RTE_SET_USED(free_space);
+ printf("[%s()] RING_F_LF requires an experimental API."
+ " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+ , __func__);
+ return 0;
+#else
+ struct rte_ring_lf_entry *base;
+ uintptr_t head, next, tail;
+ unsigned int i;
+ uint32_t avail;
+
+ /* Atomically update the prod head to reserve n slots. The prod tail
+ * is modified at the end of the function.
+ */
+ n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+ &head, &next, &avail);
+
+ tail = r->prod_ptr.tail;
+
+ rte_smp_rmb();
+
+ head = r->cons_ptr.tail;
+
+ if (unlikely(n == 0))
+ goto end;
+
+ base = (struct rte_ring_lf_entry *)&r->ring;
+
+ for (i = 0; i < n; i++) {
+ unsigned int retries = 0;
+ int success = 0;
+
+ /* Enqueue to the tail entry. If another thread wins the race,
+ * retry with the new tail.
+ */
+ do {
+ struct rte_ring_lf_entry old_value, new_value;
+ struct rte_ring_lf_entry *ring_ptr;
+
+ ring_ptr = &base[tail & r->mask];
+
+ old_value = *ring_ptr;
+
+ if (old_value.cnt != (tail >> r->log2_size)) {
+ /* This slot has already been used. Depending
+ * on how far behind this thread is, either go
+ * to the next slot or reload the tail.
+ */
+ uintptr_t prev_tail;
+
+ prev_tail = (tail + r->size) >> r->log2_size;
+
+ if (old_value.cnt != prev_tail ||
+ ++retries == ENQ_RETRY_LIMIT) {
+ /* This thread either fell 2+ laps
+ * behind or hit the retry limit, so
+ * reload the tail index.
+ */
+ tail = __rte_ring_reload_tail(r, tail);
+ retries = 0;
+ } else {
+ /* Slot already used, try the next. */
+ tail++;
+
+ }
+
+ continue;
+ }
+
+ /* Found a free slot, try to enqueue next element. */
+ new_value.ptr = obj_table[i];
+ new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+ success = rte_atomic128_cmp_exchange(
+ (rte_int128_t *)ring_ptr,
+ (rte_int128_t *)&old_value,
+ (rte_int128_t *)&new_value,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+#else
+ uint64_t *old_ptr = (uint64_t *)&old_value;
+ uint64_t *new_ptr = (uint64_t *)&new_value;
+
+ success = rte_atomic64_cmpset(
+ (volatile uint64_t *)ring_ptr,
+ *old_ptr, *new_ptr);
+#endif
+ } while (success == 0);
+
+ /* Only increment tail if the CAS succeeds, since it can
+ * spuriously fail on some architectures.
+ */
+ tail++;
+ }
+
+end:
+
+ tail = __rte_ring_lf_update_tail(r, tail);
+
+ if (free_space != NULL)
+ *free_space = avail - n;
+ return n;
+#endif
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uintptr_t cons_tail, prod_tail, avail;
+
+ cons_tail = r->cons_ptr.tail;
+
+ rte_smp_rmb();
+
+ prod_tail = r->prod_ptr.tail;
+
+ avail = prod_tail - cons_tail;
+
+ /* Set the actual entries for dequeue */
+ if (unlikely(avail < n))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+ if (unlikely(n == 0))
+ goto end;
+
+ DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+ rte_smp_rmb();
+
+ r->cons_ptr.tail += n;
+end:
+ if (available != NULL)
+ *available = avail - n;
+
+ return n;
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uintptr_t cons_tail, prod_tail, avail;
+
+ cons_tail = r->cons_ptr.tail;
+
+ do {
+ rte_smp_rmb();
+
+ /* Load tail on every iteration to avoid spurious queue empty
+ * situations.
+ */
+ prod_tail = r->prod_ptr.tail;
+
+ avail = prod_tail - cons_tail;
+
+ /* Set the actual entries for dequeue */
+ if (unlikely(avail < n))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+ if (unlikely(n == 0))
+ goto end;
+
+ DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+ } while (!__sync_bool_compare_and_swap(&r->cons_ptr.tail,
+ cons_tail, cons_tail + n));
+
+end:
+ if (available != NULL)
+ *available = avail - n;
+
+ return n;
+}
+
#endif /* _RTE_RING_GENERIC_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index d935efd0d..8969467af 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,10 @@ DPDK_2.2 {
rte_ring_free;
} DPDK_2.0;
+
+DPDK_19.05 {
+ global:
+
+ rte_ring_get_memsize;
+
+} DPDK_2.2;
--
2.13.6