From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from foss.arm.com (usa-sjc-mx-foss1.foss.arm.com [217.140.101.70]) by dpdk.org (Postfix) with ESMTP id D235D8D88; Tue, 7 Aug 2018 05:19:57 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.72.51.249]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id E671680D; Mon, 6 Aug 2018 20:19:55 -0700 (PDT) Received: from net-debian.shanghai.arm.com (net-debian.shanghai.arm.com [10.169.36.53]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 5BAC33F2EA; Mon, 6 Aug 2018 20:19:54 -0700 (PDT) From: Gavin Hu To: dev@dpdk.org Cc: gavin.hu@arm.com, Honnappa.Nagarahalli@arm.com, steve.capper@arm.com, Ola.Liljedahl@arm.com, jerin.jacob@caviumnetworks.com, hemant.agrawal@nxp.com, jia.he@hxt-semitech.com, stable@dpdk.org Date: Tue, 7 Aug 2018 11:19:43 +0800 Message-Id: <20180807031943.5331-1-gavin.hu@arm.com> X-Mailer: git-send-email 2.11.0 In-Reply-To: <20180806011805.7857-1-gavin.hu@arm.com> References: <20180806011805.7857-1-gavin.hu@arm.com> Subject: [dpdk-dev] [PATCH v2] ring: fix c11 memory ordering issue X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 07 Aug 2018 03:19:58 -0000 This patch includes two bug fixes(#1 and 2) and two optimisations(#3 and #4). 1) In update_tail, read ht->tail using __atomic_load.Although the compiler currently seems to be doing the right thing even without _atomic_load, we don't want to give the compiler freedom to optimise what should be an atomic load, it should not be arbitarily moved around. 2) Synchronize the load-acquire of the tail and the store-release within update_tail, the store release ensures all the ring operations, engqueu or dequeue are seen by the observers as soon as they see the updated tail. The load-acquire is required for correctly compu- tate the free_entries or avail_entries, respectively for enqueue and dequeue operations, the data dependency is not reliable for ordering as the compiler might break it by saving to temporary values to boost performance. 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of the do {} while loop as upon failure the old_head will be updated, another load is costy and not necessary. 4) When calling __atomic_compare_exchange_n, relaxed ordering for both success and failure, as multiple threads can work independently on the same end of the ring (either enqueue or dequeue) without synchronization, not as operating on tail, which has to be finished in sequence. The patch was benchmarked with test/ring_perf_autotest and it decreases the enqueue/dequeue latency by 5% ~ 24.6% with two lcores, the real gains are dependent on the number of lcores, depth of the ring, SPSC or MPMC. For 1 lcore, it also improves a little, about 3 ~ 4%. It is a big improvement, in case of MPMC, with rings size of 32, it saves latency up to (6.90-5.20)/6.90 = 24.6%. Test result data: SP/SC bulk enq/dequeue (size: 8): 13.19 MP/MC bulk enq/dequeue (size: 8): 25.79 SP/SC bulk enq/dequeue (size: 32): 3.85 MP/MC bulk enq/dequeue (size: 32): 6.90 SP/SC bulk enq/dequeue (size: 8): 12.05 MP/MC bulk enq/dequeue (size: 8): 23.06 SP/SC bulk enq/dequeue (size: 32): 3.62 MP/MC bulk enq/dequeue (size: 32): 5.20 Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option") Cc: stable@dpdk.org Signed-off-by: Gavin Hu Reviewed-by: Honnappa Nagarahalli Reviewed-by: Steve Capper Reviewed-by: Ola Liljedahl --- lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h index 94df3c4a6..cfa3be4a7 100644 --- a/lib/librte_ring/rte_ring_c11_mem.h +++ b/lib/librte_ring/rte_ring_c11_mem.h @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, * we need to wait for them to complete */ if (!single) - while (unlikely(ht->tail != old_val)) + while (unlikely(old_val != __atomic_load_n(&ht->tail, + __ATOMIC_RELAXED))) rte_pause(); __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); @@ -60,20 +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int max = n; int success; + *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED); do { /* Reset n to the initial burst count */ n = max; - *old_head = __atomic_load_n(&r->prod.head, - __ATOMIC_ACQUIRE); - /* - * The subtraction is done between two unsigned 32bits value + /* load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + const uint32_t cons_tail = __atomic_load_n(&r->cons.tail, + __ATOMIC_ACQUIRE); + + /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > cons_tail). So 'free_entries' is always between 0 * and capacity (which is < size). */ - *free_entries = (capacity + r->cons.tail - *old_head); + *free_entries = (capacity + cons_tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *free_entries)) @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, if (is_sp) r->prod.head = *new_head, success = 1; else + /* on failure, *old_head is updated */ success = __atomic_compare_exchange_n(&r->prod.head, old_head, *new_head, - 0, __ATOMIC_ACQUIRE, + /*weak=*/0, __ATOMIC_RELAXED, __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, int success; /* move cons.head atomically */ + *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED); do { /* Restore n as it may change every loop */ n = max; - *old_head = __atomic_load_n(&r->cons.head, - __ATOMIC_ACQUIRE); + + /* this load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + const uint32_t prod_tail = __atomic_load_n(&r->prod.tail, + __ATOMIC_ACQUIRE); /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * cons_head > prod_tail). So 'entries' is always between 0 * and size(ring)-1. */ - *entries = (r->prod.tail - *old_head); + *entries = (prod_tail - *old_head); /* Set the actual entries for dequeue */ if (n > *entries) @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, if (is_sc) r->cons.head = *new_head, success = 1; else + /* on failure, *old_head will be updated */ success = __atomic_compare_exchange_n(&r->cons.head, - old_head, *new_head, - 0, __ATOMIC_ACQUIRE, - __ATOMIC_RELAXED); + old_head, *new_head, + /*weak=*/0, __ATOMIC_RELAXED, + __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; } -- 2.11.0