From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <Pavan.Bhagavatula@cavium.com>
Received: from NAM03-DM3-obe.outbound.protection.outlook.com
 (mail-dm3nam03on0060.outbound.protection.outlook.com [104.47.41.60])
 by dpdk.org (Postfix) with ESMTP id F30A31B2D9
 for <dev@dpdk.org>; Tue, 16 Jan 2018 16:18:45 +0100 (CET)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
 d=CAVIUMNETWORKS.onmicrosoft.com; s=selector1-cavium-com;
 h=From:Date:Subject:Message-ID:Content-Type:MIME-Version;
 bh=aGyjT+XMPlu6eTK4fXBHvbZhqUu6OKnPWsaeiKO2Rec=;
 b=AJR2ihI5LzIsI7hM7SgkviJi8x3j025q7MG3b7GA44aco/6A5OKBWGsMa+8HfzZHF4t434sNwWbQP3w13RwhvX/wDCIy+dkE+GH2OAeV0Ld8NSiYMML93dvvN+7FLJuwVnu+lOJQZyD4uPXl4GSr585j4x/nj007IuwTLGgw7qk=
Authentication-Results: spf=none (sender IP is )
 smtp.mailfrom=Pavan.Bhagavatula@cavium.com; 
Received: from Pavan-LT.caveonetworks.com (111.93.218.67) by
 MWHPR07MB3469.namprd07.prod.outlook.com (10.164.192.20) with Microsoft SMTP
 Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384_P256) id
 15.20.407.7; Tue, 16 Jan 2018 15:18:41 +0000
From: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
To: jerin.jacob@caviumnetworks.com, santosh.shukla@caviumnetworks.com,
 harry.van.haaren@intel.com, gage.eads@intel.com, hemant.agrawal@nxp.com,
 nipun.gupta@nxp.com, liang.j.ma@intel.com
Cc: dev@dpdk.org,
	Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Date: Tue, 16 Jan 2018 20:47:25 +0530
Message-Id: <20180116151728.566-10-pbhagavatula@caviumnetworks.com>
X-Mailer: git-send-email 2.14.1
In-Reply-To: <20180116151728.566-1-pbhagavatula@caviumnetworks.com>
References: <20171130072406.15605-1-pbhagavatula@caviumnetworks.com>
 <20180116151728.566-1-pbhagavatula@caviumnetworks.com>
MIME-Version: 1.0
Content-Type: text/plain
X-Originating-IP: [111.93.218.67]
X-ClientProxiedBy: MWHPR21CA0059.namprd21.prod.outlook.com (10.172.93.149) To
 MWHPR07MB3469.namprd07.prod.outlook.com (10.164.192.20)
X-MS-PublicTrafficType: Email
Received-SPF: None (protection.outlook.com: cavium.com does not designate
 permitted sender hosts)
X-OriginatorOrg: caviumnetworks.com
Subject: [dpdk-dev] [PATCH v5 10/14] app/eventdev: add pipeline queue worker
	functions
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://dpdk.org/ml/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://dpdk.org/ml/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://dpdk.org/ml/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
X-List-Received-Date: Tue, 16 Jan 2018 15:18:46 -0000

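Add worker functions for the pipeline queue test. worker_wrapper()
selects one of eight variants based on the number of stages (single or
multi), whether the event device supports burst mode, and whether the
worker transmits packets directly (tx) or forwards them to the Tx
service queue (fwd).

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>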
---
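Note on worker selection: worker_wrapper() picks one of eight workers
from three inputs (stage count, burst capability, mt_safe). The sketch
below is a minimal standalone model of that selection, not code from
this patch. enum worker_kind and select_worker() are hypothetical names
used only for illustration, and the model assumes, as the dispatch in
the diff suggests, that any stage count other than 1 takes the
multi-stage path.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels, ordered so that kind = multi * 4 + burst * 2 + fwd. */
enum worker_kind {
	SINGLE_STAGE_TX,
	SINGLE_STAGE_FWD,
	SINGLE_STAGE_BURST_TX,
	SINGLE_STAGE_BURST_FWD,
	MULTI_STAGE_TX,
	MULTI_STAGE_FWD,
	MULTI_STAGE_BURST_TX,
	MULTI_STAGE_BURST_FWD,
};

/*
 * Model of the if/else ladder in worker_wrapper(): mt_safe selects the
 * "tx" variants, !mt_safe selects the "fwd" variants.
 */
static enum worker_kind
select_worker(unsigned int nb_stages, bool burst, bool mt_safe)
{
	assert(nb_stages >= 1);

	const int multi = (nb_stages != 1);
	const int fwd = !mt_safe;

	return (enum worker_kind)(multi * 4 + (burst ? 2 : 0) + fwd);
}
```

For example, select_worker(1, false, true) corresponds to
pipeline_queue_worker_single_stage_tx, and select_worker(3, true, false)
to pipeline_queue_worker_multi_stage_burst_fwd.
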
 app/test-eventdev/test_pipeline_common.h |  80 +++++++++
 app/test-eventdev/test_pipeline_queue.c  | 288 ++++++++++++++++++++++++++++++-
 2 files changed, 367 insertions(+), 1 deletion(-)
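
Note on the multi-stage "tx" path: each ethernet port owns
nb_stages + 1 event queues (see pipeline_queue_nb_event_queues()), so
the workers recover the per-port stage index with
queue_id % (nb_stages + 1). The sketch below models one decision of
pipeline_queue_worker_multi_stage_tx() as a pure function so the queue
arithmetic can be checked in isolation. It is an illustration under
assumptions, not part of the patch: enum sched, struct step and
multi_stage_tx_step() are hypothetical stand-ins for the rte_event
fields and the worker loop body.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the RTE_SCHED_TYPE_* values. */
enum sched { SCHED_ORDERED, SCHED_ATOMIC, SCHED_PARALLEL };

/* Outcome of handling one event: transmit it, or forward it. */
struct step {
	bool transmit;     /* true: the worker sends the packet out */
	uint8_t queue_id;  /* destination queue when forwarding */
	enum sched sched;  /* schedule type when forwarding */
};

static struct step
multi_stage_tx_step(uint8_t queue_id, enum sched cur, uint8_t nb_stages,
		const enum sched *sched_type_list)
{
	assert(nb_stages >= 1);

	const uint8_t queues_per_port = nb_stages + 1;
	const uint8_t last_queue = nb_stages - 1;
	const uint8_t cq_id = queue_id % queues_per_port;
	struct step s = { false, queue_id, cur };

	if (cq_id >= last_queue) {
		/* Atomic events at or past the last stage are transmitted. */
		if (cur == SCHED_ATOMIC) {
			s.transmit = true;
			return s;
		}
		/* Otherwise move from the last stage to the extra queue. */
		s.queue_id += (cq_id == last_queue) ? 1 : 0;
		s.sched = SCHED_ATOMIC;
	} else {
		/* Intermediate stage: advance with the configured type. */
		s.queue_id++;
		s.sched = sched_type_list[cq_id];
	}
	return s;
}
```

With nb_stages = 2, an ordered event on queue 1 is forwarded to queue 2
as atomic, and an atomic event on queue 2 is transmitted.
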

diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index db2517baf..5fb91607d 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -63,6 +63,86 @@ struct test_pipeline {
 	uint8_t sched_type_list[EVT_MAX_STAGES] __rte_cache_aligned;
 } __rte_cache_aligned;
 
+#define BURST_SIZE 16
+
+#define PIPELINE_WROKER_SINGLE_STAGE_INIT \
+	struct worker_data *w  = arg;     \
+	struct test_pipeline *t = w->t;   \
+	const uint8_t dev = w->dev_id;    \
+	const uint8_t port = w->port_id;  \
+	struct rte_event ev
+
+#define PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT \
+	int i;                                  \
+	struct worker_data *w  = arg;           \
+	struct test_pipeline *t = w->t;         \
+	const uint8_t dev = w->dev_id;          \
+	const uint8_t port = w->port_id;        \
+	struct rte_event ev[BURST_SIZE + 1]
+
+#define PIPELINE_WROKER_MULTI_STAGE_INIT                         \
+	struct worker_data *w  = arg;                            \
+	struct test_pipeline *t = w->t;                          \
+	uint8_t cq_id;                                           \
+	const uint8_t dev = w->dev_id;                           \
+	const uint8_t port = w->port_id;                         \
+	const uint8_t last_queue = t->opt->nb_stages - 1;        \
+	uint8_t *const sched_type_list = &t->sched_type_list[0]; \
+	struct rte_event ev
+
+#define PIPELINE_WROKER_MULTI_STAGE_BURST_INIT                   \
+	int i;                                                   \
+	struct worker_data *w  = arg;                            \
+	struct test_pipeline *t = w->t;                          \
+	uint8_t cq_id;                                           \
+	const uint8_t dev = w->dev_id;                           \
+	const uint8_t port = w->port_id;                         \
+	const uint8_t last_queue = t->opt->nb_stages - 1;        \
+	uint8_t *const sched_type_list = &t->sched_type_list[0]; \
+	struct rte_event ev[BURST_SIZE + 1]
+
+static __rte_always_inline void
+pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
+{
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = sched;
+}
+
+static __rte_always_inline void
+pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev)
+{
+	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+		rte_pause();
+}
+
+static __rte_always_inline void
+pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev, const uint16_t nb_rx)
+{
+	uint16_t enq;
+
+	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+	while (enq < nb_rx) {
+		enq += rte_event_enqueue_burst(dev, port,
+						ev + enq, nb_rx - enq);
+	}
+}
+
+static __rte_always_inline void
+pipeline_tx_pkt(struct rte_mbuf *mbuf)
+{
+	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+		rte_pause();
+}
+
+static inline int
+pipeline_nb_event_ports(struct evt_options *opt)
+{
+	return evt_nr_active_lcores(opt->wlcores);
+}
+
 int pipeline_test_result(struct evt_test *test, struct evt_options *opt);
 int pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues);
 int pipeline_test_setup(struct evt_test *test, struct evt_options *opt);
diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
index 773c3ecaa..835fe0782 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,10 +15,296 @@ pipeline_queue_nb_event_queues(struct evt_options *opt)
 	return (eth_count * opt->nb_stages) + eth_count;
 }
 
+static int
+pipeline_queue_worker_single_stage_tx(void *arg)
+{
+	PIPELINE_WROKER_SINGLE_STAGE_INIT;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+			pipeline_tx_pkt(ev.mbuf);
+			w->processed_pkts++;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_fwd(void *arg)
+{
+	PIPELINE_WROKER_SINGLE_STAGE_INIT;
+	const uint8_t tx_queue = t->tx_service.queue_id;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		ev.queue_id = tx_queue;
+		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+		pipeline_event_enqueue(dev, port, &ev);
+		w->processed_pkts++;
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_burst_tx(void *arg)
+{
+	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt(ev[i].mbuf);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_burst_fwd(void *arg)
+{
+	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
+	const uint8_t tx_queue = t->tx_service.queue_id;
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			ev[i].queue_id = tx_queue;
+			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+			w->processed_pkts++;
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+
+static int
+pipeline_queue_worker_multi_stage_tx(void *arg)
+{
+	PIPELINE_WROKER_MULTI_STAGE_INIT;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id >= last_queue) {
+			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt(ev.mbuf);
+				w->processed_pkts++;
+				continue;
+			}
+			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_fwd(void *arg)
+{
+	PIPELINE_WROKER_MULTI_STAGE_INIT;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	const uint8_t tx_queue = t->tx_service.queue_id;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id == last_queue) {
+			ev.queue_id = tx_queue;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			w->processed_pkts++;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_burst_tx(void *arg)
+{
+	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id >= last_queue) {
+				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+					pipeline_tx_pkt(ev[i].mbuf);
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					w->processed_pkts++;
+					continue;
+				}
+
+				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						sched_type_list[cq_id]);
+			}
+
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
+{
+	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	const uint8_t tx_queue = t->tx_service.queue_id;
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id == last_queue) {
+				ev[i].queue_id = tx_queue;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+				w->processed_pkts++;
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						sched_type_list[cq_id]);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+	return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
-	RTE_SET_USED(arg);
+	struct worker_data *w  = arg;
+	struct evt_options *opt = w->t->opt;
+	const bool burst = evt_has_burst_mode(w->dev_id);
+	const bool mt_safe = !w->t->mt_unsafe;
+	const uint8_t nb_stages = opt->nb_stages;
+	RTE_SET_USED(opt);
+
+	if (nb_stages == 1) {
+		if (!burst && mt_safe)
+			return pipeline_queue_worker_single_stage_tx(arg);
+		else if (!burst && !mt_safe)
+			return pipeline_queue_worker_single_stage_fwd(arg);
+		else if (burst && mt_safe)
+			return pipeline_queue_worker_single_stage_burst_tx(arg);
+		else if (burst && !mt_safe)
+			return pipeline_queue_worker_single_stage_burst_fwd(
+					arg);
+	} else {
+		if (!burst && mt_safe)
+			return pipeline_queue_worker_multi_stage_tx(arg);
+		else if (!burst && !mt_safe)
+			return pipeline_queue_worker_multi_stage_fwd(arg);
+		else if (burst && mt_safe)
+			return pipeline_queue_worker_multi_stage_burst_tx(arg);
+		else if (burst && !mt_safe)
+			return pipeline_queue_worker_multi_stage_burst_fwd(arg);
+
+	}
 	rte_panic("invalid worker\n");
 }
 
-- 
2.14.1