From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <Pavan.Bhagavatula@cavium.com>
Received: from NAM02-BL2-obe.outbound.protection.outlook.com
 (mail-bl2nam02on0089.outbound.protection.outlook.com [104.47.38.89])
 by dpdk.org (Postfix) with ESMTP id D21F71B294
 for <dev@dpdk.org>; Wed, 10 Jan 2018 15:52:47 +0100 (CET)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
 d=CAVIUMNETWORKS.onmicrosoft.com; s=selector1-cavium-com;
 h=From:Date:Subject:Message-ID:Content-Type:MIME-Version;
 bh=MssOJ+8xVGSbpz9SU0mZL/a5HHtvnsEt2FIFoNUDg+A=;
 b=Q4K8BQILxct6rZ87c3dfi4gK+fSz1giiQKfXGW75JrTRbAWBNEPys2sbxZm11x4tOAbWt0DHN5Iwf5mgeX//xwpV9qTJ9LQU6axJ7W9552qA7pbOj3eM2X6ULEp0JQp+s/+fL763uLDHsG4kJ3iolPns5vxjexIoCv2TzLLQ6yc=
Authentication-Results: spf=none (sender IP is )
 smtp.mailfrom=Pavan.Bhagavatula@cavium.com; 
Received: from Pavan-LT.caveonetworks.com (111.93.218.67) by
 DM5PR07MB3466.namprd07.prod.outlook.com (10.164.153.21) with Microsoft SMTP
 Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384_P256) id
 15.20.386.5; Wed, 10 Jan 2018 14:52:42 +0000
From: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
To: jerin.jacob@caviumnetworks.com, santosh.shukla@caviumnetworks.com,
 harry.van.haaren@intel.com, gage.eads@intel.com, hemant.agrawal@nxp.com,
 nipun.gupta@nxp.com, liang.j.ma@intel.com
Cc: dev@dpdk.org,
	Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Date: Wed, 10 Jan 2018 20:21:41 +0530
Message-Id: <20180110145144.28403-9-pbhagavatula@caviumnetworks.com>
X-Mailer: git-send-email 2.14.1
In-Reply-To: <20180110145144.28403-1-pbhagavatula@caviumnetworks.com>
References: <20171130072406.15605-1-pbhagavatula@caviumnetworks.com>
 <20180110145144.28403-1-pbhagavatula@caviumnetworks.com>
MIME-Version: 1.0
Content-Type: text/plain
X-Originating-IP: [111.93.218.67]
X-ClientProxiedBy: BN6PR16CA0009.namprd16.prod.outlook.com (10.172.212.147) To
 DM5PR07MB3466.namprd07.prod.outlook.com (10.164.153.21)
X-MS-PublicTrafficType: Email
X-MS-Office365-Filtering-Correlation-Id: 652a608e-9ddb-4691-a9c4-08d55839cf7d
X-Microsoft-Antispam: UriScan:; BCL:0; PCL:0;
 RULEID:(4534020)(4602075)(4627115)(201703031133081)(201702281549075)(5600026)(4604075)(2017052603307)(7153060)(7193020);
 SRVR:DM5PR07MB3466; 
X-Microsoft-Exchange-Diagnostics: 1; DM5PR07MB3466;
 3:uAq9WBJV8qo7h/t9D405yvWd3RFWo3mk8ZcyJRehnqzxCLp6JgzatcsUvNi90Hp2abbv9txLviHZOBZV+R8xkijQWTeebhzWCl4+RRfjtr2HlNayitv+0Wvsp0WehZgo4bB4MwFKMZHnDKMkpZNj9Nwma8ulLqfw9myT0583OPglFErD4KzbHgQR7Fpq1Ca0eLZvDndTR6mKvWMndEL/gDsm1WqfMFoiJ8adegFEaGJuraOJ3XTzGfbW7nKwe8jH;
 25:1ZRDMg7PEJhQM3X3phWVvRf/XrmCXhUqIiK0ursxMOC7YRRBNuLfA6WO9IlbvNQIphjg7GasYQst5eHHbZwJMQXehQhw9v4W5zk50lNXT0sIDlSiIC4BJJI+c6X0/33QdqqVRCQOH9FPEthfJQqmbAgr53hg716eZuT97g7YyEOZc1/fFWm2doIFd/VcWhsdRnLZLbRtUX7PfZYu7ps4azMHEqCgSLHA7w9o2/2OB35xob2Ls+3lS+8CHi+k1V53vG7wQ6oSbqevnVsW80U07njYdJ6NvsmsaPt+O6w/MXr2ePXQhgngLO7pKgiZ8ZqEH4OcAtI3Hg9zKKBwL7TDrQ==;
 31:C27r/Mm4eyKLr9QBdy88dceg6j8ao6Q+VCoqDTStvVSssdjZ59Aszm5ZL+2iHHFLZ6UqE5gJao/qTAqhDPrvoVKSvTP+jHkydHxNBwPN3S5NlwXmUlXrCkSrUJzaBq5CggGX1xNtLqCz2QCdZ8oeEzyABs7gXEVld0U2Djt0az+ogIkYWLyU8Qk5tcyZrTMsLxeZzAn6mLey1357Bd5+s+UCJvh/6heE0lkvM4GvGds=
