From: Wathsala Vithanage <wathsala.vithanage@arm.com>
To: Honnappa Nagarahalli, Konstantin Ananyev
Cc: stable@dpdk.org, Wathsala Vithanage, Ola Liljedahl
Subject: [PATCH 22.11 2/2] ring: establish safe partial order in RTS mode
Date: Thu, 13 Nov 2025 05:04:56 +0000
Message-ID: <20251113050645.1327486-2-wathsala.vithanage@arm.com>
X-Mailer: git-send-email 2.43.0
In-Reply-To: <20251113050645.1327486-1-wathsala.vithanage@arm.com>
References: <20251113050645.1327486-1-wathsala.vithanage@arm.com>
List-Id: patches for DPDK stable branches

[ upstream commit 36b69b5f958e10eb5beb4292ade57199a722a045 ]

Enforce a safe partial order by making the CAS and the preceding head
load use release and acquire semantics. This creates a pairwise
happens-before relationship between threads of the same kind (i.e.
between consumers or between producers).

Combine the two load-acquire operations on ht->head.raw, which were
previously split across the two paths of a conditional branch, into
__rte_ring_rts_head_wait(). This simplifies the branching logic and
makes the synchronization behavior easier to understand.

Add comments explaining the synchronizes-with edges in detail.

Signed-off-by: Wathsala Vithanage <wathsala.vithanage@arm.com>
Signed-off-by: Ola Liljedahl
---
 lib/ring/rte_ring_rts_elem_pvt.h | 97 ++++++++++++++++++++++++--------
 1 file changed, 72 insertions(+), 25 deletions(-)

diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 7164213ee0..4b8e98e826 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -31,6 +31,17 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 	 * might preceded us, then don't update tail with new value.
 	 */
 
+	/*
+	 * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
+	 * The CAS at R0 in a same-typed thread establishes a happens-before
+	 * relationship with this load-acquire. It ensures that this thread
+	 * observes values of h.raw/h.val.cnt at least as recent as those
+	 * observed by the other thread when it updated ht->tail.raw.
+	 * Otherwise, ht->tail.raw may get updated out of sync (e.g. get
+	 * updated to the same value twice). A0.a makes sure this condition
+	 * holds when the CAS succeeds, and A0.b when it fails.
+	 */
+	/* A0.a */
 	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
 
 	do {
@@ -40,7 +51,11 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 		nt.raw = ot.raw;
 		if (++nt.val.cnt == h.val.cnt)
 			nt.val.pos = h.val.pos;
-
+		/*
+		 * R0: Synchronizes with A2 of a different thread of the opposite
+		 * type and with A0.b of a different thread of the same type.
+		 */
+		/* A0.b */
 	} while (__atomic_compare_exchange_n(&ht->tail.raw,
 		&ot.raw, nt.raw, 0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
 }
@@ -49,18 +64,22 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 /**
  * @internal This function waits till head/tail distance wouldn't
  * exceed pre-defined max value.
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_rts_poscnt
 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
-	union __rte_ring_rts_poscnt *h)
+	int memorder)
 {
-	uint32_t max;
+	union __rte_ring_rts_poscnt h;
+	uint32_t max = ht->htd_max;
+
-	max = ht->htd_max;
+	h.raw = __atomic_load_n(&ht->head.raw, memorder);
 
-	while (h->val.pos - ht->tail.val.pos > max) {
+	while (h.val.pos - ht->tail.val.pos > max) {
 		rte_pause();
-		h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+		h.raw = __atomic_load_n(&ht->head.raw, memorder);
 	}
+
+	return h;
 }
 
 /**
@@ -71,13 +90,11 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *free_entries)
 {
-	uint32_t n;
+	uint32_t n, cons_tail;
 	union __rte_ring_rts_poscnt nh, oh;
 
 	const uint32_t capacity = r->capacity;
 
-	oh.raw = __atomic_load_n(&r->rts_prod.head.raw, __ATOMIC_ACQUIRE);
-
 	do {
 		/* Reset n to the initial burst count */
 		n = num;
@@ -87,7 +104,20 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		 * make sure that we read prod head *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
+		/*
+		 * A1: Synchronizes with the CAS at R1.
+		 * Establishes a happens-before relationship with a thread of the
+		 * same type that released ht.raw, ensuring this thread observes
+		 * all of its memory effects needed to maintain a safe partial order.
+		 */
+		oh = __rte_ring_rts_head_wait(&r->rts_prod, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A2: Establishes a synchronizes-with edge with the store-release
+		 * at R0. This ensures that all memory effects of the preceding
+		 * opposing thread are observed.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
 
 		/*
 		 * The subtraction is done between two unsigned 32bits value
@@ -95,7 +125,7 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = capacity + r->cons.tail - oh.val.pos;
+		*free_entries = capacity + cons_tail - oh.val.pos;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -109,13 +139,16 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		nh.val.cnt = oh.val.cnt + 1;
 
 		/*
-		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-		 * - OOO reads of cons tail value
-		 * - OOO copy of elems to the ring
+		 * R1: Establishes a synchronizes-with edge with the load-acquire
+		 * of ht.raw at A1. Ensures that the store-release to the tail by
+		 * this thread, if it was of the opposite type, becomes
+		 * visible to another thread of the current type. That thread will
+		 * then observe the updates in the same order, keeping a safe
+		 * partial order.
 		 */
 	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
 		&oh.raw, nh.raw,
-		0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+		0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = oh.val.pos;
 	return n;
@@ -129,11 +162,9 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
+	uint32_t n, prod_tail;
 	union __rte_ring_rts_poscnt nh, oh;
 
-	oh.raw = __atomic_load_n(&r->rts_cons.head.raw, __ATOMIC_ACQUIRE);
-
 	/* move cons.head atomically */
 	do {
 		/* Restore n as it may change every loop */
@@ -144,14 +175,27 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 		 * make sure that we read cons head *before*
 		 * reading prod tail.
 		 */
-		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
+		/*
+		 * A3: Synchronizes with the CAS at R2.
+		 * Establishes a happens-before relationship with a thread of the
+		 * same type that released ht.raw, ensuring this thread observes
+		 * all of its memory effects needed to maintain a safe partial order.
+		 */
+		oh = __rte_ring_rts_head_wait(&r->rts_cons, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A4: Establishes a synchronizes-with edge with the store-release
+		 * at R0. This ensures that all memory effects of the preceding
+		 * opposing thread are observed.
+		 */
+		prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = r->prod.tail - oh.val.pos;
+		*entries = prod_tail - oh.val.pos;
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -164,13 +208,16 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 		nh.val.cnt = oh.val.cnt + 1;
 
 		/*
-		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-		 * - OOO reads of prod tail value
-		 * - OOO copy of elems from the ring
+		 * R2: Establishes a synchronizes-with edge with the load-acquire
+		 * of ht.raw at A3. Ensures that the store-release to the tail by
+		 * this thread, if it was of the opposite type, becomes
+		 * visible to another thread of the current type. That thread will
+		 * then observe the updates in the same order, keeping a safe
+		 * partial order.
 		 */
 	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
 		&oh.raw, nh.raw,
-		0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+		0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = oh.val.pos;
 	return n;
-- 
2.43.0
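
A note for reviewers: the acquire/release pairing the patch relies on can be
reproduced in isolation. The sketch below is not DPDK code; it is a minimal
standalone C11 program (names head, data, and producer are illustrative) that
mirrors the A0/R0 pairing in __rte_ring_rts_update_tail: each thread publishes
its slot write with a release CAS, and the load-acquire of the same variable
in the next thread of the same kind creates the happens-before edge that makes
that write visible.

/* Standalone sketch of the release-CAS / load-acquire pairing; all names
 * are illustrative, not part of the patch. Build: cc -std=c11 -pthread */
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

static _Atomic unsigned head;	/* stand-in for ht->tail.raw		*/
static unsigned data[2];	/* payload ordered by the release CAS	*/

static void *producer(void *arg)
{
	unsigned id = *(unsigned *)arg;

	/* A: load-acquire; pairs with the release CAS (R) below when R was
	 * executed by the other thread of the same kind. */
	unsigned oh = atomic_load_explicit(&head, memory_order_acquire);
	unsigned nh;

	do {
		nh = oh + 1;
		data[oh] = id;	/* write that R makes visible */
		/* R: release on success; the failure path reloads with
		 * acquire, like A0.b in __rte_ring_rts_update_tail. */
	} while (!atomic_compare_exchange_strong_explicit(&head, &oh, nh,
			memory_order_release, memory_order_acquire));
	return NULL;
}

int main(void)
{
	pthread_t t[2];
	unsigned id[2] = { 1, 2 };

	for (int i = 0; i < 2; i++)
		pthread_create(&t[i], NULL, producer, &id[i]);
	for (int i = 0; i < 2; i++)
		pthread_join(t[i], NULL);

	/* The loser of the first CAS acquired head == 1, so it also
	 * observed the winner's data[0] write: both slots are set. */
	assert(data[0] != 0 && data[1] != 0);
	return 0;
}

The failure orders differ between the two CAS sites in the patch for a
reason: the tail CAS at R0 keeps __ATOMIC_ACQUIRE on failure (A0.b) because
the retry loop reuses the freshly loaded ot.raw directly, while the head CAS
at R1/R2 can afford __ATOMIC_RELAXED on failure because the loop discards the
failed value and re-reads the head with acquire in __rte_ring_rts_head_wait.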