DPDK patches and discussions
From: Jerin Jacob <jerinjacobk@gmail.com>
To: Pavan Nikhilesh <pbhagavatula@marvell.com>
Cc: Jerin Jacob <jerinj@marvell.com>, dpdk-dev <dev@dpdk.org>
Subject: Re: [PATCH 2/6] app/eventdev: clean up worker state before exit
Date: Fri, 13 May 2022 19:10:01 +0530
Message-ID: <CALBAE1OZEJXdikPckmnE8DkXFmDyypHpPX5yWUdUNzQSqZx1Mw@mail.gmail.com>
In-Reply-To: <20220426211412.6138-2-pbhagavatula@marvell.com>

On Wed, Apr 27, 2022 at 2:44 AM Pavan Nikhilesh
<pbhagavatula@marvell.com> wrote:
>
> Event ports are configured to implicitly release any scheduler contexts
> they hold on the next call to rte_event_dequeue_burst().
> A worker core might still hold a scheduling context during exit, as the
> next call to rte_event_dequeue_burst() is never made.
> This might lead to a deadlock, depending on worker exit timing,
> especially when the number of flows is very small.
>
> Add a cleanup function that releases any scheduling contexts held by
> the worker using RTE_EVENT_OP_RELEASE.
>
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>

Acked-by: Jerin Jacob <jerinj@marvell.com>
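
For anyone skimming the thread, here is a condensed sketch of the exit-time
release pattern this patch introduces, based on the perf_worker_cleanup()
hunk quoted below (the function name here is illustrative, not part of the
patch): events that were dequeued but never forwarded are returned to the
mempool, and every dequeued slot is converted into an explicit
RTE_EVENT_OP_RELEASE before one final enqueue.

#include <rte_eventdev.h>
#include <rte_mempool.h>

/* Sketch of the cleanup step run once the worker loop observes t->done.
 * events[] holds the last dequeued burst; nb_deq events were dequeued,
 * of which the first nb_enq were already forwarded or transmitted. */
static void
worker_exit_cleanup(struct rte_mempool *pool, uint8_t dev_id, uint8_t port_id,
		    struct rte_event events[], uint16_t nb_enq, uint16_t nb_deq)
{
	uint16_t i;

	if (!nb_deq)
		return;

	/* Free buffers of events that were dequeued but never enqueued. */
	for (i = nb_enq; i < nb_deq; i++)
		rte_mempool_put(pool, events[i].event_ptr);

	/* Turn every dequeued slot into an explicit release so the
	 * scheduler drops the contexts this port still holds. */
	for (i = 0; i < nb_deq; i++)
		events[i].op = RTE_EVENT_OP_RELEASE;

	rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
}

With implicit release enabled, this final enqueue hands any held scheduling
contexts back to the device, so other worker ports cannot deadlock on flows
pinned to the exiting port.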

> ---
>  app/test-eventdev/test_perf_atq.c        |  31 +++--
>  app/test-eventdev/test_perf_common.c     |  17 +++
>  app/test-eventdev/test_perf_common.h     |   3 +
>  app/test-eventdev/test_perf_queue.c      |  30 +++--
>  app/test-eventdev/test_pipeline_atq.c    | 134 ++++++++++++---------
>  app/test-eventdev/test_pipeline_common.c |  39 ++++++
>  app/test-eventdev/test_pipeline_common.h |  59 ++++++---
>  app/test-eventdev/test_pipeline_queue.c  | 145 ++++++++++++++---------
>  8 files changed, 304 insertions(+), 154 deletions(-)
>
> diff --git a/app/test-eventdev/test_perf_atq.c b/app/test-eventdev/test_perf_atq.c
> index bac3ea602f..5a0b190384 100644
> --- a/app/test-eventdev/test_perf_atq.c
> +++ b/app/test-eventdev/test_perf_atq.c
> @@ -37,13 +37,14 @@ atq_fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
>  static int
>  perf_atq_worker(void *arg, const int enable_fwd_latency)
>  {
> -       PERF_WORKER_INIT;
> +       uint16_t enq = 0, deq = 0;
>         struct rte_event ev;
> +       PERF_WORKER_INIT;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -78,24 +79,29 @@ perf_atq_worker(void *arg, const int enable_fwd_latency)
>                                          bufs, sz, cnt);
>                 } else {
>                         atq_fwd_event(&ev, sched_type_list, nb_stages);
> -                       while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
> -                               rte_pause();
> +                       do {
> +                               enq = rte_event_enqueue_burst(dev, port, &ev,
> +                                                             1);
> +                       } while (!enq && !t->done);
>                 }
>         }
> +
> +       perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
> +
>         return 0;
>  }
>
>  static int
>  perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
>  {
> -       PERF_WORKER_INIT;
> -       uint16_t i;
>         /* +1 to avoid prefetch out of array check */
>         struct rte_event ev[BURST_SIZE + 1];
> +       uint16_t enq = 0, nb_rx = 0;
> +       PERF_WORKER_INIT;
> +       uint16_t i;
>
>         while (t->done == false) {
> -               uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -146,14 +152,15 @@ perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
>                         }
>                 }
>
> -               uint16_t enq;
> -
>                 enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
> -               while (enq < nb_rx) {
> +               while ((enq < nb_rx) && !t->done) {
>                         enq += rte_event_enqueue_burst(dev, port,
>                                                         ev + enq, nb_rx - enq);
>                 }
>         }
> +
> +       perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
> +
>         return 0;
>  }
>
> diff --git a/app/test-eventdev/test_perf_common.c b/app/test-eventdev/test_perf_common.c
> index e93b0e7272..f673a9fddd 100644
> --- a/app/test-eventdev/test_perf_common.c
> +++ b/app/test-eventdev/test_perf_common.c
> @@ -985,6 +985,23 @@ perf_opt_dump(struct evt_options *opt, uint8_t nb_queues)
>         evt_dump("prod_enq_burst_sz", "%d", opt->prod_enq_burst_sz);
>  }
>
> +void
> +perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
> +                   uint8_t port_id, struct rte_event events[], uint16_t nb_enq,
> +                   uint16_t nb_deq)
> +{
> +       int i;
> +
> +       if (nb_deq) {
> +               for (i = nb_enq; i < nb_deq; i++)
> +                       rte_mempool_put(pool, events[i].event_ptr);
> +
> +               for (i = 0; i < nb_deq; i++)
> +                       events[i].op = RTE_EVENT_OP_RELEASE;
> +               rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
> +       }
> +}
> +
>  void
>  perf_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
>  {
> diff --git a/app/test-eventdev/test_perf_common.h b/app/test-eventdev/test_perf_common.h
> index e504bb1df9..f6bfc73be0 100644
> --- a/app/test-eventdev/test_perf_common.h
> +++ b/app/test-eventdev/test_perf_common.h
> @@ -184,5 +184,8 @@ void perf_cryptodev_destroy(struct evt_test *test, struct evt_options *opt);
>  void perf_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
>  void perf_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
>  void perf_mempool_destroy(struct evt_test *test, struct evt_options *opt);
> +void perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
> +                        uint8_t port_id, struct rte_event events[],
> +                        uint16_t nb_enq, uint16_t nb_deq);
>
>  #endif /* _TEST_PERF_COMMON_ */
> diff --git a/app/test-eventdev/test_perf_queue.c b/app/test-eventdev/test_perf_queue.c
> index 108f1742a7..b498cacef6 100644
> --- a/app/test-eventdev/test_perf_queue.c
> +++ b/app/test-eventdev/test_perf_queue.c
> @@ -39,13 +39,14 @@ fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
>  static int
>  perf_queue_worker(void *arg, const int enable_fwd_latency)
>  {
> -       PERF_WORKER_INIT;
> +       uint16_t enq = 0, deq = 0;
>         struct rte_event ev;
> +       PERF_WORKER_INIT;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -80,24 +81,29 @@ perf_queue_worker(void *arg, const int enable_fwd_latency)
>                                         &ev, w, bufs, sz, cnt);
>                 } else {
>                         fwd_event(&ev, sched_type_list, nb_stages);
> -                       while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
> -                               rte_pause();
> +                       do {
> +                               enq = rte_event_enqueue_burst(dev, port, &ev,
> +                                                             1);
> +                       } while (!enq && !t->done);
>                 }
>         }
> +
> +       perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
> +
>         return 0;
>  }
>
>  static int
>  perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
>  {
> -       PERF_WORKER_INIT;
> -       uint16_t i;
>         /* +1 to avoid prefetch out of array check */
>         struct rte_event ev[BURST_SIZE + 1];
> +       uint16_t enq = 0, nb_rx = 0;
> +       PERF_WORKER_INIT;
> +       uint16_t i;
>
>         while (t->done == false) {
> -               uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -147,14 +153,16 @@ perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
>                         }
>                 }
>
> -               uint16_t enq;
>
>                 enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
> -               while (enq < nb_rx) {
> +               while (enq < nb_rx && !t->done) {
>                         enq += rte_event_enqueue_burst(dev, port,
>                                                         ev + enq, nb_rx - enq);
>                 }
>         }
> +
> +       perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
> +
>         return 0;
>  }
>
> diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c
> index 79218502ba..4b10197127 100644
> --- a/app/test-eventdev/test_pipeline_atq.c
> +++ b/app/test-eventdev/test_pipeline_atq.c
> @@ -21,18 +21,20 @@ static __rte_noinline int
>  pipeline_atq_worker_single_stage_tx(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
>
> -               pipeline_event_tx(dev, port, &ev);
> +               deq = pipeline_event_tx(dev, port, &ev, t);
>                 w->processed_pkts++;
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -42,20 +44,22 @@ pipeline_atq_worker_single_stage_fwd(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
>
>                 ev.queue_id = tx_queue[ev.mbuf->port];
>                 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 w->processed_pkts++;
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -64,10 +68,10 @@ static __rte_noinline int
>  pipeline_atq_worker_single_stage_burst_tx(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -79,9 +83,10 @@ pipeline_atq_worker_single_stage_burst_tx(void *arg)
>                         rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
>                 }
>
> -               pipeline_event_tx_burst(dev, port, ev, nb_rx);
> -               w->processed_pkts += nb_rx;
> +               nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
> +               w->processed_pkts += nb_tx;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -91,10 +96,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -108,9 +113,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
>                         pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> -               w->processed_pkts += nb_rx;
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
> +               w->processed_pkts += nb_tx;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -119,19 +125,21 @@ static __rte_noinline int
>  pipeline_atq_worker_single_stage_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
>                 vector_sz = ev.vec->nb_elem;
> -               pipeline_event_tx_vector(dev, port, &ev);
> +               enq = pipeline_event_tx_vector(dev, port, &ev, t);
>                 w->processed_pkts += vector_sz;
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -141,12 +149,13 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -155,9 +164,10 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
>                 ev.queue_id = tx_queue[ev.vec->port];
>                 ev.vec->queue = 0;
>                 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 w->processed_pkts += vector_sz;
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -166,11 +176,11 @@ static __rte_noinline int
>  pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -182,9 +192,10 @@ pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
>                         ev[i].vec->queue = 0;
>                 }
>
> -               pipeline_event_tx_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
>                 w->processed_pkts += vector_sz;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -194,11 +205,11 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -214,9 +225,10 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
>                                                   RTE_SCHED_TYPE_ATOMIC);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>                 w->processed_pkts += vector_sz;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -225,11 +237,12 @@ static __rte_noinline int
>  pipeline_atq_worker_multi_stage_tx(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -237,15 +250,16 @@ pipeline_atq_worker_multi_stage_tx(void *arg)
>                 cq_id = ev.sub_event_type % nb_stages;
>
>                 if (cq_id == last_queue) {
> -                       pipeline_event_tx(dev, port, &ev);
> +                       enq = pipeline_event_tx(dev, port, &ev, t);
>                         w->processed_pkts++;
>                         continue;
>                 }
>
>                 ev.sub_event_type++;
>                 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -255,11 +269,12 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -275,8 +290,9 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
>                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
>                 }
>
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -285,10 +301,10 @@ static __rte_noinline int
>  pipeline_atq_worker_multi_stage_burst_tx(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -300,7 +316,7 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
>                         cq_id = ev[i].sub_event_type % nb_stages;
>
>                         if (cq_id == last_queue) {
> -                               pipeline_event_tx(dev, port, &ev[i]);
> +                               pipeline_event_tx(dev, port, &ev[i], t);
>                                 ev[i].op = RTE_EVENT_OP_RELEASE;
>                                 w->processed_pkts++;
>                                 continue;
> @@ -310,8 +326,9 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
>                         pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -321,10 +338,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -347,8 +364,9 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
>                         }
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -357,12 +375,13 @@ static __rte_noinline int
>  pipeline_atq_worker_multi_stage_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -371,15 +390,16 @@ pipeline_atq_worker_multi_stage_tx_vector(void *arg)
>
>                 if (cq_id == last_queue) {
>                         vector_sz = ev.vec->nb_elem;
> -                       pipeline_event_tx_vector(dev, port, &ev);
> +                       enq = pipeline_event_tx_vector(dev, port, &ev, t);
>                         w->processed_pkts += vector_sz;
>                         continue;
>                 }
>
>                 ev.sub_event_type++;
>                 pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -389,12 +409,13 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -406,14 +427,15 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
>                         ev.vec->queue = 0;
>                         vector_sz = ev.vec->nb_elem;
>                         pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> -                       pipeline_event_enqueue(dev, port, &ev);
> +                       enq = pipeline_event_enqueue(dev, port, &ev, t);
>                         w->processed_pkts += vector_sz;
>                 } else {
>                         ev.sub_event_type++;
>                         pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
> -                       pipeline_event_enqueue(dev, port, &ev);
> +                       enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 }
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -422,11 +444,11 @@ static __rte_noinline int
>  pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -438,7 +460,7 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
>
>                         if (cq_id == last_queue) {
>                                 vector_sz = ev[i].vec->nb_elem;
> -                               pipeline_event_tx_vector(dev, port, &ev[i]);
> +                               pipeline_event_tx_vector(dev, port, &ev[i], t);
>                                 ev[i].op = RTE_EVENT_OP_RELEASE;
>                                 w->processed_pkts += vector_sz;
>                                 continue;
> @@ -449,8 +471,9 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
>                                                   sched_type_list[cq_id]);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -460,11 +483,11 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -488,8 +511,9 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
>                         }
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
> index d994c91678..a8dd070000 100644
> --- a/app/test-eventdev/test_pipeline_common.c
> +++ b/app/test-eventdev/test_pipeline_common.c
> @@ -505,6 +505,45 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
>         return ret;
>  }
>
> +static void
> +pipeline_vector_array_free(struct rte_event events[], uint16_t num)
> +{
> +       uint16_t i;
> +
> +       for (i = 0; i < num; i++) {
> +               rte_pktmbuf_free_bulk(events[i].vec->mbufs,
> +                                     events[i].vec->nb_elem);
> +               rte_mempool_put(rte_mempool_from_obj(events[i].vec),
> +                               events[i].vec);
> +       }
> +}
> +
> +void
> +pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
> +                       uint16_t enq, uint16_t deq)
> +{
> +       int i;
> +
> +       if (!(deq - enq))
> +               return;
> +
> +       if (deq) {
> +               for (i = enq; i < deq; i++) {
> +                       if (ev[i].op == RTE_EVENT_OP_RELEASE)
> +                               continue;
> +                       if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR)
> +                               pipeline_vector_array_free(&ev[i], 1);
> +                       else
> +                               rte_pktmbuf_free(ev[i].mbuf);
> +               }
> +
> +               for (i = 0; i < deq; i++)
> +                       ev[i].op = RTE_EVENT_OP_RELEASE;
> +
> +               rte_event_enqueue_burst(dev, port, ev, deq);
> +       }
> +}
> +
>  void
>  pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
>  {
> diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
> index c979c33772..a6443faea4 100644
> --- a/app/test-eventdev/test_pipeline_common.h
> +++ b/app/test-eventdev/test_pipeline_common.h
> @@ -109,59 +109,80 @@ pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
>         ev->sched_type = sched;
>  }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint8_t
>  pipeline_event_tx(const uint8_t dev, const uint8_t port,
> -               struct rte_event * const ev)
> +                 struct rte_event *const ev, struct test_pipeline *t)
>  {
> +       uint8_t enq;
> +
>         rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
> -       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
> -               rte_pause();
> +       do {
> +               enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
> +       } while (!enq && !t->done);
> +
> +       return enq;
>  }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint8_t
>  pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
> -                        struct rte_event *const ev)
> +                        struct rte_event *const ev, struct test_pipeline *t)
>  {
> +       uint8_t enq;
> +
>         ev->vec->queue = 0;
> +       do {
> +               enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
> +       } while (!enq && !t->done);
>
> -       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
> -               rte_pause();
> +       return enq;
>  }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint16_t
>  pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
> -               struct rte_event *ev, const uint16_t nb_rx)
> +                       struct rte_event *ev, const uint16_t nb_rx,
> +                       struct test_pipeline *t)
>  {
>         uint16_t enq;
>
>         enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx, 0);
> -       while (enq < nb_rx) {
> +       while (enq < nb_rx && !t->done) {
>                 enq += rte_event_eth_tx_adapter_enqueue(dev, port,
>                                 ev + enq, nb_rx - enq, 0);
>         }
> +
> +       return enq;
>  }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint8_t
>  pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
> -               struct rte_event *ev)
> +                      struct rte_event *ev, struct test_pipeline *t)
>  {
> -       while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
> -               rte_pause();
> +       uint8_t enq;
> +
> +       do {
> +               enq = rte_event_enqueue_burst(dev, port, ev, 1);
> +       } while (!enq && !t->done);
> +
> +       return enq;
>  }
>
> -static __rte_always_inline void
> +static __rte_always_inline uint16_t
>  pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
> -               struct rte_event *ev, const uint16_t nb_rx)
> +                            struct rte_event *ev, const uint16_t nb_rx,
> +                            struct test_pipeline *t)
>  {
>         uint16_t enq;
>
>         enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
> -       while (enq < nb_rx) {
> +       while (enq < nb_rx && !t->done) {
>                 enq += rte_event_enqueue_burst(dev, port,
>                                                 ev + enq, nb_rx - enq);
>         }
> +
> +       return enq;
>  }
>
> +
>  static inline int
>  pipeline_nb_event_ports(struct evt_options *opt)
>  {
> @@ -188,5 +209,7 @@ void pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt);
>  void pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
>  void pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
>  void pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt);
> +void pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
> +                            uint16_t enq, uint16_t deq);
>
>  #endif /* _TEST_PIPELINE_COMMON_ */
> diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
> index 343f8f3b1d..e989396474 100644
> --- a/app/test-eventdev/test_pipeline_queue.c
> +++ b/app/test-eventdev/test_pipeline_queue.c
> @@ -21,24 +21,27 @@ static __rte_noinline int
>  pipeline_queue_worker_single_stage_tx(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
>
>                 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> -                       pipeline_event_tx(dev, port, &ev);
> +                       enq = pipeline_event_tx(dev, port, &ev, t);
> +                       ev.op = RTE_EVENT_OP_RELEASE;
>                         w->processed_pkts++;
>                 } else {
>                         ev.queue_id++;
>                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> -                       pipeline_event_enqueue(dev, port, &ev);
> +                       enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 }
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -48,11 +51,12 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -60,9 +64,10 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
>                 ev.queue_id = tx_queue[ev.mbuf->port];
>                 rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
>                 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 w->processed_pkts++;
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -71,10 +76,10 @@ static __rte_noinline int
>  pipeline_queue_worker_single_stage_burst_tx(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -84,17 +89,18 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
>                 for (i = 0; i < nb_rx; i++) {
>                         rte_prefetch0(ev[i + 1].mbuf);
>                         if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
> -                               pipeline_event_tx(dev, port, &ev[i]);
> +                               pipeline_event_tx(dev, port, &ev[i], t);
> +                               ev[i].op = RTE_EVENT_OP_RELEASE;
>                                 w->processed_pkts++;
>                         } else {
>                                 ev[i].queue_id++;
>                                 pipeline_fwd_event(&ev[i],
>                                                 RTE_SCHED_TYPE_ATOMIC);
> -                               pipeline_event_enqueue_burst(dev, port, ev,
> -                                               nb_rx);
>                         }
>                 }
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -104,10 +110,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -121,9 +127,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
>                         pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>                 w->processed_pkts += nb_rx;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -132,26 +139,29 @@ static __rte_noinline int
>  pipeline_queue_worker_single_stage_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
>
>                 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
>                         vector_sz = ev.vec->nb_elem;
> -                       pipeline_event_tx_vector(dev, port, &ev);
> +                       enq = pipeline_event_tx_vector(dev, port, &ev, t);
> +                       ev.op = RTE_EVENT_OP_RELEASE;
>                         w->processed_pkts += vector_sz;
>                 } else {
>                         ev.queue_id++;
>                         pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> -                       pipeline_event_enqueue(dev, port, &ev);
> +                       enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 }
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -161,12 +171,13 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -175,9 +186,10 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
>                 ev.vec->queue = 0;
>                 vector_sz = ev.vec->nb_elem;
>                 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 w->processed_pkts += vector_sz;
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -186,11 +198,11 @@ static __rte_noinline int
>  pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -200,7 +212,7 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
>                 for (i = 0; i < nb_rx; i++) {
>                         if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
>                                 vector_sz = ev[i].vec->nb_elem;
> -                               pipeline_event_tx_vector(dev, port, &ev[i]);
> +                               pipeline_event_tx_vector(dev, port, &ev[i], t);
>                                 ev[i].op = RTE_EVENT_OP_RELEASE;
>                                 w->processed_pkts += vector_sz;
>                         } else {
> @@ -210,8 +222,9 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
>                         }
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -221,11 +234,11 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -241,9 +254,10 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
>                                                   RTE_SCHED_TYPE_ATOMIC);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>                 w->processed_pkts += vector_sz;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -253,11 +267,12 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -265,7 +280,8 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
>                 cq_id = ev.queue_id % nb_stages;
>
>                 if (ev.queue_id == tx_queue[ev.mbuf->port]) {
> -                       pipeline_event_tx(dev, port, &ev);
> +                       enq = pipeline_event_tx(dev, port, &ev, t);
> +                       ev.op = RTE_EVENT_OP_RELEASE;
>                         w->processed_pkts++;
>                         continue;
>                 }
> @@ -274,8 +290,9 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
>                 pipeline_fwd_event(&ev, cq_id != last_queue ?
>                                 sched_type_list[cq_id] :
>                                 RTE_SCHED_TYPE_ATOMIC);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -285,11 +302,12 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>
>         while (t->done == false) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -300,14 +318,15 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
>                         ev.queue_id = tx_queue[ev.mbuf->port];
>                         rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
>                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> -                       pipeline_event_enqueue(dev, port, &ev);
> +                       enq = pipeline_event_enqueue(dev, port, &ev, t);
>                         w->processed_pkts++;
>                 } else {
>                         ev.queue_id++;
>                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
> -                       pipeline_event_enqueue(dev, port, &ev);
> +                       enq = pipeline_event_enqueue(dev, port, &ev, t);
>                 }
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -317,10 +336,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -332,7 +351,8 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
>                         cq_id = ev[i].queue_id % nb_stages;
>
>                         if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
> -                               pipeline_event_tx(dev, port, &ev[i]);
> +                               pipeline_event_tx(dev, port, &ev[i], t);
> +                               ev[i].op = RTE_EVENT_OP_RELEASE;
>                                 w->processed_pkts++;
>                                 continue;
>                         }
> @@ -341,9 +361,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
>                         pipeline_fwd_event(&ev[i], cq_id != last_queue ?
>                                         sched_type_list[cq_id] :
>                                         RTE_SCHED_TYPE_ATOMIC);
> -                       pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
>                 }
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -353,11 +374,11 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>
>         while (t->done == false) {
>                 uint16_t processed_pkts = 0;
> -               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> -                               BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -381,9 +402,10 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
>                         }
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>                 w->processed_pkts += processed_pkts;
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -393,12 +415,13 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -407,8 +430,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
>
>                 if (ev.queue_id == tx_queue[ev.vec->port]) {
>                         vector_sz = ev.vec->nb_elem;
> -                       pipeline_event_tx_vector(dev, port, &ev);
> +                       enq = pipeline_event_tx_vector(dev, port, &ev, t);
>                         w->processed_pkts += vector_sz;
> +                       ev.op = RTE_EVENT_OP_RELEASE;
>                         continue;
>                 }
>
> @@ -416,8 +440,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
>                 pipeline_fwd_event_vector(&ev, cq_id != last_queue
>                                                        ? sched_type_list[cq_id]
>                                                        : RTE_SCHED_TYPE_ATOMIC);
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -427,12 +452,13 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint8_t enq = 0, deq = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
>
> -               if (!event) {
> +               if (!deq) {
>                         rte_pause();
>                         continue;
>                 }
> @@ -449,8 +475,9 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
>                         pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
>                 }
>
> -               pipeline_event_enqueue(dev, port, &ev);
> +               enq = pipeline_event_enqueue(dev, port, &ev, t);
>         }
> +       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
>
>         return 0;
>  }
> @@ -460,11 +487,11 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -476,7 +503,7 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
>
>                         if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
>                                 vector_sz = ev[i].vec->nb_elem;
> -                               pipeline_event_tx_vector(dev, port, &ev[i]);
> +                               pipeline_event_tx_vector(dev, port, &ev[i], t);
>                                 ev[i].op = RTE_EVENT_OP_RELEASE;
>                                 w->processed_pkts += vector_sz;
>                                 continue;
> @@ -489,8 +516,9 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
>                                                 : RTE_SCHED_TYPE_ATOMIC);
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> @@ -500,11 +528,11 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
>  {
>         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
>         const uint8_t *tx_queue = t->tx_evqueue_id;
> +       uint16_t nb_rx = 0, nb_tx = 0;
>         uint16_t vector_sz;
>
>         while (!t->done) {
> -               uint16_t nb_rx =
> -                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
> +               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
>
>                 if (!nb_rx) {
>                         rte_pause();
> @@ -527,8 +555,9 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
>                         }
>                 }
>
> -               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
> +               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
>         }
> +       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
>
>         return 0;
>  }
> --
> 2.25.1
>

Thread overview: 17+ messages
2022-04-26 21:14 [PATCH 1/6] app/eventdev: simplify signal handling and teardown Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 2/6] app/eventdev: clean up worker state before exit Pavan Nikhilesh
2022-05-13 13:40   ` Jerin Jacob [this message]
2022-04-26 21:14 ` [PATCH 3/6] examples/eventdev: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 4/6] examples/l3fwd: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 5/6] examples/l2fwd-event: " Pavan Nikhilesh
2022-04-26 21:14 ` [PATCH 6/6] examples/ipsec-secgw: cleanup " Pavan Nikhilesh
2022-05-13 13:41   ` Jerin Jacob
2022-05-13 11:49 ` [PATCH 1/6] app/eventdev: simplify signal handling and teardown Jerin Jacob
2022-05-13 13:39 ` Jerin Jacob
2022-05-13 16:07 ` [PATCH v2 " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 2/6] app/eventdev: clean up worker state before exit pbhagavatula
2022-05-13 16:07   ` [PATCH v2 3/6] examples/eventdev: " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 4/6] examples/l3fwd: " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 5/6] examples/l2fwd-event: " pbhagavatula
2022-05-13 16:07   ` [PATCH v2 6/6] examples/ipsec-secgw: cleanup " pbhagavatula
2022-05-16 16:46   ` [PATCH v2 1/6] app/eventdev: simplify signal handling and teardown Jerin Jacob
