From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from foss.arm.com (usa-sjc-mx-foss1.foss.arm.com [217.140.101.70]) by dpdk.org (Postfix) with ESMTP id A11C41B55E; Mon, 6 Aug 2018 03:18:27 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.72.51.249]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 95D6118A; Sun, 5 Aug 2018 18:18:26 -0700 (PDT) Received: from net-debian.shanghai.arm.com (net-debian.shanghai.arm.com [10.169.36.53]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 4A9213F5D0; Sun, 5 Aug 2018 18:18:24 -0700 (PDT) From: Gavin Hu To: dev@dpdk.org Cc: gavin.hu@arm.com, Honnappa.Nagarahalli@arm.com, steve.capper@arm.com, Ola.Liljedahl@arm.com, jerin.jacob@caviumnetworks.com, hemant.agrawal@nxp.com, jia.he@hxt-semitech.com, stable@dpdk.org Date: Mon, 6 Aug 2018 09:18:05 +0800 Message-Id: <20180806011805.7857-1-gavin.hu@arm.com> X-Mailer: git-send-email 2.11.0 Subject: [dpdk-stable] [PATCH] ring: fix c11 memory ordering issue X-BeenThere: stable@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches for DPDK stable branches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 06 Aug 2018 01:18:28 -0000 1) In update_tail, read ht->tail using __atomic_load. 2) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of the do {} while loop as upon failure the old_head will be updated, another load is not necessary. 3) Synchronize the load-acquires of prod.tail and cons.tail with store- releases of update_tail which releases all ring updates up to the value of ht->tail. 4) When calling __atomic_compare_exchange_n, relaxed ordering for both success and failure, as multiple threads can work independently on the same end of the ring (either enqueue or dequeue) without synchronization, not as operating on tail, which has to be finished in sequence. Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option") Cc: stable@dpdk.org Reviewed-by: Honnappa Nagarahalli Reviewed-by: Steve Capper Reviewed-by: Ola Liljedahl Signed-off-by: Gavin Hu --- lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h index 94df3c4a6..cfa3be4a7 100644 --- a/lib/librte_ring/rte_ring_c11_mem.h +++ b/lib/librte_ring/rte_ring_c11_mem.h @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, * we need to wait for them to complete */ if (!single) - while (unlikely(ht->tail != old_val)) + while (unlikely(old_val != __atomic_load_n(&ht->tail, + __ATOMIC_RELAXED))) rte_pause(); __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); @@ -60,20 +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int max = n; int success; + *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED); do { /* Reset n to the initial burst count */ n = max; - *old_head = __atomic_load_n(&r->prod.head, - __ATOMIC_ACQUIRE); - /* - * The subtraction is done between two unsigned 32bits value + /* load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + const uint32_t cons_tail = __atomic_load_n(&r->cons.tail, + __ATOMIC_ACQUIRE); + + /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > cons_tail). So 'free_entries' is always between 0 * and capacity (which is < size). */ - *free_entries = (capacity + r->cons.tail - *old_head); + *free_entries = (capacity + cons_tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *free_entries)) @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, if (is_sp) r->prod.head = *new_head, success = 1; else + /* on failure, *old_head is updated */ success = __atomic_compare_exchange_n(&r->prod.head, old_head, *new_head, - 0, __ATOMIC_ACQUIRE, + /*weak=*/0, __ATOMIC_RELAXED, __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, int success; /* move cons.head atomically */ + *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED); do { /* Restore n as it may change every loop */ n = max; - *old_head = __atomic_load_n(&r->cons.head, - __ATOMIC_ACQUIRE); + + /* this load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + const uint32_t prod_tail = __atomic_load_n(&r->prod.tail, + __ATOMIC_ACQUIRE); /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * cons_head > prod_tail). So 'entries' is always between 0 * and size(ring)-1. */ - *entries = (r->prod.tail - *old_head); + *entries = (prod_tail - *old_head); /* Set the actual entries for dequeue */ if (n > *entries) @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, if (is_sc) r->cons.head = *new_head, success = 1; else + /* on failure, *old_head will be updated */ success = __atomic_compare_exchange_n(&r->cons.head, - old_head, *new_head, - 0, __ATOMIC_ACQUIRE, - __ATOMIC_RELAXED); + old_head, *new_head, + /*weak=*/0, __ATOMIC_RELAXED, + __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; } -- 2.11.0