From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 0010FA00C3; Fri, 13 May 2022 15:40:30 +0200 (CEST) Received: from [217.70.189.124] (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id E309C40DF7; Fri, 13 May 2022 15:40:30 +0200 (CEST) Received: from mail-io1-f43.google.com (mail-io1-f43.google.com [209.85.166.43]) by mails.dpdk.org (Postfix) with ESMTP id 6DE0E40DDE for ; Fri, 13 May 2022 15:40:29 +0200 (CEST) Received: by mail-io1-f43.google.com with SMTP id o190so8668068iof.10 for ; Fri, 13 May 2022 06:40:29 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20210112; h=mime-version:references:in-reply-to:from:date:message-id:subject:to :cc; bh=Ie3UBamNLdqvY5q0+GCm18H8wsvSleJOj3GROJYuZv4=; b=HLMTJeiS1aTGyrXqrDl2tFxXPMepol1/F9LKdJjVedgSIE96A/daivv/+gKvlfsqb3 uwVpqLnmCJpokLuPBhgOFfqH3gs645dsnqHuOERCkPno2/cb2N0lPFyIS1cHQ5knQ2hv R9Um0THmBF77sNWigKCpxsqvcsSShw0/yON6GiBBNd8Q5dTiAn3DIUDM+/3QQy+sz8bI 1R5oJ3BuN+E8WbylgOT8p0ZXrDpp5EM4YLRayRfIBQUy796Mj5pculW41JIrms1hITRz B1IwTHAgZZjQC++Hm0HnLIFiujSeUt4qJ//mO1xuISAe4kZjSszWaliIxIjBA/QLUjXL 4V+Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20210112; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=Ie3UBamNLdqvY5q0+GCm18H8wsvSleJOj3GROJYuZv4=; b=z7iUrwJC3YfPe+5zb8c5fEQBemG+CGxVvY4bzmkJragkT0yv2SZ7Ah1wmOy5fjrF6U yd+Hoyd21m79TWV9tp4XdOAY237tXcR5S0K5DZNnYz4xLv0bPwnlDN9s+A3+QRsbvD1B QR8H2INiPlyKuSWM5UPjRq9zjpuGdKRDy/b4YCibA65MFdNnfk33xGkgUkpG7s7ohlUQ JF2gobswlk2OoINAHdAxqECdHx1vi1LSX1QG0bQvAFNPKa/d/B2x+cJw0NiT+ea+j/5v xT4yJYagpHPG+2QyuwMJ2U3LE8vzTS9jsHv5spYw0qykEdF/p/WjM6QBpAgOabEtdIQY yTnA== X-Gm-Message-State: AOAM530COCLuAdxuap+bkjYixAX1xupyZ2jlfV4qxVHJvO/kM3zUxtd9 +NwowZ3eF7mEydKB4CkS56tYfQVAWslpE8GMB9/7afRkQWE= X-Google-Smtp-Source: ABdhPJxf6DP3oWPT+CGNqdRhOEPWarn73aVX+6AwoBRQ8zTt/qi6KyZUtcsKYz06C/UxxAZafTmUtcug8GcIBaTKab8= X-Received: by 2002:a05:6602:2b0d:b0:649:b2f:6290 with SMTP id p13-20020a0566022b0d00b006490b2f6290mr2283878iov.94.1652449228414; Fri, 13 May 2022 06:40:28 -0700 (PDT) MIME-Version: 1.0 References: <20220426211412.6138-1-pbhagavatula@marvell.com> <20220426211412.6138-2-pbhagavatula@marvell.com> In-Reply-To: <20220426211412.6138-2-pbhagavatula@marvell.com> From: Jerin Jacob Date: Fri, 13 May 2022 19:10:01 +0530 Message-ID: Subject: Re: [PATCH 2/6] app/eventdev: clean up worker state before exit To: Pavan Nikhilesh Cc: Jerin Jacob , dpdk-dev Content-Type: text/plain; charset="UTF-8" X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org On Wed, Apr 27, 2022 at 2:44 AM Pavan Nikhilesh wrote: > > Event ports are configured to implicitly release the scheduler contexts > currently held in the next call to rte_event_dequeue_burst(). > A worker core might still hold a scheduling context during exit, as the > next call to rte_event_dequeue_burst() is never made. > This might lead to deadlock based on the worker exit timing and when > there are very less number of flows. > > Add clean up function to release any scheduling contexts held by the > worker by using RTE_EVENT_OP_RELEASE. > > Signed-off-by: Pavan Nikhilesh Acked-by: Jerin Jacob > --- > app/test-eventdev/test_perf_atq.c | 31 +++-- > app/test-eventdev/test_perf_common.c | 17 +++ > app/test-eventdev/test_perf_common.h | 3 + > app/test-eventdev/test_perf_queue.c | 30 +++-- > app/test-eventdev/test_pipeline_atq.c | 134 ++++++++++++--------- > app/test-eventdev/test_pipeline_common.c | 39 ++++++ > app/test-eventdev/test_pipeline_common.h | 59 ++++++--- > app/test-eventdev/test_pipeline_queue.c | 145 ++++++++++++++--------- > 8 files changed, 304 insertions(+), 154 deletions(-) > > diff --git a/app/test-eventdev/test_perf_atq.c b/app/test-eventdev/test_perf_atq.c > index bac3ea602f..5a0b190384 100644 > --- a/app/test-eventdev/test_perf_atq.c > +++ b/app/test-eventdev/test_perf_atq.c > @@ -37,13 +37,14 @@ atq_fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list, > static int > perf_atq_worker(void *arg, const int enable_fwd_latency) > { > - PERF_WORKER_INIT; > + uint16_t enq = 0, deq = 0; > struct rte_event ev; > + PERF_WORKER_INIT; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -78,24 +79,29 @@ perf_atq_worker(void *arg, const int enable_fwd_latency) > bufs, sz, cnt); > } else { > atq_fwd_event(&ev, sched_type_list, nb_stages); > - while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1) > - rte_pause(); > + do { > + enq = rte_event_enqueue_burst(dev, port, &ev, > + 1); > + } while (!enq && !t->done); > } > } > + > + perf_worker_cleanup(pool, dev, port, &ev, enq, deq); > + > return 0; > } > > static int > perf_atq_worker_burst(void *arg, const int enable_fwd_latency) > { > - PERF_WORKER_INIT; > - uint16_t i; > /* +1 to avoid prefetch out of array check */ > struct rte_event ev[BURST_SIZE + 1]; > + uint16_t enq = 0, nb_rx = 0; > + PERF_WORKER_INIT; > + uint16_t i; > > while (t->done == false) { > - uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -146,14 +152,15 @@ perf_atq_worker_burst(void *arg, const int enable_fwd_latency) > } > } > > - uint16_t enq; > - > enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); > - while (enq < nb_rx) { > + while ((enq < nb_rx) && !t->done) { > enq += rte_event_enqueue_burst(dev, port, > ev + enq, nb_rx - enq); > } > } > + > + perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx); > + > return 0; > } > > diff --git a/app/test-eventdev/test_perf_common.c b/app/test-eventdev/test_perf_common.c > index e93b0e7272..f673a9fddd 100644 > --- a/app/test-eventdev/test_perf_common.c > +++ b/app/test-eventdev/test_perf_common.c > @@ -985,6 +985,23 @@ perf_opt_dump(struct evt_options *opt, uint8_t nb_queues) > evt_dump("prod_enq_burst_sz", "%d", opt->prod_enq_burst_sz); > } > > +void > +perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id, > + uint8_t port_id, struct rte_event events[], uint16_t nb_enq, > + uint16_t nb_deq) > +{ > + int i; > + > + if (nb_deq) { > + for (i = nb_enq; i < nb_deq; i++) > + rte_mempool_put(pool, events[i].event_ptr); > + > + for (i = 0; i < nb_deq; i++) > + events[i].op = RTE_EVENT_OP_RELEASE; > + rte_event_enqueue_burst(dev_id, port_id, events, nb_deq); > + } > +} > + > void > perf_eventdev_destroy(struct evt_test *test, struct evt_options *opt) > { > diff --git a/app/test-eventdev/test_perf_common.h b/app/test-eventdev/test_perf_common.h > index e504bb1df9..f6bfc73be0 100644 > --- a/app/test-eventdev/test_perf_common.h > +++ b/app/test-eventdev/test_perf_common.h > @@ -184,5 +184,8 @@ void perf_cryptodev_destroy(struct evt_test *test, struct evt_options *opt); > void perf_ethdev_destroy(struct evt_test *test, struct evt_options *opt); > void perf_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt); > void perf_mempool_destroy(struct evt_test *test, struct evt_options *opt); > +void perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id, > + uint8_t port_id, struct rte_event events[], > + uint16_t nb_enq, uint16_t nb_deq); > > #endif /* _TEST_PERF_COMMON_ */ > diff --git a/app/test-eventdev/test_perf_queue.c b/app/test-eventdev/test_perf_queue.c > index 108f1742a7..b498cacef6 100644 > --- a/app/test-eventdev/test_perf_queue.c > +++ b/app/test-eventdev/test_perf_queue.c > @@ -39,13 +39,14 @@ fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list, > static int > perf_queue_worker(void *arg, const int enable_fwd_latency) > { > - PERF_WORKER_INIT; > + uint16_t enq = 0, deq = 0; > struct rte_event ev; > + PERF_WORKER_INIT; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -80,24 +81,29 @@ perf_queue_worker(void *arg, const int enable_fwd_latency) > &ev, w, bufs, sz, cnt); > } else { > fwd_event(&ev, sched_type_list, nb_stages); > - while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1) > - rte_pause(); > + do { > + enq = rte_event_enqueue_burst(dev, port, &ev, > + 1); > + } while (!enq && !t->done); > } > } > + > + perf_worker_cleanup(pool, dev, port, &ev, enq, deq); > + > return 0; > } > > static int > perf_queue_worker_burst(void *arg, const int enable_fwd_latency) > { > - PERF_WORKER_INIT; > - uint16_t i; > /* +1 to avoid prefetch out of array check */ > struct rte_event ev[BURST_SIZE + 1]; > + uint16_t enq = 0, nb_rx = 0; > + PERF_WORKER_INIT; > + uint16_t i; > > while (t->done == false) { > - uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -147,14 +153,16 @@ perf_queue_worker_burst(void *arg, const int enable_fwd_latency) > } > } > > - uint16_t enq; > > enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); > - while (enq < nb_rx) { > + while (enq < nb_rx && !t->done) { > enq += rte_event_enqueue_burst(dev, port, > ev + enq, nb_rx - enq); > } > } > + > + perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx); > + > return 0; > } > > diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c > index 79218502ba..4b10197127 100644 > --- a/app/test-eventdev/test_pipeline_atq.c > +++ b/app/test-eventdev/test_pipeline_atq.c > @@ -21,18 +21,20 @@ static __rte_noinline int > pipeline_atq_worker_single_stage_tx(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > > - pipeline_event_tx(dev, port, &ev); > + deq = pipeline_event_tx(dev, port, &ev, t); > w->processed_pkts++; > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -42,20 +44,22 @@ pipeline_atq_worker_single_stage_fwd(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > > ev.queue_id = tx_queue[ev.mbuf->port]; > pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > w->processed_pkts++; > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -64,10 +68,10 @@ static __rte_noinline int > pipeline_atq_worker_single_stage_burst_tx(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -79,9 +83,10 @@ pipeline_atq_worker_single_stage_burst_tx(void *arg) > rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0); > } > > - pipeline_event_tx_burst(dev, port, ev, nb_rx); > - w->processed_pkts += nb_rx; > + nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t); > + w->processed_pkts += nb_tx; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -91,10 +96,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -108,9 +113,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg) > pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > - w->processed_pkts += nb_rx; > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > + w->processed_pkts += nb_tx; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -119,19 +125,21 @@ static __rte_noinline int > pipeline_atq_worker_single_stage_tx_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > vector_sz = ev.vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev); > + enq = pipeline_event_tx_vector(dev, port, &ev, t); > w->processed_pkts += vector_sz; > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -141,12 +149,13 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -155,9 +164,10 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg) > ev.queue_id = tx_queue[ev.vec->port]; > ev.vec->queue = 0; > pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > w->processed_pkts += vector_sz; > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -166,11 +176,11 @@ static __rte_noinline int > pipeline_atq_worker_single_stage_burst_tx_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -182,9 +192,10 @@ pipeline_atq_worker_single_stage_burst_tx_vector(void *arg) > ev[i].vec->queue = 0; > } > > - pipeline_event_tx_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t); > w->processed_pkts += vector_sz; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -194,11 +205,11 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -214,9 +225,10 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg) > RTE_SCHED_TYPE_ATOMIC); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > w->processed_pkts += vector_sz; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -225,11 +237,12 @@ static __rte_noinline int > pipeline_atq_worker_multi_stage_tx(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -237,15 +250,16 @@ pipeline_atq_worker_multi_stage_tx(void *arg) > cq_id = ev.sub_event_type % nb_stages; > > if (cq_id == last_queue) { > - pipeline_event_tx(dev, port, &ev); > + enq = pipeline_event_tx(dev, port, &ev, t); > w->processed_pkts++; > continue; > } > > ev.sub_event_type++; > pipeline_fwd_event(&ev, sched_type_list[cq_id]); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -255,11 +269,12 @@ pipeline_atq_worker_multi_stage_fwd(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -275,8 +290,9 @@ pipeline_atq_worker_multi_stage_fwd(void *arg) > pipeline_fwd_event(&ev, sched_type_list[cq_id]); > } > > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -285,10 +301,10 @@ static __rte_noinline int > pipeline_atq_worker_multi_stage_burst_tx(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -300,7 +316,7 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg) > cq_id = ev[i].sub_event_type % nb_stages; > > if (cq_id == last_queue) { > - pipeline_event_tx(dev, port, &ev[i]); > + pipeline_event_tx(dev, port, &ev[i], t); > ev[i].op = RTE_EVENT_OP_RELEASE; > w->processed_pkts++; > continue; > @@ -310,8 +326,9 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg) > pipeline_fwd_event(&ev[i], sched_type_list[cq_id]); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -321,10 +338,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -347,8 +364,9 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg) > } > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -357,12 +375,13 @@ static __rte_noinline int > pipeline_atq_worker_multi_stage_tx_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -371,15 +390,16 @@ pipeline_atq_worker_multi_stage_tx_vector(void *arg) > > if (cq_id == last_queue) { > vector_sz = ev.vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev); > + enq = pipeline_event_tx_vector(dev, port, &ev, t); > w->processed_pkts += vector_sz; > continue; > } > > ev.sub_event_type++; > pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -389,12 +409,13 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -406,14 +427,15 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg) > ev.vec->queue = 0; > vector_sz = ev.vec->nb_elem; > pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > w->processed_pkts += vector_sz; > } else { > ev.sub_event_type++; > pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -422,11 +444,11 @@ static __rte_noinline int > pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -438,7 +460,7 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg) > > if (cq_id == last_queue) { > vector_sz = ev[i].vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev[i]); > + pipeline_event_tx_vector(dev, port, &ev[i], t); > ev[i].op = RTE_EVENT_OP_RELEASE; > w->processed_pkts += vector_sz; > continue; > @@ -449,8 +471,9 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg) > sched_type_list[cq_id]); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -460,11 +483,11 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -488,8 +511,9 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg) > } > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c > index d994c91678..a8dd070000 100644 > --- a/app/test-eventdev/test_pipeline_common.c > +++ b/app/test-eventdev/test_pipeline_common.c > @@ -505,6 +505,45 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt, > return ret; > } > > +static void > +pipeline_vector_array_free(struct rte_event events[], uint16_t num) > +{ > + uint16_t i; > + > + for (i = 0; i < num; i++) { > + rte_pktmbuf_free_bulk(events[i].vec->mbufs, > + events[i].vec->nb_elem); > + rte_mempool_put(rte_mempool_from_obj(events[i].vec), > + events[i].vec); > + } > +} > + > +void > +pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[], > + uint16_t enq, uint16_t deq) > +{ > + int i; > + > + if (!(deq - enq)) > + return; > + > + if (deq) { > + for (i = enq; i < deq; i++) { > + if (ev[i].op == RTE_EVENT_OP_RELEASE) > + continue; > + if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR) > + pipeline_vector_array_free(&ev[i], 1); > + else > + rte_pktmbuf_free(ev[i].mbuf); > + } > + > + for (i = 0; i < deq; i++) > + ev[i].op = RTE_EVENT_OP_RELEASE; > + > + rte_event_enqueue_burst(dev, port, ev, deq); > + } > +} > + > void > pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt) > { > diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h > index c979c33772..a6443faea4 100644 > --- a/app/test-eventdev/test_pipeline_common.h > +++ b/app/test-eventdev/test_pipeline_common.h > @@ -109,59 +109,80 @@ pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched) > ev->sched_type = sched; > } > > -static __rte_always_inline void > +static __rte_always_inline uint8_t > pipeline_event_tx(const uint8_t dev, const uint8_t port, > - struct rte_event * const ev) > + struct rte_event *const ev, struct test_pipeline *t) > { > + uint8_t enq; > + > rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0); > - while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0)) > - rte_pause(); > + do { > + enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0); > + } while (!enq && !t->done); > + > + return enq; > } > > -static __rte_always_inline void > +static __rte_always_inline uint8_t > pipeline_event_tx_vector(const uint8_t dev, const uint8_t port, > - struct rte_event *const ev) > + struct rte_event *const ev, struct test_pipeline *t) > { > + uint8_t enq; > + > ev->vec->queue = 0; > + do { > + enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0); > + } while (!enq && !t->done); > > - while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0)) > - rte_pause(); > + return enq; > } > > -static __rte_always_inline void > +static __rte_always_inline uint16_t > pipeline_event_tx_burst(const uint8_t dev, const uint8_t port, > - struct rte_event *ev, const uint16_t nb_rx) > + struct rte_event *ev, const uint16_t nb_rx, > + struct test_pipeline *t) > { > uint16_t enq; > > enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx, 0); > - while (enq < nb_rx) { > + while (enq < nb_rx && !t->done) { > enq += rte_event_eth_tx_adapter_enqueue(dev, port, > ev + enq, nb_rx - enq, 0); > } > + > + return enq; > } > > -static __rte_always_inline void > +static __rte_always_inline uint8_t > pipeline_event_enqueue(const uint8_t dev, const uint8_t port, > - struct rte_event *ev) > + struct rte_event *ev, struct test_pipeline *t) > { > - while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) > - rte_pause(); > + uint8_t enq; > + > + do { > + enq = rte_event_enqueue_burst(dev, port, ev, 1); > + } while (!enq && !t->done); > + > + return enq; > } > > -static __rte_always_inline void > +static __rte_always_inline uint16_t > pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port, > - struct rte_event *ev, const uint16_t nb_rx) > + struct rte_event *ev, const uint16_t nb_rx, > + struct test_pipeline *t) > { > uint16_t enq; > > enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); > - while (enq < nb_rx) { > + while (enq < nb_rx && !t->done) { > enq += rte_event_enqueue_burst(dev, port, > ev + enq, nb_rx - enq); > } > + > + return enq; > } > > + > static inline int > pipeline_nb_event_ports(struct evt_options *opt) > { > @@ -188,5 +209,7 @@ void pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt); > void pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt); > void pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt); > void pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt); > +void pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[], > + uint16_t enq, uint16_t deq); > > #endif /* _TEST_PIPELINE_COMMON_ */ > diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c > index 343f8f3b1d..e989396474 100644 > --- a/app/test-eventdev/test_pipeline_queue.c > +++ b/app/test-eventdev/test_pipeline_queue.c > @@ -21,24 +21,27 @@ static __rte_noinline int > pipeline_queue_worker_single_stage_tx(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > > if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { > - pipeline_event_tx(dev, port, &ev); > + enq = pipeline_event_tx(dev, port, &ev, t); > + ev.op = RTE_EVENT_OP_RELEASE; > w->processed_pkts++; > } else { > ev.queue_id++; > pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -48,11 +51,12 @@ pipeline_queue_worker_single_stage_fwd(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -60,9 +64,10 @@ pipeline_queue_worker_single_stage_fwd(void *arg) > ev.queue_id = tx_queue[ev.mbuf->port]; > rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0); > pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > w->processed_pkts++; > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -71,10 +76,10 @@ static __rte_noinline int > pipeline_queue_worker_single_stage_burst_tx(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -84,17 +89,18 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg) > for (i = 0; i < nb_rx; i++) { > rte_prefetch0(ev[i + 1].mbuf); > if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { > - pipeline_event_tx(dev, port, &ev[i]); > + pipeline_event_tx(dev, port, &ev[i], t); > + ev[i].op = RTE_EVENT_OP_RELEASE; > w->processed_pkts++; > } else { > ev[i].queue_id++; > pipeline_fwd_event(&ev[i], > RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue_burst(dev, port, ev, > - nb_rx); > } > } > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -104,10 +110,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -121,9 +127,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg) > pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > w->processed_pkts += nb_rx; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -132,26 +139,29 @@ static __rte_noinline int > pipeline_queue_worker_single_stage_tx_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > > if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { > vector_sz = ev.vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev); > + enq = pipeline_event_tx_vector(dev, port, &ev, t); > + ev.op = RTE_EVENT_OP_RELEASE; > w->processed_pkts += vector_sz; > } else { > ev.queue_id++; > pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -161,12 +171,13 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -175,9 +186,10 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg) > ev.vec->queue = 0; > vector_sz = ev.vec->nb_elem; > pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > w->processed_pkts += vector_sz; > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -186,11 +198,11 @@ static __rte_noinline int > pipeline_queue_worker_single_stage_burst_tx_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -200,7 +212,7 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg) > for (i = 0; i < nb_rx; i++) { > if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { > vector_sz = ev[i].vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev[i]); > + pipeline_event_tx_vector(dev, port, &ev[i], t); > ev[i].op = RTE_EVENT_OP_RELEASE; > w->processed_pkts += vector_sz; > } else { > @@ -210,8 +222,9 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg) > } > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -221,11 +234,11 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg) > { > PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -241,9 +254,10 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg) > RTE_SCHED_TYPE_ATOMIC); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > w->processed_pkts += vector_sz; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -253,11 +267,12 @@ pipeline_queue_worker_multi_stage_tx(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -265,7 +280,8 @@ pipeline_queue_worker_multi_stage_tx(void *arg) > cq_id = ev.queue_id % nb_stages; > > if (ev.queue_id == tx_queue[ev.mbuf->port]) { > - pipeline_event_tx(dev, port, &ev); > + enq = pipeline_event_tx(dev, port, &ev, t); > + ev.op = RTE_EVENT_OP_RELEASE; > w->processed_pkts++; > continue; > } > @@ -274,8 +290,9 @@ pipeline_queue_worker_multi_stage_tx(void *arg) > pipeline_fwd_event(&ev, cq_id != last_queue ? > sched_type_list[cq_id] : > RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -285,11 +302,12 @@ pipeline_queue_worker_multi_stage_fwd(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > > while (t->done == false) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -300,14 +318,15 @@ pipeline_queue_worker_multi_stage_fwd(void *arg) > ev.queue_id = tx_queue[ev.mbuf->port]; > rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0); > pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > w->processed_pkts++; > } else { > ev.queue_id++; > pipeline_fwd_event(&ev, sched_type_list[cq_id]); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -317,10 +336,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -332,7 +351,8 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg) > cq_id = ev[i].queue_id % nb_stages; > > if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) { > - pipeline_event_tx(dev, port, &ev[i]); > + pipeline_event_tx(dev, port, &ev[i], t); > + ev[i].op = RTE_EVENT_OP_RELEASE; > w->processed_pkts++; > continue; > } > @@ -341,9 +361,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg) > pipeline_fwd_event(&ev[i], cq_id != last_queue ? > sched_type_list[cq_id] : > RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > } > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -353,11 +374,11 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > > while (t->done == false) { > uint16_t processed_pkts = 0; > - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, > - BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -381,9 +402,10 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg) > } > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > w->processed_pkts += processed_pkts; > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -393,12 +415,13 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -407,8 +430,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg) > > if (ev.queue_id == tx_queue[ev.vec->port]) { > vector_sz = ev.vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev); > + enq = pipeline_event_tx_vector(dev, port, &ev, t); > w->processed_pkts += vector_sz; > + ev.op = RTE_EVENT_OP_RELEASE; > continue; > } > > @@ -416,8 +440,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg) > pipeline_fwd_event_vector(&ev, cq_id != last_queue > ? sched_type_list[cq_id] > : RTE_SCHED_TYPE_ATOMIC); > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -427,12 +452,13 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint8_t enq = 0, deq = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); > > - if (!event) { > + if (!deq) { > rte_pause(); > continue; > } > @@ -449,8 +475,9 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg) > pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]); > } > > - pipeline_event_enqueue(dev, port, &ev); > + enq = pipeline_event_enqueue(dev, port, &ev, t); > } > + pipeline_worker_cleanup(dev, port, &ev, enq, deq); > > return 0; > } > @@ -460,11 +487,11 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -476,7 +503,7 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) > > if (ev[i].queue_id == tx_queue[ev[i].vec->port]) { > vector_sz = ev[i].vec->nb_elem; > - pipeline_event_tx_vector(dev, port, &ev[i]); > + pipeline_event_tx_vector(dev, port, &ev[i], t); > ev[i].op = RTE_EVENT_OP_RELEASE; > w->processed_pkts += vector_sz; > continue; > @@ -489,8 +516,9 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) > : RTE_SCHED_TYPE_ATOMIC); > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > @@ -500,11 +528,11 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg) > { > PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; > const uint8_t *tx_queue = t->tx_evqueue_id; > + uint16_t nb_rx = 0, nb_tx = 0; > uint16_t vector_sz; > > while (!t->done) { > - uint16_t nb_rx = > - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); > > if (!nb_rx) { > rte_pause(); > @@ -527,8 +555,9 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg) > } > } > > - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); > + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); > } > + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); > > return 0; > } > -- > 2.25.1 >