X-MS-TrafficTypeDiagnostic: DM5PR07MB3466:
X-Microsoft-Exchange-Diagnostics: 1; DM5PR07MB3466;
 20:9AGHg9PU1LHcx7RepXmRVAszK4Je0cVhkMmqaWRHXh8K0r/OfHladZKGT0GezwfrxOp/WtYRdatEYQj10/c43YpgKe6FBsUfMOVvRPuTU+ZO1rN9Pthvu9xmP7jc7nib+G9gz2lnT6Dej/xMbaUZIFefS6JiqfhOElTGLgkVGzDJm9uuM2JQx2ztXo44RgpHR39+Lr6SlVAzZA2LkXjBro4uAKf9k+16vV8OnDU36q5sU4Ix5Jr5AaQL8BaACDBMm8FgpQ7vOcqVywvt9EvhVDcRclKk1Tm/kSm9WHtVNKCbr9rKTY/YYoVxfDlWjkageuuC8l86Dlg4yN6npk5/H0YFgYQgtGX0iNiKiGYUmLEwprAiJiq2R3T/TcCrpRpgi5qcwvbMJdOqF9K2mtJBYf5OqIn5idf1N3wzMlcLuvv/ET9i47CN3jKhp4dBH4t2Q+3OFh+q7PZ3d9TejABZ5jAhf7BF3K7soFxv7iQeWnejmGbFUTZXvf+3/HtFdSdJWJG5h6j1/1wjR8rhearJGt57+AtSlFmpk1mx93sSo2UpzBLt+REZ4dwvEZeNdypBLLnMsvrRGV6m0h9GG+YsqkVZVhv/csIF1vkYwkYd8sI=;
 4:fiQkvOzLqMk6JOpCLbqpcCZwo0okR1s1DIiGMEFpjfdGcIV3HUYbV+8Ji4iXQ6TWnJDDthgRPiAcTvPxFtdOVXOLGTIAQ28bIZ4LN+d0j3SiR4auwrhDzIHoXrW6EP7764rKVO+nDKMebjgKSaRl26K3/H7Cf8exRNKPcszcDTQwl0cfVO9nBG5uRdWtXiAroFRIOOaX2FkdP4+QcEYqESYf6+YYPfK3TbVrTrwnMUb+2Ov545PhW0qNOXg69T6IfsClklqsBQ7bzy1xVagKIw==
X-Microsoft-Antispam-PRVS: <DM5PR07MB3466175E5C9B4F0522BF30D480110@DM5PR07MB3466.namprd07.prod.outlook.com>
X-Exchange-Antispam-Report-Test: UriScan:;
X-Exchange-Antispam-Report-CFA-Test: BCL:0; PCL:0;
 RULEID:(6040470)(2401047)(5005006)(8121501046)(10201501046)(3002001)(93006095)(3231023)(944501075)(6041268)(20161123560045)(20161123564045)(20161123558120)(201703131423095)(201702281528075)(20161123555045)(201703061421075)(201703061406153)(20161123562045)(6072148)(201708071742011);
 SRVR:DM5PR07MB3466; BCL:0; PCL:0; RULEID:(100000803101)(100110400095);
 SRVR:DM5PR07MB3466; 
X-Forefront-PRVS: 0548586081
X-Forefront-Antispam-Report: SFV:NSPM;
 SFS:(10009020)(39860400002)(346002)(366004)(376002)(396003)(39380400002)(199004)(189003)(76176011)(25786009)(107886003)(4326008)(2950100002)(106356001)(42882006)(8656006)(8676002)(59450400001)(105586002)(53936002)(6486002)(52116002)(53416004)(6512007)(50466002)(47776003)(5009440100003)(305945005)(68736007)(51416003)(386003)(6506007)(69596002)(36756003)(72206003)(2906002)(16586007)(50226002)(316002)(478600001)(7736002)(48376002)(3846002)(1076002)(6116002)(5660300001)(66066001)(16526018)(81166006)(97736004)(81156014)(8936002)(42262002);
 DIR:OUT; SFP:1101; SCL:1; SRVR:DM5PR07MB3466; H:Pavan-LT.caveonetworks.com;
 FPR:; SPF:None; PTR:InfoNoRecords; A:1; MX:1; LANG:en; 
