From: Jerin Jacob <jerinjacobk@gmail.com>
To: Pavan Nikhilesh <pbhagavatula@marvell.com>
Cc: Jerin Jacob <jerinj@marvell.com>, dpdk-dev <dev@dpdk.org>
Subject: Re: [PATCH 2/6] app/eventdev: clean up worker state before exit
Date: Fri, 13 May 2022 19:10:01 +0530 [thread overview]
Message-ID: <CALBAE1OZEJXdikPckmnE8DkXFmDyypHpPX5yWUdUNzQSqZx1Mw@mail.gmail.com> (raw)
In-Reply-To: <20220426211412.6138-2-pbhagavatula@marvell.com>
On Wed, Apr 27, 2022 at 2:44 AM Pavan Nikhilesh
<pbhagavatula@marvell.com> wrote:
>
> Event ports are configured to implicitly release the scheduler contexts
> they currently hold on the next call to rte_event_dequeue_burst().
> A worker core might still hold a scheduling context during exit, as the
> next call to rte_event_dequeue_burst() is never made.
> This might lead to a deadlock, depending on the worker exit timing, when
> there are very few flows.
>
> Add a cleanup function that releases any scheduling contexts held by
> the worker using RTE_EVENT_OP_RELEASE.
>
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
Acked-by: Jerin Jacob <jerinj@marvell.com>
> ---
> app/test-eventdev/test_perf_atq.c | 31 +++--
> app/test-eventdev/test_perf_common.c | 17 +++
> app/test-eventdev/test_perf_common.h | 3 +
> app/test-eventdev/test_perf_queue.c | 30 +++--
> app/test-eventdev/test_pipeline_atq.c | 134 ++++++++++++---------
> app/test-eventdev/test_pipeline_common.c | 39 ++++++
> app/test-eventdev/test_pipeline_common.h | 59 ++++++---
> app/test-eventdev/test_pipeline_queue.c | 145 ++++++++++++++---------
> 8 files changed, 304 insertions(+), 154 deletions(-)
>
> diff --git a/app/test-eventdev/test_perf_atq.c b/app/test-eventdev/test_perf_atq.c
> index bac3ea602f..5a0b190384 100644
> --- a/app/test-eventdev/test_perf_atq.c
> +++ b/app/test-eventdev/test_perf_atq.c
> @@ -37,13 +37,14 @@ atq_fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
> static int
> perf_atq_worker(void *arg, const int enable_fwd_latency)
> {
> - PERF_WORKER_INIT;
> + uint16_t enq = 0, deq = 0;
> struct rte_event ev;
> + PERF_WORKER_INIT;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -78,24 +79,29 @@ perf_atq_worker(void *arg, const int enable_fwd_latency)
> bufs, sz, cnt);
> } else {
> atq_fwd_event(&ev, sched_type_list, nb_stages);
> - while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
> - rte_pause();
> + do {
> + enq = rte_event_enqueue_burst(dev, port, &ev,
> + 1);
> + } while (!enq && !t->done);
> }
> }
> +
> + perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
> +
> return 0;
> }
>
> static int
> perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
> {
> - PERF_WORKER_INIT;
> - uint16_t i;
> /* +1 to avoid prefetch out of array check */
> struct rte_event ev[BURST_SIZE + 1];
> + uint16_t enq = 0, nb_rx = 0;
> + PERF_WORKER_INIT;
> + uint16_t i;
>
> while (t->done == false) {
> - uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -146,14 +152,15 @@ perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
> }
> }
>
> - uint16_t enq;
> -
> enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
> - while (enq < nb_rx) {
> + while ((enq < nb_rx) && !t->done) {
> enq += rte_event_enqueue_burst(dev, port,
> ev + enq, nb_rx - enq);
> }
> }
> +
> + perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
> +
> return 0;
> }
>
> diff --git a/app/test-eventdev/test_perf_common.c b/app/test-eventdev/test_perf_common.c
> index e93b0e7272..f673a9fddd 100644
> --- a/app/test-eventdev/test_perf_common.c
> +++ b/app/test-eventdev/test_perf_common.c
> @@ -985,6 +985,23 @@ perf_opt_dump(struct evt_options *opt, uint8_t nb_queues)
> evt_dump("prod_enq_burst_sz", "%d", opt->prod_enq_burst_sz);
> }
>
> +/* Put events that were not enqueued back in the pool, then release all. */
> +void
> +perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
> +		uint8_t port_id, struct rte_event events[], uint16_t nb_enq,
> +		uint16_t nb_deq)
> +{
> +	int idx;
> +
> +	if (nb_deq == 0)
> +		return;
> +	for (idx = nb_enq; idx < nb_deq; idx++)
> +		rte_mempool_put(pool, events[idx].event_ptr);
> +	for (idx = 0; idx < nb_deq; idx++)
> +		events[idx].op = RTE_EVENT_OP_RELEASE;
> +	rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
> +}
> +
> void
> perf_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
> {
> diff --git a/app/test-eventdev/test_perf_common.h b/app/test-eventdev/test_perf_common.h
> index e504bb1df9..f6bfc73be0 100644
> --- a/app/test-eventdev/test_perf_common.h
> +++ b/app/test-eventdev/test_perf_common.h
> @@ -184,5 +184,8 @@ void perf_cryptodev_destroy(struct evt_test *test, struct evt_options *opt);
> void perf_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
> void perf_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
> void perf_mempool_destroy(struct evt_test *test, struct evt_options *opt);
> +void perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
> + uint8_t port_id, struct rte_event events[],
> + uint16_t nb_enq, uint16_t nb_deq);
>
> #endif /* _TEST_PERF_COMMON_ */
> diff --git a/app/test-eventdev/test_perf_queue.c b/app/test-eventdev/test_perf_queue.c
> index 108f1742a7..b498cacef6 100644
> --- a/app/test-eventdev/test_perf_queue.c
> +++ b/app/test-eventdev/test_perf_queue.c
> @@ -39,13 +39,14 @@ fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
> static int
> perf_queue_worker(void *arg, const int enable_fwd_latency)
> {
> - PERF_WORKER_INIT;
> + uint16_t enq = 0, deq = 0;
> struct rte_event ev;
> + PERF_WORKER_INIT;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -80,24 +81,29 @@ perf_queue_worker(void *arg, const int enable_fwd_latency)
> &ev, w, bufs, sz, cnt);
> } else {
> fwd_event(&ev, sched_type_list, nb_stages);
> - while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
> - rte_pause();
> + do {
> + enq = rte_event_enqueue_burst(dev, port, &ev,
> + 1);
> + } while (!enq && !t->done);
> }
> }
> +
> + perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
> +
> return 0;
> }
>
> static int
> perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
> {
> - PERF_WORKER_INIT;
> - uint16_t i;
> /* +1 to avoid prefetch out of array check */
> struct rte_event ev[BURST_SIZE + 1];
> + uint16_t enq = 0, nb_rx = 0;
> + PERF_WORKER_INIT;
> + uint16_t i;
>
> while (t->done == false) {
> - uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -147,14 +153,16 @@ perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
> }
> }
>
> - uint16_t enq;
>
> enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
> - while (enq < nb_rx) {
> + while (enq < nb_rx && !t->done) {
> enq += rte_event_enqueue_burst(dev, port,
> ev + enq, nb_rx - enq);
> }
> }
> +
> + perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
> +
> return 0;
> }
>
> diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c
> index 79218502ba..4b10197127 100644
> --- a/app/test-eventdev/test_pipeline_atq.c
> +++ b/app/test-eventdev/test_pipeline_atq.c
> @@ -21,18 +21,20 @@ static __rte_noinline int
> pipeline_atq_worker_single_stage_tx(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
>
> - pipeline_event_tx(dev, port, &ev);
> + enq = pipeline_event_tx(dev, port, &ev, t); /* Tx count, not deq */
> w->processed_pkts++;
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -42,20 +44,22 @@ pipeline_atq_worker_single_stage_fwd(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
>
> ev.queue_id = tx_queue[ev.mbuf->port];
> pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> w->processed_pkts++;
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -64,10 +68,10 @@ static __rte_noinline int
> pipeline_atq_worker_single_stage_burst_tx(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -79,9 +83,10 @@ pipeline_atq_worker_single_stage_burst_tx(void *arg)
> rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
> }
>
> - pipeline_event_tx_burst(dev, port, ev, nb_rx);
> - w->processed_pkts += nb_rx;
> + nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
> + w->processed_pkts += nb_tx;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -91,10 +96,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -108,9 +113,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
> pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> - w->processed_pkts += nb_rx;
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> + w->processed_pkts += nb_tx;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -119,19 +125,21 @@ static __rte_noinline int
> pipeline_atq_worker_single_stage_tx_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> vector_sz = ev.vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev);
> + enq = pipeline_event_tx_vector(dev, port, &ev, t);
> w->processed_pkts += vector_sz;
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -141,12 +149,13 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -155,9 +164,10 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
> ev.queue_id = tx_queue[ev.vec->port];
> ev.vec->queue = 0;
> pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> w->processed_pkts += vector_sz;
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -166,11 +176,11 @@ static __rte_noinline int
> pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -182,9 +192,10 @@ pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
> ev[i].vec->queue = 0;
> }
>
> - pipeline_event_tx_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
> w->processed_pkts += vector_sz;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -194,11 +205,11 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -214,9 +225,10 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
> RTE_SCHED_TYPE_ATOMIC);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> w->processed_pkts += vector_sz;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -225,11 +237,12 @@ static __rte_noinline int
> pipeline_atq_worker_multi_stage_tx(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -237,15 +250,16 @@ pipeline_atq_worker_multi_stage_tx(void *arg)
> cq_id = ev.sub_event_type % nb_stages;
>
> if (cq_id == last_queue) {
> - pipeline_event_tx(dev, port, &ev);
> + enq = pipeline_event_tx(dev, port, &ev, t);
> w->processed_pkts++;
> continue;
> }
>
> ev.sub_event_type++;
> pipeline_fwd_event(&ev, sched_type_list[cq_id]);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -255,11 +269,12 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -275,8 +290,9 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
> pipeline_fwd_event(&ev, sched_type_list[cq_id]);
> }
>
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -285,10 +301,10 @@ static __rte_noinline int
> pipeline_atq_worker_multi_stage_burst_tx(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -300,7 +316,7 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
> cq_id = ev[i].sub_event_type % nb_stages;
>
> if (cq_id == last_queue) {
> - pipeline_event_tx(dev, port, &ev[i]);
> + pipeline_event_tx(dev, port, &ev[i], t);
> ev[i].op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts++;
> continue;
> @@ -310,8 +326,9 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
> pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -321,10 +338,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -347,8 +364,9 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
> }
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -357,12 +375,13 @@ static __rte_noinline int
> pipeline_atq_worker_multi_stage_tx_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -371,15 +390,16 @@ pipeline_atq_worker_multi_stage_tx_vector(void *arg)
>
> if (cq_id == last_queue) {
> vector_sz = ev.vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev);
> + enq = pipeline_event_tx_vector(dev, port, &ev, t);
> w->processed_pkts += vector_sz;
> continue;
> }
>
> ev.sub_event_type++;
> pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -389,12 +409,13 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -406,14 +427,15 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
> ev.vec->queue = 0;
> vector_sz = ev.vec->nb_elem;
> pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> w->processed_pkts += vector_sz;
> } else {
> ev.sub_event_type++;
> pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -422,11 +444,11 @@ static __rte_noinline int
> pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -438,7 +460,7 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
>
> if (cq_id == last_queue) {
> vector_sz = ev[i].vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev[i]);
> + pipeline_event_tx_vector(dev, port, &ev[i], t);
> ev[i].op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts += vector_sz;
> continue;
> @@ -449,8 +471,9 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
> sched_type_list[cq_id]);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -460,11 +483,11 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -488,8 +511,9 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
> }
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
> index d994c91678..a8dd070000 100644
> --- a/app/test-eventdev/test_pipeline_common.c
> +++ b/app/test-eventdev/test_pipeline_common.c
> @@ -505,6 +505,45 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
> return ret;
> }
>
> +/* Free the mbufs carried by each vector event, then the vector itself. */
> +static void
> +pipeline_vector_array_free(struct rte_event events[], uint16_t num)
> +{
> +	struct rte_event_vector *vec;
> +
> +	for (; num > 0; num--, events++) {
> +		vec = events->vec;
> +		rte_pktmbuf_free_bulk(vec->mbufs, vec->nb_elem);
> +		rte_mempool_put(rte_mempool_from_obj(vec), vec);
> +	}
> +}
> +
> +/* Free events that were never enqueued and release all dequeued events. */
> +void
> +pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
> +		uint16_t enq, uint16_t deq)
> +{
> +	struct rte_event *cur;
> +	int idx;
> +
> +	/* Nothing to clean: the counts match or nothing was dequeued. */
> +	if (deq == enq || deq == 0)
> +		return;
> +
> +	for (idx = enq; idx < deq; idx++) {
> +		cur = &ev[idx];
> +		if (cur->op == RTE_EVENT_OP_RELEASE)
> +			continue;
> +		if (cur->event_type & RTE_EVENT_TYPE_VECTOR)
> +			pipeline_vector_array_free(cur, 1);
> +		else
> +			rte_pktmbuf_free(cur->mbuf);
> +	}
> +	for (idx = 0; idx < deq; idx++)
> +		ev[idx].op = RTE_EVENT_OP_RELEASE;
> +	rte_event_enqueue_burst(dev, port, ev, deq);
> +}
> +
> void
> pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
> {
> diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
> index c979c33772..a6443faea4 100644
> --- a/app/test-eventdev/test_pipeline_common.h
> +++ b/app/test-eventdev/test_pipeline_common.h
> @@ -109,59 +109,80 @@ pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
> ev->sched_type = sched;
> }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint8_t
> pipeline_event_tx(const uint8_t dev, const uint8_t port,
> - struct rte_event * const ev)
> + struct rte_event *const ev, struct test_pipeline *t)
> {
> + uint8_t enq;
> +
> rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
> - while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
> - rte_pause();
> + do {
> + enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
> + } while (!enq && !t->done);
> +
> + return enq;
> }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint8_t
> pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
> - struct rte_event *const ev)
> + struct rte_event *const ev, struct test_pipeline *t)
> {
> + uint8_t enq;
> +
> ev->vec->queue = 0;
> + do {
> + enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
> + } while (!enq && !t->done);
>
> - while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
> - rte_pause();
> + return enq;
> }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint16_t
> pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
> - struct rte_event *ev, const uint16_t nb_rx)
> + struct rte_event *ev, const uint16_t nb_rx,
> + struct test_pipeline *t)
> {
> uint16_t enq;
>
> enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx, 0);
> - while (enq < nb_rx) {
> + while (enq < nb_rx && !t->done) {
> enq += rte_event_eth_tx_adapter_enqueue(dev, port,
> ev + enq, nb_rx - enq, 0);
> }
> +
> + return enq;
> }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint8_t
> pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
> - struct rte_event *ev)
> + struct rte_event *ev, struct test_pipeline *t)
> {
> - while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
> - rte_pause();
> + uint8_t enq;
> +
> + do {
> + enq = rte_event_enqueue_burst(dev, port, ev, 1);
> + } while (!enq && !t->done);
> +
> + return enq;
> }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint16_t
> pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
> - struct rte_event *ev, const uint16_t nb_rx)
> + struct rte_event *ev, const uint16_t nb_rx,
> + struct test_pipeline *t)
> {
> uint16_t enq;
>
> enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
> - while (enq < nb_rx) {
> + while (enq < nb_rx && !t->done) {
> enq += rte_event_enqueue_burst(dev, port,
> ev + enq, nb_rx - enq);
> }
> +
> + return enq;
> }
>
> +
> static inline int
> pipeline_nb_event_ports(struct evt_options *opt)
> {
> @@ -188,5 +209,7 @@ void pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt);
> void pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
> void pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
> void pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt);
> +void pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
> + uint16_t enq, uint16_t deq);
>
> #endif /* _TEST_PIPELINE_COMMON_ */
> diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
> index 343f8f3b1d..e989396474 100644
> --- a/app/test-eventdev/test_pipeline_queue.c
> +++ b/app/test-eventdev/test_pipeline_queue.c
> @@ -21,24 +21,27 @@ static __rte_noinline int
> pipeline_queue_worker_single_stage_tx(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
>
> if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> - pipeline_event_tx(dev, port, &ev);
> + enq = pipeline_event_tx(dev, port, &ev, t);
> + ev.op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts++;
> } else {
> ev.queue_id++;
> pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -48,11 +51,12 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -60,9 +64,10 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
> ev.queue_id = tx_queue[ev.mbuf->port];
> rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
> pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> w->processed_pkts++;
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -71,10 +76,10 @@ static __rte_noinline int
> pipeline_queue_worker_single_stage_burst_tx(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -84,17 +89,18 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
> for (i = 0; i < nb_rx; i++) {
> rte_prefetch0(ev[i + 1].mbuf);
> if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
> - pipeline_event_tx(dev, port, &ev[i]);
> + pipeline_event_tx(dev, port, &ev[i], t);
> + ev[i].op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts++;
> } else {
> ev[i].queue_id++;
> pipeline_fwd_event(&ev[i],
> RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue_burst(dev, port, ev,
> - nb_rx);
> }
> }
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -104,10 +110,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -121,9 +127,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
> pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> w->processed_pkts += nb_rx;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -132,26 +139,29 @@ static __rte_noinline int
> pipeline_queue_worker_single_stage_tx_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
>
> if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> vector_sz = ev.vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev);
> + enq = pipeline_event_tx_vector(dev, port, &ev, t);
> + ev.op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts += vector_sz;
> } else {
> ev.queue_id++;
> pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -161,12 +171,13 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -175,9 +186,10 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
> ev.vec->queue = 0;
> vector_sz = ev.vec->nb_elem;
> pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> w->processed_pkts += vector_sz;
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -186,11 +198,11 @@ static __rte_noinline int
> pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -200,7 +212,7 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
> for (i = 0; i < nb_rx; i++) {
> if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
> vector_sz = ev[i].vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev[i]);
> + pipeline_event_tx_vector(dev, port, &ev[i], t);
> ev[i].op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts += vector_sz;
> } else {
> @@ -210,8 +222,9 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
> }
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -221,11 +234,11 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -241,9 +254,10 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
> RTE_SCHED_TYPE_ATOMIC);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> w->processed_pkts += vector_sz;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -253,11 +267,12 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -265,7 +280,8 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
> cq_id = ev.queue_id % nb_stages;
>
> if (ev.queue_id == tx_queue[ev.mbuf->port]) {
> - pipeline_event_tx(dev, port, &ev);
> + enq = pipeline_event_tx(dev, port, &ev, t);
> + ev.op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts++;
> continue;
> }
> @@ -274,8 +290,9 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
> pipeline_fwd_event(&ev, cq_id != last_queue ?
> sched_type_list[cq_id] :
> RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -285,11 +302,12 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
>
> while (t->done == false) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -300,14 +318,15 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
> ev.queue_id = tx_queue[ev.mbuf->port];
> rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
> pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> w->processed_pkts++;
> } else {
> ev.queue_id++;
> pipeline_fwd_event(&ev, sched_type_list[cq_id]);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -317,10 +336,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -332,7 +351,8 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
> cq_id = ev[i].queue_id % nb_stages;
>
> if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
> - pipeline_event_tx(dev, port, &ev[i]);
> + pipeline_event_tx(dev, port, &ev[i], t);
> + ev[i].op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts++;
> continue;
> }
> @@ -341,9 +361,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
> pipeline_fwd_event(&ev[i], cq_id != last_queue ?
> sched_type_list[cq_id] :
> RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> }
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -353,11 +374,11 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
>
> while (t->done == false) {
> uint16_t processed_pkts = 0;
> - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> - BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -381,9 +402,10 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
> }
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> w->processed_pkts += processed_pkts;
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -393,12 +415,13 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -407,8 +430,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
>
> if (ev.queue_id == tx_queue[ev.vec->port]) {
> vector_sz = ev.vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev);
> + enq = pipeline_event_tx_vector(dev, port, &ev, t);
> w->processed_pkts += vector_sz;
> + ev.op = RTE_EVENT_OP_RELEASE;
> continue;
> }
>
> @@ -416,8 +440,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
> pipeline_fwd_event_vector(&ev, cq_id != last_queue
> ? sched_type_list[cq_id]
> : RTE_SCHED_TYPE_ATOMIC);
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -427,12 +452,13 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint8_t enq = 0, deq = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> - if (!event) {
> + if (!deq) {
> rte_pause();
> continue;
> }
> @@ -449,8 +475,9 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
> pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
> }
>
> - pipeline_event_enqueue(dev, port, &ev);
> + enq = pipeline_event_enqueue(dev, port, &ev, t);
> }
> + pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
> return 0;
> }
> @@ -460,11 +487,11 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -476,7 +503,7 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
>
> if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
> vector_sz = ev[i].vec->nb_elem;
> - pipeline_event_tx_vector(dev, port, &ev[i]);
> + pipeline_event_tx_vector(dev, port, &ev[i], t);
> ev[i].op = RTE_EVENT_OP_RELEASE;
> w->processed_pkts += vector_sz;
> continue;
> @@ -489,8 +516,9 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
> : RTE_SCHED_TYPE_ATOMIC);
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> @@ -500,11 +528,11 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
> {
> PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> const uint8_t *tx_queue = t->tx_evqueue_id;
> + uint16_t nb_rx = 0, nb_tx = 0;
> uint16_t vector_sz;
>
> while (!t->done) {
> - uint16_t nb_rx =
> - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
> if (!nb_rx) {
> rte_pause();
> @@ -527,8 +555,9 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
> }
> }
>
> - pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> }
> + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
> return 0;
> }
> --
> 2.25.1
>
next prev parent reply other threads: [~2022-05-13 13:40 UTC | newest]
Thread overview: 17+ messages / expand [flat | nested] mbox.gz Atom feed top
2022-04-26 21:14 [PATCH 1/6] app/eventdev: simplify signal handling and teardown Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 2/6] app/eventdev: clean up worker state before exit Pavan Nikhilesh
2022-05-13 13:40 ` Jerin Jacob [this message]
2022-04-26 21:14 ` [PATCH 3/6] examples/eventdev: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 4/6] examples/l3fwd: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 5/6] examples/l2fwd-event: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 6/6] examples/ipsec-secgw: cleanup " Pavan Nikhilesh
2022-05-13 13:41 ` Jerin Jacob
2022-05-13 11:49 ` [PATCH 1/6] app/eventdev: simplify signal handling and teardown Jerin Jacob
2022-05-13 13:39 ` Jerin Jacob
2022-05-13 16:07 ` [PATCH v2 " pbhagavatula
2022-05-13 16:07 ` [PATCH v2 2/6] app/eventdev: clean up worker state before exit pbhagavatula
2022-05-13 16:07 ` [PATCH v2 3/6] examples/eventdev: " pbhagavatula
2022-05-13 16:07 ` [PATCH v2 4/6] examples/l3fwd: " pbhagavatula
2022-05-13 16:07 ` [PATCH v2 5/6] examples/l2fwd-event: " pbhagavatula
2022-05-13 16:07 ` [PATCH v2 6/6] examples/ipsec-secgw: cleanup " pbhagavatula
2022-05-16 16:46 ` [PATCH v2 1/6] app/eventdev: simplify signal handling and teardown Jerin Jacob
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=CALBAE1OZEJXdikPckmnE8DkXFmDyypHpPX5yWUdUNzQSqZx1Mw@mail.gmail.com \
--to=jerinjacobk@gmail.com \
--cc=dev@dpdk.org \
--cc=jerinj@marvell.com \
--cc=pbhagavatula@marvell.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for NNTP newsgroup(s).