DPDK patches and discussions
 help / color / mirror / Atom feed
From: Pavan Nikhilesh <pbhagavatula@marvell.com>
To: <jerinj@marvell.com>, Harry van Haaren <harry.van.haaren@intel.com>
Cc: <dev@dpdk.org>, Pavan Nikhilesh <pbhagavatula@marvell.com>
Subject: [PATCH 3/6] examples/eventdev: clean up worker state before exit
Date: Wed, 27 Apr 2022 02:44:09 +0530	[thread overview]
Message-ID: <20220426211412.6138-3-pbhagavatula@marvell.com> (raw)
In-Reply-To: <20220426211412.6138-1-pbhagavatula@marvell.com>

Event ports are configured to implicitly release the scheduler contexts
currently held in the next call to rte_event_dequeue_burst().
A worker core might still hold a scheduling context during exit, as the
next call to rte_event_dequeue_burst() is never made.
This might lead to deadlock based on the worker exit timing and when
there are very less number of flows.

Add clean up function to release any scheduling contexts held by the
worker by using RTE_EVENT_OP_RELEASE.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 examples/eventdev_pipeline/pipeline_common.h  | 22 ++++++
 .../pipeline_worker_generic.c                 | 23 +++---
 .../eventdev_pipeline/pipeline_worker_tx.c    | 79 ++++++++++++-------
 3 files changed, 87 insertions(+), 37 deletions(-)

diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h
index b12eb281e1..9899b257b0 100644
--- a/examples/eventdev_pipeline/pipeline_common.h
+++ b/examples/eventdev_pipeline/pipeline_common.h
@@ -140,5 +140,27 @@ schedule_devices(unsigned int lcore_id)
 	}
 }
 
+static inline void
+worker_cleanup(uint8_t dev_id, uint8_t port_id, struct rte_event events[],
+	       uint16_t nb_enq, uint16_t nb_deq)
+{
+	int i;
+
+	if (!(nb_deq - nb_enq))
+		return;
+
+	if (nb_deq) {
+		for (i = nb_enq; i < nb_deq; i++) {
+			if (events[i].op == RTE_EVENT_OP_RELEASE)
+				continue;
+			rte_pktmbuf_free(events[i].mbuf);
+		}
+
+		for (i = 0; i < nb_deq; i++)
+			events[i].op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
+	}
+}
+
 void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
 void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
index ce1e92d59e..c564c808e2 100644
--- a/examples/eventdev_pipeline/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -16,6 +16,7 @@ worker_generic(void *arg)
 	uint8_t port_id = data->port_id;
 	size_t sent = 0, received = 0;
 	unsigned int lcore_id = rte_lcore_id();
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
 
@@ -27,8 +28,7 @@ worker_generic(void *arg)
 			continue;
 		}
 
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
-				&ev, 1, 0);
+		nb_rx = rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0);
 
 		if (nb_rx == 0) {
 			rte_pause();
@@ -47,11 +47,14 @@ worker_generic(void *arg)
 
 		work();
 
-		while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
-			rte_pause();
+		do {
+			nb_tx = rte_event_enqueue_burst(dev_id, port_id, &ev,
+							1);
+		} while (!nb_tx && !fdata->done);
 		sent++;
 	}
 
+	worker_cleanup(dev_id, port_id, &ev, nb_tx, nb_rx);
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu TX=%zu\n",
 				rte_lcore_id(), received, sent);
@@ -69,10 +72,9 @@ worker_generic_burst(void *arg)
 	uint8_t port_id = data->port_id;
 	size_t sent = 0, received = 0;
 	unsigned int lcore_id = rte_lcore_id();
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-
 		if (fdata->cap.scheduler)
 			fdata->cap.scheduler(lcore_id);
 
@@ -81,8 +83,8 @@ worker_generic_burst(void *arg)
 			continue;
 		}
 
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
-				events, RTE_DIM(events), 0);
+		nb_rx = rte_event_dequeue_burst(dev_id, port_id, events,
+						RTE_DIM(events), 0);
 
 		if (nb_rx == 0) {
 			rte_pause();
@@ -103,8 +105,7 @@ worker_generic_burst(void *arg)
 
 			work();
 		}
-		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-				events, nb_rx);
+		nb_tx = rte_event_enqueue_burst(dev_id, port_id, events, nb_rx);
 		while (nb_tx < nb_rx && !fdata->done)
 			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
 							events + nb_tx,
@@ -112,6 +113,8 @@ worker_generic_burst(void *arg)
 		sent += nb_tx;
 	}
 
