From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 10CC2A00C3; Fri, 13 May 2022 18:07:47 +0200 (CEST) Received: from [217.70.189.124] (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id D4ACC4283E; Fri, 13 May 2022 18:07:32 +0200 (CEST) Received: from mx0b-0016f401.pphosted.com (mx0b-0016f401.pphosted.com [67.231.156.173]) by mails.dpdk.org (Postfix) with ESMTP id CFFB042835 for ; Fri, 13 May 2022 18:07:30 +0200 (CEST) Received: from pps.filterd (m0045851.ppops.net [127.0.0.1]) by mx0b-0016f401.pphosted.com (8.17.1.5/8.17.1.5) with ESMTP id 24DCSlDo010438; Fri, 13 May 2022 09:07:30 -0700 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=marvell.com; h=from : to : cc : subject : date : message-id : in-reply-to : references : mime-version : content-transfer-encoding : content-type; s=pfpt0220; bh=vdwbTYvh9kr0SaHIdIQF68T46d6LL2z85RBo4IRxvYY=; b=MszR8lqb9br8Lz+E6kWTTccQ9E/bnSV25ZFkuYqzDSapwNlXCNjfofgymjQu1u6Cfowa LePIiySDBp6DIqupfLdbYKwFaS2EsbpWxh5ugt6R0cmtzKzTRG8rLsKGIMmPZotJEJ0p oMQ1SV+DwDJ+rbWKMLdGyUP9KzP76SInE8TwuIuCQtuklXJLQGEuammqrwhzAzcu6h2w ko3ScRe3ufTMGxxFoSsUMZ/evFixhApIJZFUkRdc05+iGp1/TMkThLpljFbal5YQHs4X UwcKyO7CjtaUc6tJXQPARBebb0WfViaVTucm9NYjAJNZSkisETjrMzf0a2kRC5UVPDKs 2g== Received: from dc5-exch02.marvell.com ([199.233.59.182]) by mx0b-0016f401.pphosted.com (PPS) with ESMTPS id 3g1c37b43a-1 (version=TLSv1.2 cipher=ECDHE-RSA-AES256-SHA384 bits=256 verify=NOT); Fri, 13 May 2022 09:07:30 -0700 Received: from DC5-EXCH01.marvell.com (10.69.176.38) by DC5-EXCH02.marvell.com (10.69.176.39) with Microsoft SMTP Server (TLS) id 15.0.1497.18; Fri, 13 May 2022 09:07:27 -0700 Received: from maili.marvell.com (10.69.176.80) by DC5-EXCH01.marvell.com (10.69.176.38) with Microsoft SMTP Server id 15.0.1497.2 via Frontend Transport; Fri, 13 May 2022 09:07:27 -0700 Received: from MININT-80QBFE8.corp.innovium.com (unknown [10.193.70.72]) by maili.marvell.com (Postfix) with ESMTP id A8A713F706F; Fri, 13 May 2022 09:07:25 -0700 (PDT) From: To: , Harry van Haaren CC: , Pavan Nikhilesh Subject: [PATCH v2 3/6] examples/eventdev: clean up worker state before exit Date: Fri, 13 May 2022 21:37:16 +0530 Message-ID: <20220513160719.10558-3-pbhagavatula@marvell.com> X-Mailer: git-send-email 2.25.1 In-Reply-To: <20220513160719.10558-1-pbhagavatula@marvell.com> References: <20220426211412.6138-1-pbhagavatula@marvell.com> <20220513160719.10558-1-pbhagavatula@marvell.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Content-Type: text/plain X-Proofpoint-GUID: drpvUOa0_GrYp7aM8VBQI1yidsqeSUtj X-Proofpoint-ORIG-GUID: drpvUOa0_GrYp7aM8VBQI1yidsqeSUtj X-Proofpoint-Virus-Version: vendor=baseguard engine=ICAP:2.0.205,Aquarius:18.0.858,Hydra:6.0.486,FMLib:17.11.64.514 definitions=2022-05-13_08,2022-05-13_01,2022-02-23_01 X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org From: Pavan Nikhilesh Event ports are configured to implicitly release the scheduler contexts currently held in the next call to rte_event_dequeue_burst(). A worker core might still hold a scheduling context during exit, as the next call to rte_event_dequeue_burst() is never made. This might lead to deadlock based on the worker exit timing and when there are very less number of flows. Add clean up function to release any scheduling contexts held by the worker by using RTE_EVENT_OP_RELEASE. Signed-off-by: Pavan Nikhilesh --- examples/eventdev_pipeline/pipeline_common.h | 22 ++++++ .../pipeline_worker_generic.c | 23 +++--- .../eventdev_pipeline/pipeline_worker_tx.c | 79 ++++++++++++------- 3 files changed, 87 insertions(+), 37 deletions(-) diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h index b12eb281e1..9899b257b0 100644 --- a/examples/eventdev_pipeline/pipeline_common.h +++ b/examples/eventdev_pipeline/pipeline_common.h @@ -140,5 +140,27 @@ schedule_devices(unsigned int lcore_id) } } +static inline void +worker_cleanup(uint8_t dev_id, uint8_t port_id, struct rte_event events[], + uint16_t nb_enq, uint16_t nb_deq) +{ + int i; + + if (!(nb_deq - nb_enq)) + return; + + if (nb_deq) { + for (i = nb_enq; i < nb_deq; i++) { + if (events[i].op == RTE_EVENT_OP_RELEASE) + continue; + rte_pktmbuf_free(events[i].mbuf); + } + + for (i = 0; i < nb_deq; i++) + events[i].op = RTE_EVENT_OP_RELEASE; + rte_event_enqueue_burst(dev_id, port_id, events, nb_deq); + } +} + void set_worker_generic_setup_data(struct setup_data *caps, bool burst); void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst); diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c index ce1e92d59e..c564c808e2 100644 --- a/examples/eventdev_pipeline/pipeline_worker_generic.c +++ b/examples/eventdev_pipeline/pipeline_worker_generic.c @@ -16,6 +16,7 @@ worker_generic(void *arg) uint8_t port_id = data->port_id; size_t sent = 0, received = 0; unsigned int lcore_id = rte_lcore_id(); + uint16_t nb_rx = 0, nb_tx = 0; while (!fdata->done) { @@ -27,8 +28,7 @@ worker_generic(void *arg) continue; } - const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id, - &ev, 1, 0); + nb_rx = rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0); if (nb_rx == 0) { rte_pause(); @@ -47,11 +47,14 @@ worker_generic(void *arg) work(); - while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1) - rte_pause(); + do { + nb_tx = rte_event_enqueue_burst(dev_id, port_id, &ev, + 1); + } while (!nb_tx && !fdata->done); sent++; } + worker_cleanup(dev_id, port_id, &ev, nb_tx, nb_rx); if (!cdata.quiet) printf(" worker %u thread done. RX=%zu TX=%zu\n", rte_lcore_id(), received, sent); @@ -69,10 +72,9 @@ worker_generic_burst(void *arg) uint8_t port_id = data->port_id; size_t sent = 0, received = 0; unsigned int lcore_id = rte_lcore_id(); + uint16_t i, nb_rx = 0, nb_tx = 0; while (!fdata->done) { - uint16_t i; - if (fdata->cap.scheduler) fdata->cap.scheduler(lcore_id); @@ -81,8 +83,8 @@ worker_generic_burst(void *arg) continue; } - const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id, - events, RTE_DIM(events), 0); + nb_rx = rte_event_dequeue_burst(dev_id, port_id, events, + RTE_DIM(events), 0); if (nb_rx == 0) { rte_pause(); @@ -103,8 +105,7 @@ worker_generic_burst(void *arg) work(); } - uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id, - events, nb_rx); + nb_tx = rte_event_enqueue_burst(dev_id, port_id, events, nb_rx); while (nb_tx < nb_rx && !fdata->done) nb_tx += rte_event_enqueue_burst(dev_id, port_id, events + nb_tx, @@ -112,6 +113,8 @@ worker_generic_burst(void *arg) sent += nb_tx; } + worker_cleanup(dev_id, port_id, events, nb_tx, nb_rx); + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu TX=%zu\n", rte_lcore_id(), received, sent); diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c index 473940f8c7..a82e064c1c 100644 --- a/examples/eventdev_pipeline/pipeline_worker_tx.c +++ b/examples/eventdev_pipeline/pipeline_worker_tx.c @@ -18,21 +18,22 @@ static __rte_always_inline void worker_event_enqueue(const uint8_t dev, const uint8_t port, struct rte_event *ev) { - while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) + while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done) rte_pause(); } -static __rte_always_inline void +static __rte_always_inline uint16_t worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, - struct rte_event *ev, const uint16_t nb_rx) + struct rte_event *ev, const uint16_t nb_rx) { uint16_t enq; enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); - while (enq < nb_rx) { + while (enq < nb_rx && !fdata->done) enq += rte_event_enqueue_burst(dev, port, ev + enq, nb_rx - enq); - } + + return enq; } static __rte_always_inline void @@ -40,7 +41,8 @@ worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev) { exchange_mac(ev->mbuf); rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0); - while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0)) + while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) && + !fdata->done) rte_pause(); } @@ -76,6 +78,11 @@ worker_do_tx_single(void *arg) } } + if (ev.u64) { + ev.op = RTE_EVENT_OP_RELEASE; + rte_event_enqueue_burst(dev, port, &ev, 1); + } + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -111,6 +118,11 @@ worker_do_tx_single_atq(void *arg) } } + if (ev.u64) { + ev.op = RTE_EVENT_OP_RELEASE; + rte_event_enqueue_burst(dev, port, &ev, 1); + } + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -126,11 +138,10 @@ worker_do_tx_single_burst(void *arg) const uint8_t dev = data->dev_id; const uint8_t port = data->port_id; size_t fwd = 0, received = 0, tx = 0; + uint16_t nb_tx = 0, nb_rx = 0, i; while (!fdata->done) { - uint16_t i; - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, - BATCH_SIZE, 0); + nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0); if (!nb_rx) { rte_pause(); @@ -153,10 +164,12 @@ worker_do_tx_single_burst(void *arg) work(); } - worker_event_enqueue_burst(dev, port, ev, nb_rx); - fwd += nb_rx; + nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_tx; } + worker_cleanup(dev, port, ev, nb_tx, nb_rx); + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -172,11 +185,10 @@ worker_do_tx_single_burst_atq(void *arg) const uint8_t dev = data->dev_id; const uint8_t port = data->port_id; size_t fwd = 0, received = 0, tx = 0; + uint16_t i, nb_rx = 0, nb_tx = 0; while (!fdata->done) { - uint16_t i; - uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, - BATCH_SIZE, 0); + nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0); if (!nb_rx) { rte_pause(); @@ -197,10 +209,12 @@ worker_do_tx_single_burst_atq(void *arg) work(); } - worker_event_enqueue_burst(dev, port, ev, nb_rx); - fwd += nb_rx; + nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_tx; } + worker_cleanup(dev, port, ev, nb_tx, nb_rx); + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -251,6 +265,11 @@ worker_do_tx(void *arg) fwd++; } + if (ev.u64) { + ev.op = RTE_EVENT_OP_RELEASE; + rte_event_enqueue_burst(dev, port, &ev, 1); + } + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -297,6 +316,11 @@ worker_do_tx_atq(void *arg) fwd++; } + if (ev.u64) { + ev.op = RTE_EVENT_OP_RELEASE; + rte_event_enqueue_burst(dev, port, &ev, 1); + } + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -314,11 +338,10 @@ worker_do_tx_burst(void *arg) uint8_t port = data->port_id; uint8_t lst_qid = cdata.num_stages - 1; size_t fwd = 0, received = 0, tx = 0; + uint16_t i, nb_rx = 0, nb_tx = 0; while (!fdata->done) { - uint16_t i; - const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, - ev, BATCH_SIZE, 0); + nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0); if (nb_rx == 0) { rte_pause(); @@ -347,11 +370,13 @@ worker_do_tx_burst(void *arg) } work(); } - worker_event_enqueue_burst(dev, port, ev, nb_rx); - fwd += nb_rx; + nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_tx; } + worker_cleanup(dev, port, ev, nb_tx, nb_rx); + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); @@ -369,12 +394,10 @@ worker_do_tx_burst_atq(void *arg) uint8_t port = data->port_id; uint8_t lst_qid = cdata.num_stages - 1; size_t fwd = 0, received = 0, tx = 0; + uint16_t i, nb_rx = 0, nb_tx = 0; while (!fdata->done) { - uint16_t i; - - const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, - ev, BATCH_SIZE, 0); + nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0); if (nb_rx == 0) { rte_pause(); @@ -402,10 +425,12 @@ worker_do_tx_burst_atq(void *arg) work(); } - worker_event_enqueue_burst(dev, port, ev, nb_rx); - fwd += nb_rx; + nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_tx; } + worker_cleanup(dev, port, ev, nb_tx, nb_rx); + if (!cdata.quiet) printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", rte_lcore_id(), received, fwd, tx); -- 2.25.1