patches for DPDK stable branches
 help / color / mirror / Atom feed
* [PATCH 22.11 1/2] ring: establish a safe partial order in hts-ring
@ 2025-11-13  5:04 Wathsala Vithanage
  2025-11-13  5:04 ` [PATCH 22.11 2/2] ring: establish safe partial order in RTS mode Wathsala Vithanage
  0 siblings, 1 reply; 2+ messages in thread
From: Wathsala Vithanage @ 2025-11-13  5:04 UTC (permalink / raw)
  To: Honnappa Nagarahalli, Konstantin Ananyev
  Cc: stable, Wathsala Vithanage, ola.liljedahl

[ upstream commit 66d5f962780694f6aebf000907fc3ce7a72584f9 ]

Enforce a safe partial order by making the CAS and the preceding head
load use release and acquire semantics. This creates a pairwise
happens-before relationship between threads of the same role.

Combine the two load-acquire operations of ht.raw, which were previously
split across the two paths of a conditional branch, into
__rte_ring_hts_head_wait. This simplifies the branching logic and makes
the synchronization behavior easier to understand.

Add comments to explain the synchronizes-with edges in detail.

Signed-off-by: Wathsala Vithanage <wathsala.vithanage at arm.com>
Signed-off-by: Ola Liljedahl <ola.liljedahl at arm.com>
---
 lib/ring/rte_ring_hts_elem_pvt.h | 96 +++++++++++++++++++++++---------
 1 file changed, 71 insertions(+), 25 deletions(-)

diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index a8678d3052..0961b60339 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -30,22 +30,40 @@ __rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
 	RTE_SET_USED(enqueue);
 
 	tail = old_tail + num;
+
+	/*
+	 * R0: Release the tail update. Establishes a synchronization edge with
+	 * the load-acquire at A1/A3. This release ensures that all updates to
+	 * *ht and the ring array made by this thread become visible to the
+	 * opposing thread once the tail value written here is observed.
+	 */
 	__atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE);
 }
 
 /**
- * @internal waits till tail will become equal to head.
- * Means no writer/reader is active for that ring.
- * Suppose to work as serialization point.
+ * @internal
+ * Waits until the tail becomes equal to the head.
+ * This indicates that another thread has finished its transaction, and there
+ * is a chance that we could be the next writer or reader in line.
+ *
+ * Returns ht.raw at this point. The value may be imprecise, since another
+ * thread might change the state before we observe ht.raw, but that does not
+ * matter. The caller, __rte_ring_hts_move_{prod,cons}_head(), detects this at
+ * its linearization point (CAS) and calls this function again.
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_hts_pos
 __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
-		union __rte_ring_hts_pos *p)
+			 int memorder)
 {
-	while (p->pos.head != p->pos.tail) {
+	union __rte_ring_hts_pos p;
+	p.raw = __atomic_load_n(&ht->ht.raw, memorder);
+
+	while (p.pos.head != p.pos.tail) {
 		rte_pause();
-		p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+		p.raw = __atomic_load_n(&ht->ht.raw, memorder);
 	}
+
+	return p;
 }
 
 /**
@@ -56,13 +74,11 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *free_entries)
 {
-	uint32_t n;
+	uint32_t n, cons_tail;
 	union __rte_ring_hts_pos np, op;
 
 	const uint32_t capacity = r->capacity;
 
-	op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE);
-
 	do {
 		/* Reset n to the initial burst count */
 		n = num;
@@ -72,7 +88,20 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		 * make sure that we read prod head/tail *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_hts_head_wait(&r->hts_prod, &op);
+		/*
+		 * A0: Synchronizes with the CAS at R1.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		op = __rte_ring_hts_head_wait(&r->hts_prod, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A1: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
 
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -80,7 +109,7 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = capacity + r->cons.tail - op.pos.head;
+		*free_entries = capacity + cons_tail - op.pos.head;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -94,13 +123,16 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		np.pos.head = op.pos.head + n;
 
 	/*
-	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-	 *  - OOO reads of cons tail value
-	 *  - OOO copy of elems from the ring
+	 * R1: Establishes a synchronizes-with edge with the load-acquire
+	 * of ht.raw at A0. This makes sure that the store-release to the
+	 * tail by this thread, if it was of the opposite type, becomes
+	 * visible to another thread of the current type. That thread will
+	 * then observe the updates in the same order, keeping a safe
+	 * partial order.
 	 */
 	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
 			&op.raw, np.raw,
-			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = op.pos.head;
 	return n;
@@ -114,11 +146,9 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
+	uint32_t n, prod_tail;
 	union __rte_ring_hts_pos np, op;
 
-	op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE);
-
 	/* move cons.head atomically */
 	do {
 		/* Restore n as it may change every loop */
@@ -129,14 +159,27 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 		 * make sure that we read cons head/tail *before*
 		 * reading prod tail.
 		 */
-		__rte_ring_hts_head_wait(&r->hts_cons, &op);
+		/*
+		 * A2: Synchronizes with the CAS at R2.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		op = __rte_ring_hts_head_wait(&r->hts_cons, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A3: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = r->prod.tail - op.pos.head;
+		*entries = prod_tail - op.pos.head;
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -149,13 +192,16 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 		np.pos.head = op.pos.head + n;
 
 	/*
-	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-	 *  - OOO reads of prod tail value
-	 *  - OOO copy of elems from the ring
+	 * R2: Establishes a synchronizes-with edge with the load-acquire
+	 * of ht.raw at A2. This makes sure that the store-release to the
+	 * tail by this thread, if it was of the opposite type, becomes
+	 * visible to another thread of the current type. That thread will
+	 * then observe the updates in the same order, keeping a safe
+	 * partial order.
 	 */
 	} while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
 			&op.raw, np.raw,
-			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = op.pos.head;
 	return n;
-- 
2.43.0


^ permalink raw reply	[flat|nested] 2+ messages in thread

* [PATCH 22.11 2/2] ring: establish safe partial order in RTS mode
  2025-11-13  5:04 [PATCH 22.11 1/2] ring: establish a safe partial order in hts-ring Wathsala Vithanage
@ 2025-11-13  5:04 ` Wathsala Vithanage
  0 siblings, 0 replies; 2+ messages in thread
