From: Wathsala Vithanage
To: Honnappa Nagarahalli, Konstantin Ananyev
Cc: stable@dpdk.org, Wathsala Vithanage, ola.liljedahl@arm.com
Subject: [PATCH 22.11 1/2] ring: establish a safe partial order in hts-ring
Date: Thu, 13 Nov 2025 05:04:55 +0000
Message-ID: <20251113050645.1327486-1-wathsala.vithanage@arm.com>
X-Mailer: git-send-email 2.43.0
List-Id: patches for DPDK stable branches

[ upstream commit 66d5f962780694f6aebf000907fc3ce7a72584f9 ]

Enforce a safe partial order by making the CAS and the preceding head
load use release and acquire semantics. This creates a pairwise
happens-before relationship between threads of the same role.

Combine the two load-acquire operations of ht.raw, which were previously
split across the two paths of a conditional branch, into
__rte_ring_hts_head_wait. This simplifies the branching logic and makes
the synchronization behavior easier to understand.

Add comments to explain the synchronizes-with edges in detail.

Signed-off-by: Wathsala Vithanage
Signed-off-by: Ola Liljedahl
---
 lib/ring/rte_ring_hts_elem_pvt.h | 96 +++++++++++++++++++++++---------
 1 file changed, 71 insertions(+), 25 deletions(-)
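As an illustration of the same-role pairing described above (a sketch
only, not part of the patch; toy_pos/toy_ht/toy_move_head are
hypothetical stand-ins for the real rte_ring types): a thread
load-acquires ht.raw, waits until head == tail, then publishes its new
head with a release CAS, so the next thread of the same role that
acquires ht.raw observes all of its prior memory effects.

#include <stdint.h>

union toy_pos {
	uint64_t raw;
	struct {
		uint32_t head;
		uint32_t tail;
	} pos;
};

struct toy_ht {
	union toy_pos ht;
};

static inline uint32_t
toy_move_head(struct toy_ht *d, uint32_t n)
{
	union toy_pos op, np;

	do {
		/* A: pairs with the release CAS (R) below when executed
		 * by the previous thread of the same role; spin until
		 * that thread's transaction completes (head == tail). */
		op.raw = __atomic_load_n(&d->ht.raw, __ATOMIC_ACQUIRE);
		while (op.pos.head != op.pos.tail)
			op.raw = __atomic_load_n(&d->ht.raw,
					__ATOMIC_ACQUIRE);

		np = op;
		np.pos.head = op.pos.head + n;
		/* R: publish the head move and release all prior memory
		 * effects; the failure order may be relaxed because the
		 * retry path re-reads ht.raw with acquire. */
	} while (__atomic_compare_exchange_n(&d->ht.raw, &op.raw, np.raw,
			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);

	return op.pos.head;
}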
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index a8678d3052..0961b60339 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -30,22 +30,40 @@ __rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
 	RTE_SET_USED(enqueue);
 
 	tail = old_tail + num;
+
+	/*
+	 * R0: Release the tail update. Establishes a synchronization edge with
+	 * the load-acquire at A1/A3. This release ensures that all updates to
+	 * *ht and the ring array made by this thread become visible to the
+	 * opposing thread once the tail value written here is observed.
+	 */
 	__atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE);
 }
 
 /**
- * @internal waits till tail will become equal to head.
- * Means no writer/reader is active for that ring.
- * Suppose to work as serialization point.
+ * @internal
+ * Waits until the tail becomes equal to the head.
+ * This indicates that another thread has finished its transaction, and there
+ * is a chance that we could be the next writer or reader in line.
+ *
+ * Returns ht.raw at this point. The value may be imprecise, since another
+ * thread might change the state before we observe ht.raw, but that does not
+ * matter. The function __rte_ring_hts_move_head() can detect and recall this
+ * function when it reaches the linearization point (CAS).
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_hts_pos
 __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
-	union __rte_ring_hts_pos *p)
+	int memorder)
 {
-	while (p->pos.head != p->pos.tail) {
+	union __rte_ring_hts_pos p;
+	p.raw = __atomic_load_n(&ht->ht.raw, memorder);
+
+	while (p.pos.head != p.pos.tail) {
 		rte_pause();
-		p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+		p.raw = __atomic_load_n(&ht->ht.raw, memorder);
 	}
+
+	return p;
 }
 
 /**
@@ -56,13 +74,11 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *free_entries)
 {
-	uint32_t n;
+	uint32_t n, cons_tail;
 	union __rte_ring_hts_pos np, op;
 
 	const uint32_t capacity = r->capacity;
 
-	op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE);
-
 	do {
 		/* Reset n to the initial burst count */
 		n = num;
@@ -72,7 +88,20 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		 * make sure that we read prod head/tail *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_hts_head_wait(&r->hts_prod, &op);
+		/*
+		 * A0: Synchronizes with the CAS at R1.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		op = __rte_ring_hts_head_wait(&r->hts_prod, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A1: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
 
 		/*
 		 * The subtraction is done between two unsigned 32bits value
@@ -80,7 +109,7 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = capacity + r->cons.tail - op.pos.head;
+		*free_entries = capacity + cons_tail - op.pos.head;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -94,13 +123,16 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		np.pos.head = op.pos.head + n;
 
 		/*
-		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-		 *  - OOO reads of cons tail value
-		 *  - OOO copy of elems from the ring
+		 * R1: Establishes a synchronizes-with edge with the load-acquire
+		 * of ht.raw at A0. This makes sure that the store-release to the
+		 * tail by this thread, if it was of the opposite type, becomes
+		 * visible to another thread of the current type. That thread will
+		 * then observe the updates in the same order, keeping a safe
+		 * partial order.
 		 */
 	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw, &op.raw, np.raw,
-			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = op.pos.head;
 	return n;
@@ -114,11 +146,9 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
+	uint32_t n, prod_tail;
 	union __rte_ring_hts_pos np, op;
 
-	op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE);
-
 	/* move cons.head atomically */
 	do {
 		/* Restore n as it may change every loop */
@@ -129,14 +159,27 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 		 * make sure that we read cons head/tail *before*
 		 * reading prod tail.
 		 */
-		__rte_ring_hts_head_wait(&r->hts_cons, &op);
+		/*
+		 * A2: Synchronizes with the CAS at R2.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		op = __rte_ring_hts_head_wait(&r->hts_cons, __ATOMIC_ACQUIRE);
+
+		/*
+		 * A3: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = r->prod.tail - op.pos.head;
+		*entries = prod_tail - op.pos.head;
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -149,13 +192,16 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 		np.pos.head = op.pos.head + n;
 
 		/*
-		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-		 *  - OOO reads of prod tail value
-		 *  - OOO copy of elems from the ring
+		 * R2: Establishes a synchronizes-with edge with the load-acquire
+		 * of ht.raw at A2. This makes sure that the store-release to the
+		 * tail by this thread, if it was of the opposite type, becomes
+		 * visible to another thread of the current type. That thread will
+		 * then observe the updates in the same order, keeping a safe
+		 * partial order.
 		 */
 	} while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw, &op.raw, np.raw,
-			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
 
 	*old_head = op.pos.head;
 	return n;
-- 
2.43.0
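For completeness, the opposite-role pairing (R0 -> A1/A3) reduces to the
classic message-passing pattern: the producer's element write is
published by the store-release of the tail, and the consumer's
load-acquire of the tail guarantees that write is visible before the
element is read. A sketch only, not part of the patch; toy_ring and the
toy_* helpers are hypothetical stand-ins for the rte_ring fields.

#include <stdint.h>

#define TOY_RING_SZ 1024	/* power of two so masking wraps the index */

struct toy_ring {
	uint32_t prod_tail;
	uint32_t obj[TOY_RING_SZ];
};

/* Producer side: write the element, then publish it. */
static inline void
toy_enqueue_one(struct toy_ring *r, uint32_t head, uint32_t v)
{
	r->obj[head & (TOY_RING_SZ - 1)] = v;	/* plain write to the array */
	/* R0: the release store publishes the element; it pairs with
	 * the consumer's load-acquire of the tail below. */
	__atomic_store_n(&r->prod_tail, head + 1, __ATOMIC_RELEASE);
}

/* Consumer side: observe the tail, then read the element. */
static inline int
toy_dequeue_one(struct toy_ring *r, uint32_t head, uint32_t *v)
{
	/* A1/A3: once this acquire observes the new tail, the element
	 * write above is guaranteed to be visible. */
	uint32_t tail = __atomic_load_n(&r->prod_tail, __ATOMIC_ACQUIRE);

	if (tail == head)
		return 0;	/* nothing published yet */
	*v = r->obj[head & (TOY_RING_SZ - 1)];
	return 1;
}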