patches for DPDK stable branches
 help / color / mirror / Atom feed
* [dpdk-stable] [PATCH] lib/distributor: fix deadlock issue for aarch64
@ 2019-10-08  9:55 Ruifeng Wang
  2019-10-08 12:53 ` Hunt, David
                   ` (3 more replies)
  0 siblings, 4 replies; 16+ messages in thread
From: Ruifeng Wang @ 2019-10-08  9:55 UTC (permalink / raw)
  To: david.hunt
  Cc: dev, hkalra, gavin.hu, honnappa.nagarahalli, nd, Ruifeng Wang, stable

Distributor and worker threads rely on data structs in a cache line
for synchronization. The shared data structs were not protected.
This caused a deadlock issue on platforms with weaker memory ordering,
such as aarch64.
Fix this issue by adding memory barriers to ensure synchronization
among cores.

Bugzilla ID: 342
Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
Cc: stable@dpdk.org

Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
---
 lib/librte_distributor/rte_distributor.c     | 28 ++++++++++------
 lib/librte_distributor/rte_distributor_v20.c | 34 +++++++++++++-------
 2 files changed, 41 insertions(+), 21 deletions(-)

diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 21eb1fb0a..7bf96e224 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -50,7 +50,8 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 
 	retptr64 = &(buf->retptr64[0]);
 	/* Spin while handshake bits are set (scheduler clears it) */
-	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF)) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
 
@@ -76,7 +77,8 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	 * Finally, set the GET_BUF  to signal to distributor that cache
 	 * line is ready for processing
 	 */
-	*retptr64 |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
+			__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
@@ -99,7 +101,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	}
 
 	/* If bit is set, return */
-	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return -1;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -116,6 +119,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	 * on the next cacheline while we're working.
 	 */
 	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 
 	return count;
 }
@@ -183,7 +188,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
 
 	/* set the GET_BUF but even if we got no returns */
-	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->retptr64[0]),
+		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 
 	return 0;
 }
@@ -273,7 +279,8 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 	unsigned int count = 0;
 	unsigned int i;
 
-	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF) {
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
 			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
@@ -287,7 +294,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
 		/* Clear for the worker to populate with more returns */
-		buf->retptr64[0] = 0;
+		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
 	}
 	return count;
 }
@@ -307,7 +314,8 @@ release(struct rte_distributor *d, unsigned int wkr)
 	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
 	unsigned int i;
 
-	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF))
 		rte_pause();
 
 	handle_returns(d, wkr);
@@ -328,7 +336,8 @@ release(struct rte_distributor *d, unsigned int wkr)
 	d->backlog[wkr].count = 0;
 
 	/* Clear the GET bit */
-	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 	return  buf->count;
 
 }
@@ -574,7 +583,8 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)
 
 	/* throw away returns, so workers can exit */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		d->bufs[wkr].retptr64[0] = 0;
+		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
+				__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
index cdc0969a8..3a5810c6d 100644
--- a/lib/librte_distributor/rte_distributor_v20.c
+++ b/lib/librte_distributor/rte_distributor_v20.c
@@ -34,9 +34,10 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_GET_BUF;
-	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+	while (unlikely(__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_FLAGS_MASK))
 		rte_pause();
-	buf->bufptr64 = req;
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 }
 VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
 
@@ -45,7 +46,8 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
 		unsigned worker_id)
 {
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
-	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+	if (__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return NULL;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -73,7 +75,7 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_RETURN_BUF;
-	buf->bufptr64 = req;
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 	return 0;
 }
 VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
@@ -117,7 +119,7 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
 {
 	d->in_flight_tags[wkr] = 0;
 	d->in_flight_bitmask &= ~(1UL << wkr);
-	d->bufs[wkr].bufptr64 = 0;
+	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
 		 * queued packets for this core elsewhere.
@@ -165,13 +167,17 @@ process_returns(struct rte_distributor_v20 *d)
 		const int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
 
-		if (data & RTE_DISTRIB_GET_BUF) {
+		if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF) {
 			flushed++;
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					RTE_DISTRIB_GET_BUF,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = 0;
 				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
@@ -251,7 +257,8 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 			}
 		}
 
-		if ((data & RTE_DISTRIB_GET_BUF) &&
+		if ((__atomic_load_n(&data, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF) &&
 				(d->backlog[wkr].count || next_mb)) {
 
 			if (d->backlog[wkr].count)
@@ -280,13 +287,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 	 * if they are ready */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
 		if (d->backlog[wkr].count &&
-				(d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
 
 			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
 					RTE_DISTRIB_FLAG_BITS;
 			store_return(oldbuf, d, &ret_start, &ret_count);
 
-			d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+			__atomic_store_n(&(d->bufs[wkr].bufptr64),
+				backlog_pop(&d->backlog[wkr]),
+				__ATOMIC_RELEASE);
 		}
 
 	d->returns.start = ret_start;
-- 
2.17.1


^ permalink raw reply	[flat|nested] 16+ messages in thread

end of thread, other threads:[~2019-10-25  8:13 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-10-08  9:55 [dpdk-stable] [PATCH] lib/distributor: fix deadlock issue for aarch64 Ruifeng Wang
2019-10-08 12:53 ` Hunt, David
2019-10-08 17:05 ` [dpdk-stable] [dpdk-dev] " Aaron Conole
2019-10-08 19:46   ` David Marchand
2019-10-08 20:08     ` Aaron Conole
2019-10-09  5:52     ` Ruifeng Wang (Arm Technology China)
2019-10-17 11:42       ` [dpdk-stable] [EXT] " Harman Kalra
2019-10-17 13:48         ` Ruifeng Wang (Arm Technology China)
     [not found] ` <20191012024352.23545-1-ruifeng.wang@arm.com>
2019-10-12  2:43   ` [dpdk-stable] [PATCH v2 1/2] " Ruifeng Wang
2019-10-13  2:31     ` Honnappa Nagarahalli
2019-10-14 10:00       ` Ruifeng Wang (Arm Technology China)
2019-10-12  2:43   ` [dpdk-stable] [PATCH v2 2/2] test/distributor: fix false unit test failure Ruifeng Wang
     [not found] ` <20191015092826.13002-1-ruifeng.wang@arm.com>
2019-10-15  9:28   ` [dpdk-stable] [PATCH v3 1/2] lib/distributor: fix deadlock issue for aarch64 Ruifeng Wang
2019-10-25  8:13     ` Hunt, David
2019-10-15  9:28   ` [dpdk-stable] [PATCH v3 2/2] test/distributor: fix false unit test failure Ruifeng Wang
2019-10-25  8:13     ` Hunt, David

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).