From: Wathsala Vithanage @ 2025-11-13  5:04 UTC (permalink / raw)
  To: Honnappa Nagarahalli, Konstantin Ananyev
  Cc: stable, Wathsala Vithanage, Ola Liljedahl

[ upstream commit 36b69b5f958e10eb5beb4292ade57199a722a045 ]

Enforce a safe partial order by making the CAS and the preceding head
load use release and acquire semantics. This creates a pairwise
happens-before relationship between threads of the same kind
(i.e. between consumers or between producers).

Combine the two load-acquire operations of ht->head.raw, which were
previously split across the two paths of a conditional branch, into
__rte_ring_rts_head_wait. This simplifies the branching logic and makes
the synchronization behavior easier to understand.

Add comments to explain the synchronizes-with edges in detail.

Signed-off-by: Wathsala Vithanage <wathsala.vithanage@arm.com>
Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
---
 lib/ring/rte_ring_rts_elem_pvt.h | 97 ++++++++++++++++++++++++--------
 1 file changed, 72 insertions(+), 25 deletions(-)

diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 7164213ee0..4b8e98e826 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -31,6 +31,17 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 	 * might preceded us, then don't update tail with new value.
 	 */
 
+	/*
+	 * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
+	 * The CAS at R0 in same typed thread establishes a happens-before
+	 * relationship with this load acquire. Ensures that this thread
+	 * observes the same or later values for h.raw/h.val.cnt
+	 * observed by the other thread when it updated ht->tail.raw.
+	 * If not, ht->tail.raw may get updated out of sync (e.g. getting
+	 * updated to the same value twice). A0.a makes sure this condition
+	 * holds when CAS succeeds and A0.b when it fails.
+	 */
+	/* A0.a */
 	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
 
 	do {
@@ -40,7 +51,11 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 		nt.raw = ot.raw;
 		if (++nt.val.cnt == h.val.cnt)
 			nt.val.pos = h.val.pos;
-
+	/*
+	 * R0: Synchronizes with A2 of a different thread of the opposite type and A0.b
+	 * of a different thread of the same type.
+	 */
+	/* A0.b */
 	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
 			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
 }
