From mboxrd@z Thu Jan  1 00:00:00 1970
From: luca.boccassi@gmail.com
To: Ruifeng Wang
Cc: Gavin Hu, David Hunt, dpdk stable
Date: Thu, 19 Dec 2019 14:33:22 +0000
Message-Id: <20191219143447.21506-55-luca.boccassi@gmail.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20191219143447.21506-1-luca.boccassi@gmail.com>
References: <20191219143447.21506-1-luca.boccassi@gmail.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [dpdk-stable] patch 'lib/distributor: fix deadlock on aarch64' has been queued to LTS release 17.11.10
List-Id: patches for DPDK stable branches

Hi,

FYI, your patch has been queued to LTS release 17.11.10.

Note that it hasn't been pushed to http://dpdk.org/browse/dpdk-stable yet.
It will be pushed if I get no objections before 12/21/19, so please
shout if anyone has objections.

Also note that after the patch there is a diff of the upstream commit vs. the
patch applied to the branch. This will indicate whether any rebasing was
needed in order to apply it to the stable branch.

If there were code changes for rebasing (i.e. not only metadata diffs), please
double check that the rebase was done correctly.

Thanks.

Luca Boccassi
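For readers who want the gist of the fix before reading the diff: the patch replaces
plain loads and stores of the shared handshake words with acquire/release atomics
(GCC __atomic builtins). Below is a minimal, self-contained sketch of that handshake
pattern. It is illustrative only and not part of the queued patch: the GET_BUF flag,
the bufptr64 variable and the bit layout here are simplified stand-ins, not the real
RTE_DISTRIB_* definitions from librte_distributor. Build with "gcc -pthread".

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the real handshake flag (RTE_DISTRIB_GET_BUF). */
#define GET_BUF 1ULL

/* One shared 64-bit slot: flag in bit 0, payload in the upper bits. */
static uint64_t bufptr64;

/* Worker side: spin until the flag is set, then read the payload.
 * The acquire load pairs with the distributor's release store, so the
 * payload published before the flag is guaranteed to be visible once
 * the flag is observed. */
static void *worker(void *arg)
{
	uint64_t v;

	(void)arg;
	while (!((v = __atomic_load_n(&bufptr64, __ATOMIC_ACQUIRE)) & GET_BUF))
		; /* spin, like the rte_pause() loops in the patch */
	printf("worker got payload %llu\n", (unsigned long long)(v >> 1));
	return NULL;
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, worker, NULL);
	/* Distributor side: publish payload and flag with a release store. */
	__atomic_store_n(&bufptr64, (42ULL << 1) | GET_BUF, __ATOMIC_RELEASE);
	pthread_join(t, NULL);
	return 0;
}

Without the acquire/release pair, the compiler or a weakly ordered CPU such as
aarch64 may let the worker observe the flag before the payload, which is the
class of reordering behind the reported deadlock.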
---
>From f76ab2f8e6c01f149358f5bd549d8bf362b61894 Mon Sep 17 00:00:00 2001
From: Ruifeng Wang
Date: Tue, 15 Oct 2019 17:28:25 +0800
Subject: [PATCH] lib/distributor: fix deadlock on aarch64

[ upstream commit 52833924822490391df3dce3eec3a2ee7777acc5 ]

Distributor and worker threads rely on data structs in cache line for
synchronization. The shared data structs were not protected.
This caused deadlock issue on weaker memory ordering platforms as
aarch64. Fix this issue by adding memory barriers to ensure
synchronization among cores.

Bugzilla ID: 342
Fixes: 775003ad2f96 ("distributor: add new burst-capable library")

Signed-off-by: Ruifeng Wang
Reviewed-by: Gavin Hu
Acked-by: David Hunt
---
 lib/librte_distributor/rte_distributor.c     | 68 ++++++++++++++------
 lib/librte_distributor/rte_distributor_v20.c | 59 ++++++++++++-----
 2 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 6ad2301315..00fc003f23 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -76,8 +76,11 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	}

 	retptr64 = &(buf->retptr64[0]);
-	/* Spin while handshake bits are set (scheduler clears it) */
-	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+	/* Spin while handshake bits are set (scheduler clears it).
+	 * Sync with worker on GET_BUF flag.
+	 */
+	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF)) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;

@@ -102,8 +105,10 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	/*
 	 * Finally, set the GET_BUF to signal to distributor that cache
 	 * line is ready for processing
+	 * Sync with distributor to release retptrs
 	 */
-	*retptr64 |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
+			__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
@@ -125,8 +130,11 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 		return (pkts[0]) ? 1 : 0;
 	}

-	/* If bit is set, return */
-	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+	/* If bit is set, return
+	 * Sync with distributor to acquire bufptrs
+	 */
+	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return -1;

 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -141,8 +149,10 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	 * so now we've got the contents of the cacheline into an array of
 	 * mbuf pointers, so toggle the bit so scheduler can start working
 	 * on the next cacheline while we're working.
+	 * Sync with distributor on GET_BUF flag. Release bufptrs.
 	 */
-	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

 	return count;
 }
@@ -201,6 +211,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 		return -EINVAL;
 	}

+	/* Sync with distributor to acquire retptrs */
+	__atomic_thread_fence(__ATOMIC_ACQUIRE);
 	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
 		/* Switch off the return bit first */
 		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
@@ -209,8 +221,11 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
 			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

-	/* set the GET_BUF but even if we got no returns */
-	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+	/* set the GET_BUF but even if we got no returns.
+	 * Sync with distributor on GET_BUF flag. Release retptrs.
+	 */
+	__atomic_store_n(&(buf->retptr64[0]),
+		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

 	return 0;
 }
@@ -300,7 +315,9 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 	unsigned int count = 0;
 	unsigned int i;

-	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+	/* Sync on GET_BUF flag. Acquire retptrs. */
+	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF) {
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
 			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
@@ -313,8 +330,10 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 		}
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
-		/* Clear for the worker to populate with more returns */
-		buf->retptr64[0] = 0;
+		/* Clear for the worker to populate with more returns.
+		 * Sync with distributor on GET_BUF flag. Release retptrs.
+		 */
+		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
 	}
 	return count;
 }
@@ -334,7 +353,9 @@ release(struct rte_distributor *d, unsigned int wkr)
 	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
 	unsigned int i;

-	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+	/* Sync with worker on GET_BUF flag */
+	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF))
 		rte_pause();

 	handle_returns(d, wkr);
@@ -354,8 +375,11 @@ release(struct rte_distributor *d, unsigned int wkr)

 	d->backlog[wkr].count = 0;

-	/* Clear the GET bit */
-	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	/* Clear the GET bit.
+	 * Sync with worker on GET_BUF flag. Release bufptrs.
+	 */
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

 	return buf->count;
 }
@@ -382,7 +406,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 	if (unlikely(num_mbufs == 0)) {
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
-			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
+			/* Sync with worker on GET_BUF flag. */
+			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
 				release(d, wid);
 				handle_returns(d, wid);
 			}
@@ -394,7 +420,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 		uint16_t matches[RTE_DIST_BURST_SIZE];
 		unsigned int pkts;

-		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		/* Sync with worker on GET_BUF flag. */
+		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
 			d->bufs[wkr].count = 0;

 		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
@@ -492,7 +520,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,

 	/* Flush out all non-full cache-lines to workers. */
 	for (wid = 0 ; wid < d->num_workers; wid++)
-		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		/* Sync with worker on GET_BUF flag. */
+		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
 			release(d, wid);

 	return num_mbufs;
@@ -598,7 +628,9 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)

 	/* throw away returns, so workers can exit */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		d->bufs[wkr].retptr64[0] = 0;
+		/* Sync with worker. Release retptrs. */
+		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
+				__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
index 5be6efd47b..6fede5c38a 100644
--- a/lib/librte_distributor/rte_distributor_v20.c
+++ b/lib/librte_distributor/rte_distributor_v20.c
@@ -62,9 +62,12 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_GET_BUF;
-	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+	while (unlikely(__atomic_load_n(&buf->bufptr64, __ATOMIC_RELAXED)
+			& RTE_DISTRIB_FLAGS_MASK))
 		rte_pause();
-	buf->bufptr64 = req;
+
+	/* Sync with distributor on GET_BUF flag. */
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 }
 VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);

@@ -73,7 +76,9 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
 		unsigned worker_id)
 {
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
-	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+	/* Sync with distributor. Acquire bufptr64. */
+	if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return NULL;

 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -101,7 +106,8 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_RETURN_BUF;
-	buf->bufptr64 = req;
+	/* Sync with distributor on RETURN_BUF flag. */
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 	return 0;
 }
 VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
@@ -145,7 +151,8 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
 {
 	d->in_flight_tags[wkr] = 0;
 	d->in_flight_bitmask &= ~(1UL << wkr);
-	d->bufs[wkr].bufptr64 = 0;
+	/* Sync with worker. Release bufptr64. */
+	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
 		 * queued packets for this core elsewhere.
@@ -189,17 +196,23 @@ process_returns(struct rte_distributor_v20 *d)
 	ret_count = d->returns.count;

 	for (wkr = 0; wkr < d->num_workers; wkr++) {
-
-		const int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
+		/* Sync with worker. Acquire bufptr64. */
+		const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
+					__ATOMIC_ACQUIRE);

 		if (data & RTE_DISTRIB_GET_BUF) {
 			flushed++;
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+				/* Sync with worker on GET_BUF flag. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					RTE_DISTRIB_GET_BUF,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = 0;
 				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
@@ -235,9 +248,10 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 		return process_returns(d);

 	while (next_idx < num_mbufs || next_mb != NULL) {
-
-		int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
+		/* Sync with worker. Acquire bufptr64. */
+		int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
+				__ATOMIC_ACQUIRE);

 		if (!next_mb) {
 			next_mb = mbufs[next_idx++];
@@ -283,11 +297,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 				(d->backlog[wkr].count || next_mb)) {

 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = next_value;
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					next_value,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = new_tag;
 				d->in_flight_bitmask |= (1UL << wkr);
 				next_mb = NULL;
@@ -308,13 +327,19 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 	 * if they are ready */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
 		if (d->backlog[wkr].count &&
-				(d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+				/* Sync with worker. Acquire bufptr64. */
+				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
 			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
 					RTE_DISTRIB_FLAG_BITS;
+
 			store_return(oldbuf, d, &ret_start, &ret_count);

-			d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+			/* Sync with worker. Release bufptr64. */
+			__atomic_store_n(&(d->bufs[wkr].bufptr64),
+				backlog_pop(&d->backlog[wkr]),
+				__ATOMIC_RELEASE);
 		}

 	d->returns.start = ret_start;
-- 
2.20.1

---
  Diff of the applied patch vs upstream commit (please double-check if non-empty:
---
--- -	2019-12-19 14:32:28.511895027 +0000
+++ 0055-lib-distributor-fix-deadlock-on-aarch64.patch	2019-12-19 14:32:26.085297429 +0000
@@ -1,8 +1,10 @@
-From 52833924822490391df3dce3eec3a2ee7777acc5 Mon Sep 17 00:00:00 2001
+From f76ab2f8e6c01f149358f5bd549d8bf362b61894 Mon Sep 17 00:00:00 2001
 From: Ruifeng Wang
 Date: Tue, 15 Oct 2019 17:28:25 +0800
 Subject: [PATCH] lib/distributor: fix deadlock on aarch64

+[ upstream commit 52833924822490391df3dce3eec3a2ee7777acc5 ]
+
 Distributor and worker threads rely on data structs in cache line for
 synchronization. The shared data structs were not protected.
 This caused deadlock issue on weaker memory ordering platforms as
@@ -12,35 +14,20 @@

 Bugzilla ID: 342
 Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
-Cc: stable@dpdk.org

 Signed-off-by: Ruifeng Wang
 Reviewed-by: Gavin Hu
 Acked-by: David Hunt
 ---
- lib/librte_distributor/meson.build           |  5 ++
 lib/librte_distributor/rte_distributor.c     | 68 ++++++++++++++------
 lib/librte_distributor/rte_distributor_v20.c | 59 ++++++++++++-----
- 3 files changed, 97 insertions(+), 35 deletions(-)
+ 2 files changed, 92 insertions(+), 35 deletions(-)

-diff --git a/lib/librte_distributor/meson.build b/lib/librte_distributor/meson.build
-index dba7e3b2aa..26577dbc19 100644
---- a/lib/librte_distributor/meson.build
-+++ b/lib/librte_distributor/meson.build
-@@ -9,3 +9,8 @@ else
- 	endif
- 	headers = files('rte_distributor.h')
- 	deps += ['mbuf']
-+
-+# for clang 32-bit compiles we need libatomic for 64-bit atomic ops
-+if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
-+	ext_deps += cc.find_library('atomic')
-+endif
 diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
-index 21eb1fb0a1..0a03625c9f 100644
+index 6ad2301315..00fc003f23 100644
 --- a/lib/librte_distributor/rte_distributor.c
 +++ b/lib/librte_distributor/rte_distributor.c
-@@ -49,8 +49,11 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
+@@ -76,8 +76,11 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	}

 	retptr64 = &(buf->retptr64[0]);
-	/* Spin while handshake bits are set (scheduler clears it) */
-	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+	/* Spin while handshake bits are set (scheduler clears it).
+	 * Sync with worker on GET_BUF flag.
+	 */
+	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF)) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;

@@ -75,8 +78,10 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	/*
 	 * Finally, set the GET_BUF to signal to distributor that cache
 	 * line is ready for processing
+	 * Sync with distributor to release retptrs
 	 */
-	*retptr64 |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
+			__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
-@@ -98,8 +103,11 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
+@@ -125,8 +130,11 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 		return (pkts[0]) ? 1 : 0;
 	}

-	/* If bit is set, return */
-	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+	/* If bit is set, return
+	 * Sync with distributor to acquire bufptrs
+	 */
+	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return -1;

 	/* since bufptr64 is signed, this should be an arithmetic shift */
-@@ -114,8 +122,10 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
+@@ -141,8 +149,10 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	 * so now we've got the contents of the cacheline into an array of
 	 * mbuf pointers, so toggle the bit so scheduler can start working
 	 * on the next cacheline while we're working.
@@ -92,7 +79,7 @@
 	return count;
 }

-@@ -174,6 +184,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
+@@ -201,6 +211,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 		return -EINVAL;
 	}

@@ -101,7 +88,7 @@
 	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
 		/* Switch off the return bit first */
 		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
-@@ -182,8 +194,11 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
+@@ -209,8 +221,11 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
 			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

@@ -115,7 +102,7 @@
 	return 0;
 }

-@@ -273,7 +288,9 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
+@@ -300,7 +315,9 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 	unsigned int count = 0;
 	unsigned int i;

@@ -126,7 +113,7 @@
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
 			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
-@@ -286,8 +303,10 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
+@@ -313,8 +330,10 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 		}
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
@@ -139,7 +126,7 @@
 	}
 	return count;
 }
-@@ -307,7 +326,9 @@ release(struct rte_distributor *d, unsigned int wkr)
+@@ -334,7 +353,9 @@ release(struct rte_distributor *d, unsigned int wkr)
 	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
 	unsigned int i;

@@ -150,7 +137,7 @@
 		rte_pause();

 	handle_returns(d, wkr);
-@@ -327,8 +348,11 @@ release(struct rte_distributor *d, unsigned int wkr)
+@@ -354,8 +375,11 @@ release(struct rte_distributor *d, unsigned int wkr)

 	d->backlog[wkr].count = 0;

@@ -164,18 +151,18 @@
 	return buf->count;
 }

-@@ -355,7 +379,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
+@@ -382,7 +406,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 	if (unlikely(num_mbufs == 0)) {
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
--			if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
+-			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
 +			/* Sync with worker on GET_BUF flag. */
 +			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
 +				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
 			release(d, wid);
 			handle_returns(d, wid);
 		}
-@@ -367,7 +393,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
+@@ -394,7 +420,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 		uint16_t matches[RTE_DIST_BURST_SIZE];
 		unsigned int pkts;

@@ -186,7 +173,7 @@
 			d->bufs[wkr].count = 0;

 		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
-@@ -465,7 +493,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
+@@ -492,7 +520,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,

 	/* Flush out all non-full cache-lines to workers. */
 	for (wid = 0 ; wid < d->num_workers; wid++)
-		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		/* Sync with worker on GET_BUF flag. */
+		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
 			release(d, wid);

 	return num_mbufs;
-@@ -574,7 +604,9 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)
+@@ -598,7 +628,9 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)

 	/* throw away returns, so workers can exit */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		d->bufs[wkr].retptr64[0] = 0;
+		/* Sync with worker. Release retptrs. */
+		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
+				__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
 diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
-index cdc0969a89..ef6d5cb4b8 100644
+index 5be6efd47b..6fede5c38a 100644
 --- a/lib/librte_distributor/rte_distributor_v20.c
 +++ b/lib/librte_distributor/rte_distributor_v20.c
-@@ -34,9 +34,12 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
+@@ -62,9 +62,12 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_GET_BUF;
-	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+	while (unlikely(__atomic_load_n(&buf->bufptr64, __ATOMIC_RELAXED)
+			& RTE_DISTRIB_FLAGS_MASK))
 		rte_pause();
-	buf->bufptr64 = req;
+
+	/* Sync with distributor on GET_BUF flag. */
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 }
 VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);

-@@ -45,7 +48,9 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
+@@ -73,7 +76,9 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
 		unsigned worker_id)
 {
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
-	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+	/* Sync with distributor. Acquire bufptr64. */
+	if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return NULL;

 	/* since bufptr64 is signed, this should be an arithmetic shift */
-@@ -73,7 +78,8 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
+@@ -101,7 +106,8 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_RETURN_BUF;
-	buf->bufptr64 = req;
+	/* Sync with distributor on RETURN_BUF flag. */
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 	return 0;
 }
 VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
-@@ -117,7 +123,8 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
+@@ -145,7 +151,8 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
 {
 	d->in_flight_tags[wkr] = 0;
 	d->in_flight_bitmask &= ~(1UL << wkr);
-	d->bufs[wkr].bufptr64 = 0;
+	/* Sync with worker. Release bufptr64. */
+	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
 		 * queued packets for this core elsewhere.
-@@ -161,17 +168,23 @@ process_returns(struct rte_distributor_v20 *d)
+@@ -189,17 +196,23 @@ process_returns(struct rte_distributor_v20 *d)
 	ret_count = d->returns.count;

 	for (wkr = 0; wkr < d->num_workers; wkr++) {
-
-		const int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
+		/* Sync with worker. Acquire bufptr64. */
+		const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
+					__ATOMIC_ACQUIRE);

 		if (data & RTE_DISTRIB_GET_BUF) {
 			flushed++;
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+				/* Sync with worker on GET_BUF flag. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					RTE_DISTRIB_GET_BUF,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = 0;
 				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
-@@ -207,9 +220,10 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
+@@ -235,9 +248,10 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 		return process_returns(d);

 	while (next_idx < num_mbufs || next_mb != NULL) {
-
-		int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
+		/* Sync with worker. Acquire bufptr64. */
+		int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
+				__ATOMIC_ACQUIRE);

 		if (!next_mb) {
 			next_mb = mbufs[next_idx++];
-@@ -255,11 +269,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
+@@ -283,11 +297,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 				(d->backlog[wkr].count || next_mb)) {

 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = next_value;
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					next_value,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = new_tag;
 				d->in_flight_bitmask |= (1UL << wkr);
 				next_mb = NULL;
-@@ -280,13 +299,19 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
+@@ -308,13 +327,19 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 	 * if they are ready */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
 		if (d->backlog[wkr].count &&