Received-SPF: None (protection.outlook.com: cavium.com does not designate
 permitted sender hosts)
X-Microsoft-Exchange-Diagnostics: =?us-ascii?Q?1; DM5PR07MB3466;
 23:7BG6IiYiPO6WC3De51vjszdTzomf0gMp1Ibi+XuRU?=
 =?us-ascii?Q?H93brFX79MRN+kDPIBCvOadPmUk1TMkcf7JJKPAh45plXULvFFH7G9ORfA1P?=
 =?us-ascii?Q?WQIS1Ao2z6BJIkfy12gCw9nVI3VKKFnWpqSBx8QnrcRc/DH0mCaYkYIL5F2m?=
 =?us-ascii?Q?XVe5Bm5BXTQkgx8f/lzbrL0/AifLpvc87fBbbgywTFYIPgybfybT6ADaUI+s?=
 =?us-ascii?Q?0PsSTFy1L8WEHdO4Vd5iCLKs82ZZmtCO/XmWLPu4nYwLCz3IyIqZg+wHW/8Z?=
 =?us-ascii?Q?oLG9yKvh5tPJmKZoAip7Ix8Wy0gXs3hgUziLeXaBJACOrBqQvei3PMT3Y9Ut?=
 =?us-ascii?Q?0cDEGLgPS9oxUvVzr3k9r5H32xX255jp4a252VPhQY/n4sWqVA6eLclaz8nv?=
 =?us-ascii?Q?MPu+3FLN+rxfPv+j3y1sTqpXl9YO/OK9O2uPvmlAUAyWMtA85cHUybeSQSty?=
 =?us-ascii?Q?vsZPFH1LLb2WMm2O3qY9y8ZE06XgodVXRgAo2c76O2Iy+ljFTn5vuwN1Wjgt?=
 =?us-ascii?Q?FdE8RpNUjEbA5dMlUn4TsxxelBgMF95Ifj9O3L0xiLmSgTWAuGopnsiBNVj2?=
 =?us-ascii?Q?na+fI4N9+xZTzU6sphAr1qo5KX3kzZGU5JU3h1kuEONakSuWv0NdD4z0TPwO?=
 =?us-ascii?Q?AXe71BuoyoslYJjQ5CDg4ExDeaamf+BUB2pT0fuzIOaVw9oQxh6bTBpFUXxf?=
 =?us-ascii?Q?Mk+jEFay8i/zgdB+jDuZVMGNt71vswY3xgTOLA+tcY60Wv8WuwsqcqqmwNWw?=
 =?us-ascii?Q?XDzDwo2RP09Jznjj9d6POqxOs1KOczeB4lZtywI7fleRPNG59h19nYlGr7oe?=
 =?us-ascii?Q?cDCY8bR8O0RoqtsAFRvWuuKEbqDN2J0u/UBwRqsOBRCxUaWDZDd3bXFA/6mp?=
 =?us-ascii?Q?oOCHDQ7jQegm8X9DPlL9OT0Oj+VpOx6aQvUSiyLqN4iPpb5pIewvm+scNpXG?=
 =?us-ascii?Q?ki0cy36AqxHTD2vR2/eS/ibWBCTSQ+wIyL2Di867/NA5LncUvGIn4ogbP0FN?=
 =?us-ascii?Q?5D1aj4V4fKBpw0w1pY2ZGrH16tGiFLNw747kmwEwARmCtFh1B0EGfI60nT2z?=
 =?us-ascii?Q?KnzNSNYo2P9LR2q+7LBEtXNal3uEYn0ijwC7mswk37PYDR+ZT38zFF9+B9ir?=
 =?us-ascii?Q?PtUggd/tYp8cLuFLTOdKtcbkMq7BYpWWSGO/+74NSVTZUhfHhST2OoA3C7oA?=
 =?us-ascii?Q?6jHSU2fNyTqoy1kalbPB3qknuqEdkED3lhKNk6PCzwPhMx/l3E5fLxvPg=3D?=
 =?us-ascii?Q?=3D?=