@@ -49,18 +64,22 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
  * @internal This function waits till head/tail distance wouldn't
  * exceed pre-defined max value.
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_rts_poscnt
 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
-	union __rte_ring_rts_poscnt *h)
+			 int memorder)
 {
-	uint32_t max;
+	union __rte_ring_rts_poscnt h;
+	uint32_t max = ht->htd_max;
+
 
-	max = ht->htd_max;
+	h.raw = __atomic_load_n(&ht->head.raw, memorder);
 
-	while (h->val.pos - ht->tail.val.pos > max) {
+	while (h.val.pos - ht->tail.val.pos > max) {
 		rte_pause();
-		h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+		h.raw = __atomic_load_n(&ht->head.raw, memorder);
 	}
+
+	return h;
 }
 
 /**
@@ -71,13 +90,11 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *free_entries)
 {
-	uint32_t n;
+	uint32_t n, cons_tail;
 	union __rte_ring_rts_poscnt nh, oh;
 
 	const uint32_t capacity = r->capacity;
 
-	oh.raw = __atomic_load_n(&r->rts_prod.head.raw, __ATOMIC_ACQUIRE);
-
 	do {
 		/* Reset n to the initial burst count */
 		n = num;
@@ -87,7 +104,20 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		 * make sure that we read prod head *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
+		/*
+		 * A1: Synchronizes with the CAS at R1.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		oh = __rte_ring_rts_head_wait(&r->rts_prod, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A2: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
 
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -95,7 +125,7 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = capacity + r->cons.tail - oh.val.pos;
+		*free_entries = capacity + cons_tail - oh.val.pos;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -109,13 +139,16 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		nh.val.cnt = oh.val.cnt + 1;
 
 	/*
-	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-	 *  - OOO reads of cons tail value
-	 *  - OOO copy of elems to the ring
+	 * R1: Establishes a synchronizes-with edge with the load-acquire
+	 * of ht.raw at A1. Ensures that the store-release to the tail by
+	 * this thread, if it was of the opposite type, becomes
+	 * visible to another thread of the current type. That thread will
+	 * then observe the updates in the same order, keeping a safe
+	 * partial order.
 	 */
 	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
 			&oh.raw, nh.raw,
-			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = oh.val.pos;
 	return n;
@@ -129,11 +162,9 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
+	uint32_t n, prod_tail;
 	union __rte_ring_rts_poscnt nh, oh;
 
-	oh.raw = __atomic_load_n(&r->rts_cons.head.raw, __ATOMIC_ACQUIRE);
-
 	/* move cons.head atomically */
 	do {
 		/* Restore n as it may change every loop */
@@ -144,14 +175,27 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 		 * make sure that we read cons head *before*
 		 * reading prod tail.
 		 */
-		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
+		/*
+		 * A3: Synchronizes with the CAS at R2.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		oh = __rte_ring_rts_head_wait(&r->rts_cons, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A4: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = r->prod.tail - oh.val.pos;
+		*entries = prod_tail - oh.val.pos;
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -164,13 +208,16 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 		nh.val.cnt = oh.val.cnt + 1;
 
 	/*
-	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-	 *  - OOO reads of prod tail value
-	 *  - OOO copy of elems from the ring
+	 * R2: Establishes a synchronizes-with edge with the load-acquire
+	 * of ht.raw at A3. Ensures that the store-release to the tail by
+	 * this thread, if it was of the opposite type, becomes
+	 * visible to another thread of the current type. That thread will
+	 * then observe the updates in the same order, keeping a safe
+	 * partial order.
 	 */
 	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
 			&oh.raw, nh.raw,
-			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = oh.val.pos;
 	return n;
-- 
2.43.0


^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2025-11-13  5:07 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-11-13  5:04 [PATCH 22.11 1/2] ring: establish a safe partial order in hts-ring Wathsala Vithanage
2025-11-13  5:04 ` [PATCH 22.11 2/2] ring: establish safe partial order in RTS mode Wathsala Vithanage

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).