From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
To: <dev@dpdk.org>
Cc: <honnappa.nagarahalli@arm.com>, <jerinj@marvell.com>,
<hemant.agrawal@nxp.com>, <bruce.richardson@intel.com>,
<drc@linux.vnet.ibm.com>, <ruifeng.wang@arm.com>,
<mb@smartsharesystems.com>, <eimear.morrissey@huawei.com>,
<stephen@networkplumber.org>
Subject: [PATCH v7 2/7] ring: common functions for 'move head' ops
Date: Wed, 30 Oct 2024 17:22:59 -0400
Message-ID: <20241030212304.104180-3-konstantin.ananyev@huawei.com>
In-Reply-To: <20241030212304.104180-1-konstantin.ananyev@huawei.com>
Note upfront: this change doesn't introduce any functional or
performance changes.
It is just a code reordering for:
- code deduplication
- the ability to re-use the same code in the future to introduce new
  functionality
For each sync mode, the corresponding move_prod_head() and
move_cons_head() are nearly identical to each other;
the only differences are:
- whether @capacity is needed to calculate the number of entries or not.
- which head gets updated (prod/cons) and which side is used as the
  read-only counterpart.
So instead of having 2 copies of nearly identical functions,
introduce a new common one that can be used by both functions:
move_prod_head() and move_cons_head().
As another benefit, the new common sub-function no longer needs to
reference the whole rte_ring structure.
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
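Illustration only, not part of the patch: a minimal single-threaded sketch
of the deduplication described above, assuming simplified stand-in types.
The names struct headtail, struct ring, headtail_move_head(), update_tail()
and ring_demo() are hypothetical and are not the DPDK definitions; atomics,
memory barriers and the MT/ST split are left out on purpose. It shows how
one helper can serve both sides: the producer passes the ring capacity, the
consumer passes zero, and the unsigned subtraction stays correct when the
32-bit indexes wrap.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for the ring structures (not DPDK's). */
struct headtail {
	uint32_t head;
	uint32_t tail;
};

struct ring {
	uint32_t capacity;
	struct headtail prod;
	struct headtail cons;
};

enum queue_behavior { QUEUE_FIXED, QUEUE_VARIABLE };

/*
 * Common helper: moves d->head by up to n and only reads s->tail.
 * Single-threaded sketch: no atomics, no barriers, no CAS retry loop.
 */
static inline uint32_t
headtail_move_head(struct headtail *d, const struct headtail *s,
	uint32_t capacity, uint32_t n, enum queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
	*old_head = d->head;

	/* uint32_t arithmetic is modulo 2^32, so wrapped indexes are fine */
	*entries = capacity + s->tail - *old_head;

	if (n > *entries)
		n = (behavior == QUEUE_FIXED) ? 0 : *entries;
	if (n == 0)
		return 0;

	*new_head = *old_head + n;
	d->head = *new_head;
	return n;
}

/* Producer side: counterpart is cons, capacity is the ring capacity. */
static inline uint32_t
move_prod_head(struct ring *r, uint32_t n, enum queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries)
{
	return headtail_move_head(&r->prod, &r->cons, r->capacity, n,
		behavior, old_head, new_head, free_entries);
}

/* Consumer side: counterpart is prod, capacity is zero. */
static inline uint32_t
move_cons_head(struct ring *r, uint32_t n, enum queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
	return headtail_move_head(&r->cons, &r->prod, 0, n,
		behavior, old_head, new_head, entries);
}

/* Publishes the finished operation by moving the tail up to the head. */
static inline void
update_tail(struct headtail *ht, uint32_t new_val)
{
	ht->tail = new_val;
}

/* Usage example: indexes start just below 2^32 to exercise wrap-around. */
static inline void
ring_demo(void)
{
	const uint32_t start = UINT32_MAX - 1;
	struct ring r = { 4, { start, start }, { start, start } };
	uint32_t oh, nh, e;

	/* empty ring: nothing to dequeue */
	assert(move_cons_head(&r, 1, QUEUE_VARIABLE, &oh, &nh, &e) == 0);

	/* enqueue 3 of 4 free slots; head wraps from 0xFFFFFFFE to 1 */
	assert(move_prod_head(&r, 3, QUEUE_FIXED, &oh, &nh, &e) == 3);
	assert(e == 4 && nh == 1);
	update_tail(&r.prod, nh);

	/* dequeue request of 8 is trimmed to the 3 stored entries */
	assert(move_cons_head(&r, 8, QUEUE_VARIABLE, &oh, &nh, &e) == 3);
	assert(e == 3 && nh == 1);
	update_tail(&r.cons, nh);
}
```

Passing zero for the consumer turns "capacity + tail - head" into a plain
"tail - head", which is why a single expression covers both directions.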
lib/ring/rte_ring_c11_pvt.h | 156 ++++++++++---------------------
lib/ring/rte_ring_elem_pvt.h | 66 +++++++++++++
lib/ring/rte_ring_generic_pvt.h | 143 +++++++++-------------------
lib/ring/rte_ring_hts_elem_pvt.h | 107 ++++++++++-----------
lib/ring/rte_ring_rts_elem_pvt.h | 107 ++++++++++-----------
5 files changed, 255 insertions(+), 324 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 629b2d9288..b9388af0da 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -11,6 +11,17 @@
#ifndef _RTE_RING_C11_PVT_H_
#define _RTE_RING_C11_PVT_H_
+/**
+ * @file rte_ring_c11_pvt.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for MP/SP and MC/SC ring modes.
+ * For more information please refer to <rte_ring.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
@@ -29,40 +40,45 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
}
/**
- * @internal This function updates the producer head for enqueue
+ * @internal This is a helper function that moves the producer/consumer head
*
- * @param r
- * A pointer to the ring structure
- * @param is_sp
- * Indicates whether multi-producer path is needed or not
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param is_st
+ * Indicates whether multi-thread safe path is needed or not
* @param n
- * The number of elements we will want to enqueue, i.e. how far should the
- * head be moved
+ * The number of elements we want to move head value on
* @param behavior
- * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
* @param old_head
- * Returns head value as it was before the move, i.e. where enqueue starts
+ * Returns head value as it was before the move
* @param new_head
- * Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- * Returns the amount of free space in the ring BEFORE head was moved
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
* @return
- * Actual number of objects enqueued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int is_st, unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
- const uint32_t capacity = r->capacity;
- uint32_t cons_tail;
- unsigned int max = n;
+ uint32_t stail;
int success;
+ unsigned int max = n;
- *old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+ *old_head = rte_atomic_load_explicit(&d->head,
+ rte_memory_order_relaxed);
do {
/* Reset n to the initial burst count */
n = max;
@@ -73,112 +89,36 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
/* load-acquire synchronize with store-release of ht->tail
* in update_tail.
*/
- cons_tail = rte_atomic_load_explicit(&r->cons.tail,
+ stail = rte_atomic_load_explicit(&s->tail,
rte_memory_order_acquire);
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = (capacity + cons_tail - *old_head);
+ *entries = (capacity + stail - *old_head);
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
return 0;
*new_head = *old_head + n;
- if (is_sp) {
- r->prod.head = *new_head;
+ if (is_st) {
+ d->head = *new_head;
success = 1;
} else
/* on failure, *old_head is updated */
- success = rte_atomic_compare_exchange_strong_explicit(&r->prod.head,
- old_head, *new_head,
+ success = rte_atomic_compare_exchange_strong_explicit(
+ &d->head, old_head, *new_head,
rte_memory_order_relaxed,
rte_memory_order_relaxed);
} while (unlikely(success == 0));
return n;
}
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- * A pointer to the ring structure
- * @param is_sc
- * Indicates whether multi-consumer path is needed or not
- * @param n
- * The number of elements we will want to dequeue, i.e. how far should the
- * head be moved
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- * Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- * Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- * Returns the number of entries in the ring BEFORE head was moved
- * @return
- * - Actual number of objects dequeued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *entries)
-{
- unsigned int max = n;
- uint32_t prod_tail;
- int success;
-
- /* move cons.head atomically */
- *old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_relaxed);
- do {
- /* Restore n as it may change every loop */
- n = max;
-
- /* Ensure the head is read before tail */
- rte_atomic_thread_fence(rte_memory_order_acquire);
-
- /* this load-acquire synchronize with store-release of ht->tail
- * in update_tail.
- */
- prod_tail = rte_atomic_load_explicit(&r->prod.tail,
- rte_memory_order_acquire);
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = (prod_tail - *old_head);
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- return 0;
-
- *new_head = *old_head + n;
- if (is_sc) {
- r->cons.head = *new_head;
- success = 1;
- } else
- /* on failure, *old_head will be updated */
- success = rte_atomic_compare_exchange_strong_explicit(&r->cons.head,
- old_head, *new_head,
- rte_memory_order_relaxed,
- rte_memory_order_relaxed);
- } while (unlikely(success == 0));
- return n;
-}
-
#endif /* _RTE_RING_C11_PVT_H_ */
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 4b80f58980..3a83668a08 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -293,6 +293,72 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
#include "rte_ring_generic_pvt.h"
#endif
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sp
+ * Indicates whether multi-producer path is needed or not
+ * @param n
+ * The number of elements we will want to enqueue, i.e. how far should the
+ * head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ * Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *free_entries)
+{
+ return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
+ is_sp, n, behavior, old_head, new_head, free_entries);
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sc
+ * Indicates whether multi-consumer path is needed or not
+ * @param n
+ * The number of elements we will want to dequeue, i.e. how far should the
+ * head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ * Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *entries)
+{
+ return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
+ is_sc, n, behavior, old_head, new_head, entries);
+}
+
/**
* @internal Enqueue several objects on the ring
*
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 457f41dab3..affd2d5ba7 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -10,6 +10,17 @@
#ifndef _RTE_RING_GENERIC_PVT_H_
#define _RTE_RING_GENERIC_PVT_H_
+/**
+ * @file rte_ring_generic_pvt.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for MP/SP and MC/SC ring modes.
+ * For more information please refer to <rte_ring.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
@@ -30,35 +41,39 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
}
/**
- * @internal This function updates the producer head for enqueue
+ * @internal This is a helper function that moves the producer/consumer head
*
- * @param r
- * A pointer to the ring structure
- * @param is_sp
- * Indicates whether multi-producer path is needed or not
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param is_st
+ * Indicates whether multi-thread safe path is needed or not
* @param n
- * The number of elements we will want to enqueue, i.e. how far should the
- * head be moved
+ * The number of elements we want to move head value on
* @param behavior
- * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
* @param old_head
- * Returns head value as it was before the move, i.e. where enqueue starts
+ * Returns head value as it was before the move
* @param new_head
- * Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- * Returns the amount of free space in the ring BEFORE head was moved
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
* @return
- * Actual number of objects enqueued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int is_st, unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
- const uint32_t capacity = r->capacity;
unsigned int max = n;
int success;
@@ -66,7 +81,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
/* Reset n to the initial burst count */
n = max;
- *old_head = r->prod.head;
+ *old_head = d->head;
/* add rmb barrier to avoid load/load reorder in weak
* memory model. It is noop on x86
@@ -76,97 +91,27 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = (capacity + r->cons.tail - *old_head);
+ *entries = (capacity + s->tail - *old_head);
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
return 0;
*new_head = *old_head + n;
- if (is_sp) {
- r->prod.head = *new_head;
+ if (is_st) {
+ d->head = *new_head;
success = 1;
} else
- success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->prod.head,
- *old_head, *new_head);
- } while (unlikely(success == 0));
- return n;
-}
-
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- * A pointer to the ring structure
- * @param is_sc
- * Indicates whether multi-consumer path is needed or not
- * @param n
- * The number of elements we will want to dequeue, i.e. how far should the
- * head be moved
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- * Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- * Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- * Returns the number of entries in the ring BEFORE head was moved
- * @return
- * - Actual number of objects dequeued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *entries)
-{
- unsigned int max = n;
- int success;
-
- /* move cons.head atomically */
- do {
- /* Restore n as it may change every loop */
- n = max;
-
- *old_head = r->cons.head;
-
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = (r->prod.tail - *old_head);
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- return 0;
-
- *new_head = *old_head + n;
- if (is_sc) {
- r->cons.head = *new_head;
- rte_smp_rmb();
- success = 1;
- } else {
- success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->cons.head,
+ success = rte_atomic32_cmpset(
+ (uint32_t *)(uintptr_t)&d->head,
*old_head, *new_head);
- }
} while (unlikely(success == 0));
return n;
}
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index 91f5eeccb9..e2b82dd1e6 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -51,19 +51,39 @@ __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
}
/**
- * @internal This function updates the producer head for enqueue
+ * @internal This is a helper function that moves the producer/consumer head
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either the ring capacity value (when moving the producer head),
+ * or zero (when moving the consumer head)
+ * @param num
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or num only
*/
-static __rte_always_inline unsigned int
-__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+static __rte_always_inline uint32_t
+__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
- uint32_t *free_entries)
+ uint32_t *entries)
{
uint32_t n;
union __rte_ring_hts_pos np, op;
- const uint32_t capacity = r->capacity;
-
- op.raw = rte_atomic_load_explicit(&r->hts_prod.ht.raw, rte_memory_order_acquire);
+ op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
@@ -74,20 +94,20 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
* make sure that we read prod head/tail *before*
* reading cons tail.
*/
- __rte_ring_hts_head_wait(&r->hts_prod, &op);
+ __rte_ring_hts_head_wait(d, &op);
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > cons_tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = capacity + r->cons.tail - op.pos.head;
+ *entries = capacity + s->tail - op.pos.head;
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
break;
@@ -100,13 +120,25 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
* - OOO reads of cons tail value
* - OOO copy of elems from the ring
*/
- } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_prod.ht.raw,
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
(uint64_t *)(uintptr_t)&op.raw, np.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+ rte_memory_order_acquire,
+ rte_memory_order_acquire) == 0);
*old_head = op.pos.head;
return n;
}
+/**
+ * @internal This function updates the producer head for enqueue
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *free_entries)
+{
+ return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
+ r->capacity, num, behavior, old_head, free_entries);
+}
/**
* @internal This function updates the consumer head for dequeue
@@ -116,51 +148,8 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
uint32_t *entries)
{
- uint32_t n;
- union __rte_ring_hts_pos np, op;
-
- op.raw = rte_atomic_load_explicit(&r->hts_cons.ht.raw, rte_memory_order_acquire);
-
- /* move cons.head atomically */
- do {
- /* Restore n as it may change every loop */
- n = num;
-
- /*
- * wait for tail to be equal to head,
- * make sure that we read cons head/tail *before*
- * reading prod tail.
- */
- __rte_ring_hts_head_wait(&r->hts_cons, &op);
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = r->prod.tail - op.pos.head;
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- break;
-
- np.pos.tail = op.pos.tail;
- np.pos.head = op.pos.head + n;
-
- /*
- * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
- * - OOO reads of prod tail value
- * - OOO copy of elems from the ring
- */
- } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_cons.ht.raw,
- (uint64_t *)(uintptr_t)&op.raw, np.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
- *old_head = op.pos.head;
- return n;
+ return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
+ 0, num, behavior, old_head, entries);
}
/**
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 122650346b..96825931f8 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -65,19 +65,40 @@ __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
}
/**
- * @internal This function updates the producer head for enqueue.
+ * @internal This is a helper function that moves the producer/consumer head
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either the ring capacity value (when moving the producer head),
+ * or zero (when moving the consumer head)
+ * @param num
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or num only
*/
static __rte_always_inline uint32_t
-__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
- uint32_t *free_entries)
+ uint32_t *entries)
{
uint32_t n;
union __rte_ring_rts_poscnt nh, oh;
- const uint32_t capacity = r->capacity;
-
- oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
+ oh.raw = rte_atomic_load_explicit(&d->head.raw,
+ rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
@@ -88,20 +109,20 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
* make sure that we read prod head *before*
* reading cons tail.
*/
- __rte_ring_rts_head_wait(&r->rts_prod, &oh);
+ __rte_ring_rts_head_wait(d, &oh);
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > cons_tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = capacity + r->cons.tail - oh.val.pos;
+ *entries = capacity + s->tail - oh.val.pos;
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
break;
@@ -114,14 +135,27 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
* - OOO reads of cons tail value
* - OOO copy of elems to the ring
*/
- } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+ rte_memory_order_acquire,
+ rte_memory_order_acquire) == 0);
*old_head = oh.val.pos;
return n;
}
+/**
+ * @internal This function updates the producer head for enqueue.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *free_entries)
+{
+ return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
+ r->capacity, num, behavior, old_head, free_entries);
+}
+
/**
* @internal This function updates the consumer head for dequeue
*/
@@ -130,51 +164,8 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
uint32_t *entries)
{
- uint32_t n;
- union __rte_ring_rts_poscnt nh, oh;
-
- oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
-
- /* move cons.head atomically */
- do {
- /* Restore n as it may change every loop */
- n = num;
-
- /*
- * wait for cons head/tail distance,
- * make sure that we read cons head *before*
- * reading prod tail.
- */
- __rte_ring_rts_head_wait(&r->rts_cons, &oh);
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = r->prod.tail - oh.val.pos;
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- break;
-
- nh.val.pos = oh.val.pos + n;
- nh.val.cnt = oh.val.cnt + 1;
-
- /*
- * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
- * - OOO reads of prod tail value
- * - OOO copy of elems from the ring
- */
- } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
- (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
- *old_head = oh.val.pos;
- return n;
+ return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
+ 0, num, behavior, old_head, entries);
}
/**
--
2.35.3
2024-10-29 14:32 ` Konstantin Ananyev
2024-10-30 21:22 ` [PATCH v7 " Konstantin Ananyev
2024-10-30 21:22 ` [PATCH v7 1/7] test/ring: fix failure with custom number of lcores Konstantin Ananyev
2024-10-30 21:22 ` Konstantin Ananyev [this message]
2024-10-30 21:23 ` [PATCH v7 3/7] ring: make copying functions generic Konstantin Ananyev
2024-10-30 21:23 ` [PATCH v7 4/7] ring: make dump function more verbose Konstantin Ananyev
2024-10-30 21:23 ` [PATCH v7 5/7] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-10-30 21:23 ` [PATCH v7 6/7] app/test: add unit tests for soring API Konstantin Ananyev
2024-10-30 21:23 ` [PATCH v7 7/7] test: add stress test suite Konstantin Ananyev