+	worker_cleanup(dev_id, port_id, events, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu TX=%zu\n",
 				rte_lcore_id(), received, sent);
diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
index 473940f8c7..a82e064c1c 100644
--- a/examples/eventdev_pipeline/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -18,21 +18,22 @@ static __rte_always_inline void
 worker_event_enqueue(const uint8_t dev, const uint8_t port,
 		struct rte_event *ev)
 {
-	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+	while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
 		rte_pause();
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-		struct rte_event *ev, const uint16_t nb_rx)
+			   struct rte_event *ev, const uint16_t nb_rx)
 {
 	uint16_t enq;
 
 	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-	while (enq < nb_rx) {
+	while (enq < nb_rx && !fdata->done)
 		enq += rte_event_enqueue_burst(dev, port,
 						ev + enq, nb_rx - enq);
-	}
+
+	return enq;
 }
 
 static __rte_always_inline void
@@ -40,7 +41,8 @@ worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
 	exchange_mac(ev->mbuf);
 	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
-	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
+	       !fdata->done)
 		rte_pause();
 }
 
@@ -76,6 +78,11 @@ worker_do_tx_single(void *arg)
 		}
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -111,6 +118,11 @@ worker_do_tx_single_atq(void *arg)
 		}
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -126,11 +138,10 @@ worker_do_tx_single_burst(void *arg)
 	const uint8_t dev = data->dev_id;
 	const uint8_t port = data->port_id;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t nb_tx = 0, nb_rx = 0, i;
 
 	while (!fdata->done) {
-		uint16_t i;
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -153,10 +164,12 @@ worker_do_tx_single_burst(void *arg)
 			work();
 		}
 
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -172,11 +185,10 @@ worker_do_tx_single_burst_atq(void *arg)
 	const uint8_t dev = data->dev_id;
 	const uint8_t port = data->port_id;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -197,10 +209,12 @@ worker_do_tx_single_burst_atq(void *arg)
 			work();
 		}
 
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -251,6 +265,11 @@ worker_do_tx(void *arg)
 		fwd++;
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -297,6 +316,11 @@ worker_do_tx_atq(void *arg)
 		fwd++;
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -314,11 +338,10 @@ worker_do_tx_burst(void *arg)
 	uint8_t port = data->port_id;
 	uint8_t lst_qid = cdata.num_stages - 1;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-				ev, BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (nb_rx == 0) {
 			rte_pause();
@@ -347,11 +370,13 @@ worker_do_tx_burst(void *arg)
 			}
 			work();
 		}
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
 
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -369,12 +394,10 @@ worker_do_tx_burst_atq(void *arg)
 	uint8_t port = data->port_id;
 	uint8_t lst_qid = cdata.num_stages - 1;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-				ev, BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (nb_rx == 0) {
 			rte_pause();
@@ -402,10 +425,12 @@ worker_do_tx_burst_atq(void *arg)
 			work();
 		}
 
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
-- 
2.25.1


  parent reply	other threads:[~2022-04-26 21:14 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-04-26 21:14 [PATCH 1/6] app/eventdev: simplify signal handling and teardown Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 2/6] app/eventdev: clean up worker state before exit Pavan Nikhilesh
2022-05-13 13:40   ` Jerin Jacob
2022-04-26 21:14 ` Pavan Nikhilesh [this message]
2022-04-26 21:14 ` [PATCH 4/6] examples/l3fwd: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 5/6] examples/l2fwd-event: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 6/6] examples/ipsec-secgw: cleanup " Pavan Nikhilesh
2022-05-13 13:41   ` Jerin Jacob
2022-05-13 11:49 ` [PATCH 1/6] app/eventdev: simplify signal handling and teardown Jerin Jacob
2022-05-13 13:39 ` Jerin Jacob
2022-05-13 16:07 ` [PATCH v2 " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 2/6] app/eventdev: clean up worker state before exit pbhagavatula
2022-05-13 16:07   ` [PATCH v2 3/6] examples/eventdev: " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 4/6] examples/l3fwd: " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 5/6] examples/l2fwd-event: " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 6/6] examples/ipsec-secgw: cleanup " pbhagavatula
2022-05-16 16:46   ` [PATCH v2 1/6] app/eventdev: simplify signal handling and teardown Jerin Jacob

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220426211412.6138-3-pbhagavatula@marvell.com \
    --to=pbhagavatula@marvell.com \
    --cc=dev@dpdk.org \
    --cc=harry.van.haaren@intel.com \
    --cc=jerinj@marvell.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).