X-Microsoft-Exchange-Diagnostics: 1; DM5PR07MB3466;
 6:lMVo4kGofIeSi9aL+9xRqy8741nYoJ+jhUZ+oYrkbEh/J9Wl8veoOUVTipoiH/WfKYCET3NEVXqYXG9EZ6rd8LjYs27irUThwa4AuLD5pW9nINW/EGfOOzvyAT9PXs69pNrsDzdsT+BCG49xrzROdIXCGeN3/c/whnD2xil4bEIyz1373y6kdmxMUG4IIitjSErkmAkTJEKkBBA0Z1j17jM2S/iHT6MNn1F2RF+HMPKBV/t1nByRLasvCAAFGs4uywHvKC4jgx4H9zlVgOReHsgYlfx1oeOSgu0Juqv+1aqAK6dJVg07b+BzT4VuHneY8DVImgFKqw8kuoJ81jw/p06HwKdDjgAaGOV3kByVMxo=;
 5:EqIxto8hbU69iuKs6A4eJeKXEg+X90PTXSr1tTVk59mavbBaFGeNqOJtwQluGvNnCfzJN/y7cqAUFJf6DaSEur4pi4regc1pol3enllxw7RJ4uWNQWR61lHuQnU4FLsf2nbKiYw3uu5tO9CZFT/dVzPw9pLOC8J1JyHcHKpBRro=;
 24:VwUP9uTLIYRAR+NJhMfHKSqwwB04KBoxFpixcDV+lZEHmELudDfoRgOfTUtHI2ISXoNIh6N82LJiyKvXBBs8ngybMoZxHrpbCdzyGfgcCNo=;
 7:BeNt86hEyVbxIvGWQFhym/7aCIdtLrrmmI81qqlod4M/Aj/GTqka1zjiTtiR2U5s5Q6ERfQaKAf5R/RFo9RcLS2D3tLHNIhCmk3Lh+rSeImqunCvNBACwG1d/UzVP+yNoX4RQ6HW/fYQao1AXkfI8VR9dxsf+81qrGHQDj32jRnTsWnFKOyWnDCavXys4nG3K8pPrcFiNn9GZMkRb3YoRv30V6ULNXU+/J8Al/PvHZrdTJA2j8xv025ZwSc/R9FE
SpamDiagnosticOutput: 1:99
SpamDiagnosticMetadata: NSPM
X-OriginatorOrg: caviumnetworks.com
X-MS-Exchange-CrossTenant-OriginalArrivalTime: 10 Jan 2018 14:52:42.4823 (UTC)
X-MS-Exchange-CrossTenant-Network-Message-Id: 652a608e-9ddb-4691-a9c4-08d55839cf7d
X-MS-Exchange-CrossTenant-FromEntityHeader: Hosted
X-MS-Exchange-CrossTenant-Id: 711e4ccf-2e9b-4bcf-a551-4094005b6194
X-MS-Exchange-Transport-CrossTenantHeadersStamped: DM5PR07MB3466
Subject: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue worker
	functions
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://dpdk.org/ml/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://dpdk.org/ml/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://dpdk.org/ml/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
X-List-Received-Date: Wed, 10 Jan 2018 14:52:48 -0000

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
 app/test-eventdev/test_pipeline_common.h |  80 +++++++
 app/test-eventdev/test_pipeline_queue.c  | 367 ++++++++++++++++++++++++++++++-
 2 files changed, 446 insertions(+), 1 deletion(-)

diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index 481cb133b..b50e20adc 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -51,6 +51,86 @@ struct test_pipeline {
 	uint8_t sched_type_list[EVT_MAX_STAGES] __rte_cache_aligned;
 } __rte_cache_aligned;
 
+#define BURST_SIZE 16
+
+static __rte_always_inline void
+pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
+{
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = sched;
+}
+
+static __rte_always_inline void
+pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev)
+{
+	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+		rte_pause();
+}
+
+static __rte_always_inline void
+pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev, const uint16_t nb_rx)
+{
+	uint16_t enq;
+
+	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+	while (enq < nb_rx) {
+		enq += rte_event_enqueue_burst(dev, port,
+						ev + enq, nb_rx - enq);
+	}
+}
+
+static __rte_always_inline void
+pipeline_tx_pkt_safe(struct rte_mbuf *mbuf)
+{
+	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+		rte_pause();
+}
+
+static __rte_always_inline void
+pipeline_tx_pkt_unsafe(struct rte_mbuf *mbuf, struct test_pipeline *t)
+{
+	rte_spinlock_t *lk = &t->tx_lk[mbuf->port];
+
+	rte_spinlock_lock(lk);
+	pipeline_tx_pkt_safe(mbuf);
+	rte_spinlock_unlock(lk);
+}
+
+static __rte_always_inline void
+pipeline_tx_unsafe_burst(struct rte_mbuf *mbuf, struct test_pipeline *t)
+{
+	uint16_t port = mbuf->port;
+	rte_spinlock_t *lk = &t->tx_lk[port];
+
+	rte_spinlock_lock(lk);
+	rte_eth_tx_buffer(port, 0, t->tx_buf[port], mbuf);
+	rte_spinlock_unlock(lk);
+}
+
+static __rte_always_inline void
+pipeline_tx_flush(struct test_pipeline *t, const uint8_t nb_ports)
+{
+	int i;
+	rte_spinlock_t *lk;
+
+	for (i = 0; i < nb_ports; i++) {
+		lk = &t->tx_lk[i];
+
+		rte_spinlock_lock(lk);
+		rte_eth_tx_buffer_flush(i, 0, t->tx_buf[i]);
+		rte_spinlock_unlock(lk);
+	}
+}
+
+static inline int
+pipeline_nb_event_ports(struct evt_options *opt)
+{
+	return evt_nr_active_lcores(opt->wlcores);
+}
+
 int pipeline_test_result(struct evt_test *test, struct evt_options *opt);
 int pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues);
 int pipeline_test_setup(struct evt_test *test, struct evt_options *opt);
diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
index 4b50e7b54..bc3f3dc18 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,10 +15,375 @@ pipeline_queue_nb_event_queues(struct evt_options *opt)
 	return (eth_count * opt->nb_stages) + eth_count;
 }
 
+static int
+pipeline_queue_worker_single_stage_safe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+			pipeline_tx_pkt_safe(ev.mbuf);
+			w->processed_pkts++;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_unsafe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+			pipeline_tx_pkt_unsafe(ev.mbuf, t);
+			w->processed_pkts++;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_burst_safe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev[BURST_SIZE + 1];
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt_safe(ev[i].mbuf);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_burst_unsafe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev[BURST_SIZE + 1];
+	const uint16_t nb_ports = rte_eth_dev_count();
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			pipeline_tx_flush(t, nb_ports);
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_unsafe_burst(ev[i].mbuf, t);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+			} else {
+
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+
+static int
+pipeline_queue_worker_multi_stage_safe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	uint8_t cq_id;
+	struct rte_event ev;
+
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id >= last_queue) {
+			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt_safe(ev.mbuf);
+				w->processed_pkts++;
+				continue;
+			}
+			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_unsafe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	uint8_t cq_id;
+	struct rte_event ev;
+
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id >= last_queue) {
+			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt_unsafe(ev.mbuf, t);
+				w->processed_pkts++;
+				continue;
+			}
+			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_burst_safe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t cq_id;
+	struct rte_event ev[BURST_SIZE + 1];
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id >= last_queue) {
+				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+					pipeline_tx_pkt_safe(ev[i].mbuf);
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					w->processed_pkts++;
+					continue;
+				}
+
+				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						sched_type_list[cq_id]);
+			}
+
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_burst_unsafe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t cq_id;
+	struct rte_event ev[BURST_SIZE + 1];
+	const uint16_t nb_ports = rte_eth_dev_count();
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			pipeline_tx_flush(t, nb_ports);
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id >= last_queue) {
+				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+					pipeline_tx_unsafe_burst(ev[i].mbuf, t);
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					w->processed_pkts++;
+					continue;
+				}
+
+				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						sched_type_list[cq_id]);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+	return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
-	RTE_SET_USED(arg);
+	struct worker_data *w  = arg;
+	struct evt_options *opt = w->t->opt;
+	const bool burst = evt_has_burst_mode(w->dev_id);
+	const bool mt_safe = !w->t->mt_unsafe;
+	const uint8_t nb_stages = opt->nb_stages;
+	RTE_SET_USED(opt);
+
+	/* allow compiler to optimize */
+	if (nb_stages == 1) {
+		if (!burst && mt_safe)
+			return pipeline_queue_worker_single_stage_safe(arg);
+		else if (!burst && !mt_safe)
+			return pipeline_queue_worker_single_stage_unsafe(
+					arg);
+		else if (burst && mt_safe)
+			return pipeline_queue_worker_single_stage_burst_safe(
+					arg);
+		else if (burst && !mt_safe)
+			return pipeline_queue_worker_single_stage_burst_unsafe(
+					arg);
+	} else {
+		if (!burst && mt_safe)
+			return pipeline_queue_worker_multi_stage_safe(arg);
+		else if (!burst && !mt_safe)
+			return pipeline_queue_worker_multi_stage_unsafe(arg);
+		if (burst && mt_safe)
+			return pipeline_queue_worker_multi_stage_burst_safe(
+					arg);
+		else if (burst && !mt_safe)
+			return pipeline_queue_worker_multi_stage_burst_unsafe(
+					arg);
+
+	}
 	rte_panic("invalid worker\n");
 }
 
-- 
2.15.1