DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
@ 2018-09-05 13:45 Pavan Nikhilesh
  2018-09-17 14:17 ` Jerin Jacob
                   ` (3 more replies)
  0 siblings, 4 replies; 11+ messages in thread
From: Pavan Nikhilesh @ 2018-09-05 13:45 UTC (permalink / raw)
  To: jerin.jacob, nikhil.rao, harry.van.haaren, anoob.joseph
  Cc: dev, Pavan Nikhilesh

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
 This patch depends on the following series:
 http://patches.dpdk.org/project/dpdk/list/?series=1121

 examples/eventdev_pipeline/main.c             |  62 ++--
 examples/eventdev_pipeline/pipeline_common.h  |  31 +-
 .../pipeline_worker_generic.c                 | 273 +++++-------------
 .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
 4 files changed, 186 insertions(+), 310 deletions(-)

diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
index 700bc696f..95531150b 100644
--- a/examples/eventdev_pipeline/main.c
+++ b/examples/eventdev_pipeline/main.c
@@ -26,20 +26,6 @@ core_in_use(unsigned int lcore_id) {
 		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
 }

-static void
-eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
-			void *userdata)
-{
-	int port_id = (uintptr_t) userdata;
-	unsigned int _sent = 0;
-
-	do {
-		/* Note: hard-coded TX queue */
-		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
-					  unsent - _sent);
-	} while (_sent != unsent);
-}
-
 /*
  * Parse the coremask given as argument (hexadecimal string) and fill
  * the global configuration (core role and core count) with the parsed
@@ -263,6 +249,7 @@ parse_app_args(int argc, char **argv)
 static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
+	struct rte_eth_rxconf rx_conf;
 	static const struct rte_eth_conf port_conf_default = {
 		.rxmode = {
 			.mq_mode = ETH_MQ_RX_RSS,
@@ -291,6 +278,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
 		port_conf.txmode.offloads |=
 			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+	rx_conf = dev_info.default_rxconf;
+	rx_conf.offloads = port_conf.rxmode.offloads;

 	port_conf.rx_adv_conf.rss_conf.rss_hf &=
 		dev_info.flow_type_rss_offloads;
@@ -311,7 +300,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	/* Allocate and set up 1 RX queue per Ethernet port. */
 	for (q = 0; q < rx_rings; q++) {
 		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
-				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+				rte_eth_dev_socket_id(port), &rx_conf,
+				mbuf_pool);
 		if (retval < 0)
 			return retval;
 	}
@@ -350,7 +340,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 static int
 init_ports(uint16_t num_ports)
 {
-	uint16_t portid, i;
+	uint16_t portid;

 	if (!cdata.num_mbuf)
 		cdata.num_mbuf = 16384 * num_ports;
@@ -367,36 +357,26 @@ init_ports(uint16_t num_ports)
 			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
 					portid);

-	RTE_ETH_FOREACH_DEV(i) {
-		void *userdata = (void *)(uintptr_t) i;
-		fdata->tx_buf[i] =
-			rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
-		if (fdata->tx_buf[i] == NULL)
-			rte_panic("Out of memory\n");
-		rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
-		rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
-						   eth_tx_buffer_retry,
-						   userdata);
-	}
-
 	return 0;
 }

 static void
 do_capability_setup(uint8_t eventdev_id)
 {
+	int ret;
 	uint16_t i;
-	uint8_t mt_unsafe = 0;
+	uint8_t generic_pipeline = 0;
 	uint8_t burst = 0;

 	RTE_ETH_FOREACH_DEV(i) {
-		struct rte_eth_dev_info dev_info;
-		memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
-
-		rte_eth_dev_info_get(i, &dev_info);
-		/* Check if it is safe ask worker to tx. */
-		mt_unsafe |= !(dev_info.tx_offload_capa &
-				DEV_TX_OFFLOAD_MT_LOCKFREE);
+		uint32_t caps = 0;
+
+		ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"Invalid capability for Tx adptr port %d\n", i);
+		generic_pipeline |= !(caps &
+				RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	struct rte_event_dev_info eventdev_info;
@@ -406,10 +386,10 @@ do_capability_setup(uint8_t eventdev_id)
 	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
 		0;

-	if (mt_unsafe)
+	if (generic_pipeline)
 		set_worker_generic_setup_data(&fdata->cap, burst);
 	else
-		set_worker_tx_setup_data(&fdata->cap, burst);
+		set_worker_tx_enq_setup_data(&fdata->cap, burst);
 }

 static void
@@ -499,7 +479,7 @@ main(int argc, char **argv)
 	if (worker_data == NULL)
 		rte_panic("rte_calloc failed\n");

-	int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
+	int dev_id = fdata->cap.evdev_setup(worker_data);
 	if (dev_id < 0)
 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

@@ -524,8 +504,8 @@ main(int argc, char **argv)

 		if (fdata->tx_core[lcore_id])
 			printf(
-				"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
-				__func__, lcore_id, cons_data.port_id);
+				"[%s()] lcore %d executing NIC Tx\n",
+				__func__, lcore_id);

 		if (fdata->sched_core[lcore_id])
 			printf("[%s()] lcore %d executing scheduler\n",
diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h
index 9703396f8..a6cc912fb 100644
--- a/examples/eventdev_pipeline/pipeline_common.h
+++ b/examples/eventdev_pipeline/pipeline_common.h
@@ -16,6 +16,7 @@
 #include <rte_ethdev.h>
 #include <rte_eventdev.h>
 #include <rte_event_eth_rx_adapter.h>
+#include <rte_event_eth_tx_adapter.h>
 #include <rte_service.h>
 #include <rte_service_component.h>

@@ -23,38 +24,30 @@
 #define BATCH_SIZE 16
 #define MAX_NUM_CORE 64

-struct cons_data {
-	uint8_t dev_id;
-	uint8_t port_id;
-	uint8_t release;
-} __rte_cache_aligned;
-
 struct worker_data {
 	uint8_t dev_id;
 	uint8_t port_id;
 } __rte_cache_aligned;

 typedef int (*worker_loop)(void *);
-typedef int (*consumer_loop)(void);
 typedef void (*schedule_loop)(unsigned int);
-typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
-typedef void (*rx_adapter_setup)(uint16_t nb_ports);
+typedef int (*eventdev_setup)(struct worker_data *);
+typedef void (*adapter_setup)(uint16_t nb_ports);
 typedef void (*opt_check)(void);

 struct setup_data {
 	worker_loop worker;
-	consumer_loop consumer;
 	schedule_loop scheduler;
 	eventdev_setup evdev_setup;
-	rx_adapter_setup adptr_setup;
+	adapter_setup adptr_setup;
 	opt_check check_opt;
 };

 struct fastpath_data {
 	volatile int done;
-	uint32_t tx_lock;
 	uint32_t evdev_service_id;
 	uint32_t rxadptr_service_id;
+	uint32_t txadptr_service_id;
 	bool rx_single;
 	bool tx_single;
 	bool sched_single;
@@ -62,7 +55,6 @@ struct fastpath_data {
 	unsigned int tx_core[MAX_NUM_CORE];
 	unsigned int sched_core[MAX_NUM_CORE];
 	unsigned int worker_core[MAX_NUM_CORE];
-	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
 	struct setup_data cap;
 } __rte_cache_aligned;

@@ -88,6 +80,8 @@ struct config_data {
 	int16_t next_qid[MAX_NUM_STAGES+2];
 	int16_t qid[MAX_NUM_STAGES];
 	uint8_t rx_adapter_id;
+	uint8_t tx_adapter_id;
+	uint8_t tx_queue_id;
 	uint64_t worker_lcore_mask;
 	uint64_t rx_lcore_mask;
 	uint64_t tx_lcore_mask;
@@ -99,8 +93,6 @@ struct port_link {
 	uint8_t priority;
 };

-struct cons_data cons_data;
-
 struct fastpath_data *fdata;
 struct config_data cdata;

@@ -142,12 +134,11 @@ schedule_devices(unsigned int lcore_id)
 		}
 	}

-	if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
-			 rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
-		fdata->cap.consumer();
-		rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+	if (fdata->tx_core[lcore_id]) {
+		rte_service_run_iter_on_app_lcore(fdata->txadptr_service_id,
+				!fdata->tx_single);
 	}
 }

 void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
-void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
+void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
index 2215e9ebe..a355c23a1 100644
--- a/examples/eventdev_pipeline/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
 	return 0;
 }

-static __rte_always_inline int
-consumer(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packet;
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	int i;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	do {
-		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-				&packet, 1, 0);
-
-		if (n == 0) {
-			RTE_ETH_FOREACH_DEV(i)
-				rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
-			return 0;
-		}
-		if (start_time == 0)
-			last_time = start_time = rte_get_timer_cycles();
-
-		received++;
-		uint8_t outport = packet.mbuf->port;
-
-		exchange_mac(packet.mbuf);
-		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-				packet.mbuf);
-
-		if (cons_data.release)
-			rte_event_enqueue_burst(dev_id, port_id,
-								&packet, n);
-
-		/* Print out mpps every 1<22 packets */
-		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-			const uint64_t now = rte_get_timer_cycles();
-			const uint64_t total_ms = (now - start_time) / freq_khz;
-			const uint64_t delta_ms = (now - last_time) / freq_khz;
-			uint64_t delta_pkts = received - last_pkts;
-
-			printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
-					"avg %.3f mpps [current %.3f mpps]\n",
-					__func__,
-					received,
-					total_ms,
-					received / (total_ms * 1000.0),
-					delta_pkts / (delta_ms * 1000.0));
-			last_pkts = received;
-			last_time = now;
-		}
-
-		cdata.num_packets--;
-		if (cdata.num_packets <= 0)
-			fdata->done = 1;
-	/* Be stuck in this loop if single. */
-	} while (!fdata->done && fdata->tx_single);
-
-	return 0;
-}
-
-static __rte_always_inline int
-consumer_burst(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packets[BATCH_SIZE];
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	unsigned int i, j;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	do {
-		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-				packets, RTE_DIM(packets), 0);
-
-		if (n == 0) {
-			RTE_ETH_FOREACH_DEV(j)
-				rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
-			return 0;
-		}
-		if (start_time == 0)
-			last_time = start_time = rte_get_timer_cycles();
-
-		received += n;
-		for (i = 0; i < n; i++) {
-			uint8_t outport = packets[i].mbuf->port;
-
-			exchange_mac(packets[i].mbuf);
-			rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-					packets[i].mbuf);
-
-			packets[i].op = RTE_EVENT_OP_RELEASE;
-		}
-
-		if (cons_data.release) {
-			uint16_t nb_tx;
-
-			nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-								packets, n);
-			while (nb_tx < n)
-				nb_tx += rte_event_enqueue_burst(dev_id,
-						port_id, packets + nb_tx,
-						n - nb_tx);
-		}
-
-		/* Print out mpps every 1<22 packets */
-		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-			const uint64_t now = rte_get_timer_cycles();
-			const uint64_t total_ms = (now - start_time) / freq_khz;
-			const uint64_t delta_ms = (now - last_time) / freq_khz;
-			uint64_t delta_pkts = received - last_pkts;
-
-			printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
-					"avg %.3f mpps [current %.3f mpps]\n",
-					received,
-					total_ms,
-					received / (total_ms * 1000.0),
-					delta_pkts / (delta_ms * 1000.0));
-			last_pkts = received;
-			last_time = now;
-		}
-
-		cdata.num_packets -= n;
-		if (cdata.num_packets <= 0)
-			fdata->done = 1;
-	/* Be stuck in this loop if single. */
-	} while (!fdata->done && fdata->tx_single);
-
-	return 0;
-}
-
 static int
-setup_eventdev_generic(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+setup_eventdev_generic(struct worker_data *worker_data)
 {
 	const uint8_t dev_id = 0;
 	/* +1 stages is for a SINGLE_LINK TX stage */
 	const uint8_t nb_queues = cdata.num_stages + 1;
-	/* + 1 is one port for consumer */
-	const uint8_t nb_ports = cdata.num_workers + 1;
+	const uint8_t nb_ports = cdata.num_workers;
 	struct rte_event_dev_config config = {
 			.nb_event_queues = nb_queues,
 			.nb_event_ports = nb_ports,
@@ -285,11 +145,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 			.nb_atomic_flows = 1024,
 		.nb_atomic_order_sequences = 1024,
 	};
-	struct rte_event_port_conf tx_p_conf = {
-			.dequeue_depth = 128,
-			.enqueue_depth = 128,
-			.new_event_threshold = 4096,
-	};
 	struct rte_event_queue_conf tx_q_conf = {
 			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
 			.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
@@ -297,7 +152,6 @@ setup_eventdev_generic(struct cons_data *cons_data,

 	struct port_link worker_queues[MAX_NUM_STAGES];
 	uint8_t disable_implicit_release;
-	struct port_link tx_queue;
 	unsigned int i;

 	int ret, ndev = rte_event_dev_count();
@@ -314,7 +168,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);

 	wkr_p_conf.disable_implicit_release = disable_implicit_release;
-	tx_p_conf.disable_implicit_release = disable_implicit_release;

 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
@@ -372,8 +225,7 @@ setup_eventdev_generic(struct cons_data *cons_data,
 		printf("%d: error creating qid %d\n", __LINE__, i);
 		return -1;
 	}
-	tx_queue.queue_id = i;
-	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+	cdata.tx_queue_id = i;

 	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
 		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
@@ -403,26 +255,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 		w->port_id = i;
 	}

-	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-	/* port for consumer, linked to TX queue */
-	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
-		printf("Error setting up port %d\n", i);
-		return -1;
-	}
-	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
-				&tx_queue.priority, 1) != 1) {
-		printf("%d: error creating link for port %d\n",
-				__LINE__, i);
-		return -1;
-	}
-	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i,
-					.release = disable_implicit_release };
-
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
 	if (ret != -ESRCH && ret != 0) {
@@ -431,76 +263,107 @@ setup_eventdev_generic(struct cons_data *cons_data,
 	}
 	rte_service_runstate_set(fdata->evdev_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}

 	return dev_id;
 }

 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
 	int i;
 	int ret;
+	uint8_t tx_port_id = 0;
 	uint8_t evdev_id = 0;
 	struct rte_event_dev_info dev_info;

 	ret = rte_event_dev_info_get(evdev_id, &dev_info);

-	struct rte_event_port_conf rx_p_conf = {
+	struct rte_event_port_conf adptr_p_conf = {
 		.dequeue_depth = 8,
 		.enqueue_depth = 8,
 		.new_event_threshold = 1200,
 	};

-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		adptr_p_conf.dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		adptr_p_conf.enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;

 	/* Create one adapter for all the ethernet ports. */
 	ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
-			&rx_p_conf);
+			&adptr_p_conf);
 	if (ret)
 		rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
 				cdata.rx_adapter_id);

+	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+			&adptr_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+				cdata.tx_adapter_id);
+
 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 	memset(&queue_conf, 0, sizeof(queue_conf));
 	queue_conf.ev.sched_type = cdata.queue_type;
 	queue_conf.ev.queue_id = cdata.qid[0];

 	for (i = 0; i < nb_ports; i++) {
-		uint32_t cap;
-
-		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
-		if (ret)
-			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
-
 		ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
 				-1, &queue_conf);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
 					"Failed to add queues to Rx adapter");
+
+		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+				-1);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Tx adapter");
 	}

+	ret = rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
+			&tx_port_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE,
+				"Failed to get Tx adapter port id");
+	ret = rte_event_port_link(evdev_id, tx_port_id, &cdata.tx_queue_id,
+			NULL, 1);
+	if (ret != 1)
+		rte_exit(EXIT_FAILURE,
+				"Unable to link Tx adapter port to Tx queue");
+
 	ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
 				&fdata->rxadptr_service_id);
 	if (ret != -ESRCH && ret != 0) {
 		rte_exit(EXIT_FAILURE,
-			"Error getting the service ID for sw eventdev\n");
+			"Error getting the service ID for Rx adapter\n");
 	}
 	rte_service_runstate_set(fdata->rxadptr_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);

+	ret = rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
+				&fdata->txadptr_service_id);
+	if (ret != -ESRCH && ret != 0) {
+		rte_exit(EXIT_FAILURE,
+			"Error getting the service ID for Tx adapter\n");
+	}
+	rte_service_runstate_set(fdata->txadptr_service_id, 1);
+	rte_service_set_runstate_mapped_check(fdata->txadptr_service_id, 0);
+
 	ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
 	if (ret)
 		rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
 				cdata.rx_adapter_id);
+
+	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+				cdata.tx_adapter_id);
+
+	if (rte_event_dev_start(evdev_id) < 0)
+		rte_exit(EXIT_FAILURE, "Error starting eventdev");
 }

 static void
@@ -510,6 +373,8 @@ generic_opt_check(void)
 	int ret;
 	uint32_t cap = 0;
 	uint8_t rx_needed = 0;
+	uint8_t tx_needed = 0;
+	uint8_t sched_needed = 0;
 	struct rte_event_dev_info eventdev_info;

 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -519,6 +384,8 @@ generic_opt_check(void)
 				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
 		rte_exit(EXIT_FAILURE,
 				"Event dev doesn't support all type queues\n");
+	sched_needed = !(eventdev_info.event_dev_cap &
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

 	RTE_ETH_FOREACH_DEV(i) {
 		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
@@ -527,13 +394,19 @@ generic_opt_check(void)
 				"failed to get event rx adapter capabilities");
 		rx_needed |=
 			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+
+		ret = rte_event_eth_tx_adapter_caps_get(0, i, &cap);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"failed to get event tx adapter capabilities");
+		tx_needed |=
+			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	if (cdata.worker_lcore_mask == 0 ||
 			(rx_needed && cdata.rx_lcore_mask == 0) ||
-			cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
-				&& !(eventdev_info.event_dev_cap &
-					RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+			(tx_needed && cdata.tx_lcore_mask == 0) ||
+			(sched_needed && cdata.sched_lcore_mask == 0)) {
 		printf("Core part of pipeline was not assigned any cores. "
 			"This will stall the pipeline, please check core masks "
 			"(use -h for details on setting core masks):\n"
@@ -545,23 +418,27 @@ generic_opt_check(void)
 		rte_exit(-1, "Fix core masks\n");
 	}

-	if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+	if (!sched_needed)
 		memset(fdata->sched_core, 0,
 				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!rx_needed)
+		memset(fdata->rx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!tx_needed)
+		memset(fdata->tx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 void
 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
 {
 	if (burst) {
-		caps->consumer = consumer_burst;
 		caps->worker = worker_generic_burst;
 	} else {
-		caps->consumer = consumer;
 		caps->worker = worker_generic;
 	}

-	caps->adptr_setup = init_rx_adapter;
+	caps->adptr_setup = init_adapters;
 	caps->scheduler = schedule_devices;
 	caps->evdev_setup = setup_eventdev_generic;
 	caps->check_opt = generic_opt_check;
diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
index 3dbde92df..7cd516cd7 100644
--- a/examples/eventdev_pipeline/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
 }

 static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
+worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
-	exchange_mac(mbuf);
-	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+	exchange_mac(ev->mbuf);
+	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
 		rte_pause();
 }

@@ -64,7 +65,7 @@ worker_do_tx_single(void *arg)
 		received++;

 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			worker_tx_pkt(ev.mbuf);
+			worker_tx_pkt(dev, port, &ev);
 			tx++;
 			continue;
 		}
@@ -100,7 +101,7 @@ worker_do_tx_single_atq(void *arg)
 		received++;

 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			worker_tx_pkt(ev.mbuf);
+			worker_tx_pkt(dev, port, &ev);
 			tx++;
 			continue;
 		}
@@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

-				worker_tx_pkt(ev[i].mbuf);
+				worker_tx_pkt(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				tx++;

@@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

-				worker_tx_pkt(ev[i].mbuf);
+				worker_tx_pkt(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				tx++;
 			} else
@@ -232,7 +233,7 @@ worker_do_tx(void *arg)

 		if (cq_id >= lst_qid) {
 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				worker_tx_pkt(ev.mbuf);
+				worker_tx_pkt(dev, port, &ev);
 				tx++;
 				continue;
 			}
@@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg)

 		if (cq_id == lst_qid) {
 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				worker_tx_pkt(ev.mbuf);
+				worker_tx_pkt(dev, port, &ev);
 				tx++;
 				continue;
 			}
@@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg)

 			if (cq_id >= lst_qid) {
 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-					worker_tx_pkt(ev[i].mbuf);
+					worker_tx_pkt(dev, port, &ev[i]);
 					tx++;
 					ev[i].op = RTE_EVENT_OP_RELEASE;
 					continue;
@@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg)

 			if (cq_id == lst_qid) {
 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-					worker_tx_pkt(ev[i].mbuf);
+					worker_tx_pkt(dev, port, &ev[i]);
 					tx++;
 					ev[i].op = RTE_EVENT_OP_RELEASE;
 					continue;
@@ -413,10 +414,8 @@ worker_do_tx_burst_atq(void *arg)
 }

 static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
 {
-	RTE_SET_USED(cons_data);
 	uint8_t i;
 	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
 	const uint8_t dev_id = 0;
@@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data,
 	}
 	rte_service_runstate_set(fdata->evdev_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}
+
+	if (rte_event_dev_start(dev_id) < 0)
+		rte_exit(EXIT_FAILURE, "Error starting eventdev");

 	return dev_id;
 }
@@ -602,7 +600,7 @@ service_rx_adapter(void *arg)
 }

 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
 	int i;
 	int ret;
@@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports)
 	ret = rte_event_dev_info_get(evdev_id, &dev_info);
 	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);

-	struct rte_event_port_conf rx_p_conf = {
+	struct rte_event_port_conf adptr_p_conf = {
 		.dequeue_depth = 8,
 		.enqueue_depth = 8,
 		.new_event_threshold = 1200,
 	};

-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
+	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		adptr_p_conf.dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		adptr_p_conf.enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;

 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 	memset(&queue_conf, 0, sizeof(queue_conf));
@@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports)
 		uint32_t cap;
 		uint32_t service_id;

-		ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
+				&adptr_p_conf);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-					"failed to create rx adapter[%d]",
-					cdata.rx_adapter_id);
+					"failed to create rx adapter[%d]", i);

 		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
 		if (ret)
@@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports)
 			rte_exit(EXIT_FAILURE,
 					"Failed to add queues to Rx adapter");

-
 		/* Producer needs to be scheduled. */
 		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
 			ret = rte_event_eth_rx_adapter_service_id_get(i,
@@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports)
 		ret = rte_event_eth_rx_adapter_start(i);
 		if (ret)
 			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-					cdata.rx_adapter_id);
+					i);
+	}
+
+	/* We already know that Tx adapter has INTERNAL port cap*/
+	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+			&adptr_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+				cdata.tx_adapter_id);
+
+	for (i = 0; i < nb_ports; i++) {
+		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+				-1);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Tx adapter");
 	}

+	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+				cdata.tx_adapter_id);
+
 	if (adptr_services->nb_rx_adptrs) {
 		struct rte_service_spec service;

@@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports)
 				&fdata->rxadptr_service_id);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-				"Rx adapter[%d] service register failed",
-				cdata.rx_adapter_id);
+				"Rx adapter service register failed");

 		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
 		rte_service_component_runstate_set(fdata->rxadptr_service_id,
@@ -708,23 +725,20 @@ init_rx_adapter(uint16_t nb_ports)
 		rte_free(adptr_services);
 	}

-	if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
-			(dev_info.event_dev_cap &
+	if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
 			 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
 		fdata->cap.scheduler = NULL;
-
-	if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
-		memset(fdata->sched_core, 0,
-				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 static void
-worker_tx_opt_check(void)
+worker_tx_enq_opt_check(void)
 {
 	int i;
 	int ret;
 	uint32_t cap = 0;
 	uint8_t rx_needed = 0;
+	uint8_t tx_needed = 0;
+	uint8_t sched_needed = 0;
 	struct rte_event_dev_info eventdev_info;

 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -734,22 +748,29 @@ worker_tx_opt_check(void)
 				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
 		rte_exit(EXIT_FAILURE,
 				"Event dev doesn't support all type queues\n");
+	sched_needed = !(eventdev_info.event_dev_cap &
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

 	RTE_ETH_FOREACH_DEV(i) {
 		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
+				"failed to get event rx adapter capabilities");
 		rx_needed |=
 			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+
+		ret = rte_event_eth_tx_adapter_caps_get(0, i, &cap);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"failed to get event tx adapter capabilities");
+		tx_needed |=
+			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	if (cdata.worker_lcore_mask == 0 ||
 			(rx_needed && cdata.rx_lcore_mask == 0) ||
-			(cdata.sched_lcore_mask == 0 &&
-			 !(eventdev_info.event_dev_cap &
-				 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+			(tx_needed && cdata.tx_lcore_mask == 0) ||
+			(sched_needed && cdata.sched_lcore_mask == 0)) {
 		printf("Core part of pipeline was not assigned any cores. "
 			"This will stall the pipeline, please check core masks "
 			"(use -h for details on setting core masks):\n"
@@ -760,6 +781,16 @@ worker_tx_opt_check(void)
 			cdata.worker_lcore_mask);
 		rte_exit(-1, "Fix core masks\n");
 	}
+
+	if (!sched_needed)
+		memset(fdata->sched_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!rx_needed)
+		memset(fdata->rx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!tx_needed)
+		memset(fdata->tx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 static worker_loop
@@ -821,18 +852,15 @@ get_worker_multi_stage(bool burst)
 }

 void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
 {
 	if (cdata.num_stages == 1)
 		caps->worker = get_worker_single_stage(burst);
 	else
 		caps->worker = get_worker_multi_stage(burst);

-	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
-	caps->check_opt = worker_tx_opt_check;
-	caps->consumer = NULL;
+	caps->check_opt = worker_tx_enq_opt_check;
 	caps->scheduler = schedule_devices;
-	caps->evdev_setup = setup_eventdev_worker_tx;
-	caps->adptr_setup = init_rx_adapter;
+	caps->evdev_setup = setup_eventdev_worker_tx_enq;
+	caps->adptr_setup = init_adapters;
 }
--
2.18.0

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
  2018-09-05 13:45 [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support Pavan Nikhilesh
@ 2018-09-17 14:17 ` Jerin Jacob
  2018-09-19  2:54 ` Rao, Nikhil
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 11+ messages in thread
From: Jerin Jacob @ 2018-09-17 14:17 UTC (permalink / raw)
  To: Pavan Nikhilesh; +Cc: nikhil.rao, harry.van.haaren, anoob.joseph, dev

-----Original Message-----
> Date: Wed,  5 Sep 2018 19:15:54 +0530
> From: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> To: jerin.jacob@caviumnetworks.com, nikhil.rao@intel.com,
>  harry.van.haaren@intel.com, anoob.joseph@caviumnetworks.com
> Cc: dev@dpdk.org, Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> Subject: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter
>  support
> X-Mailer: git-send-email 2.18.0
> 
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> ---
>  This patch depends on the following series:
>  http://patches.dpdk.org/project/dpdk/list/?series=1121
> 
>  examples/eventdev_pipeline/main.c             |  62 ++--
>  examples/eventdev_pipeline/pipeline_common.h  |  31 +-
>  .../pipeline_worker_generic.c                 | 273 +++++-------------
>  .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
>  4 files changed, 186 insertions(+), 310 deletions(-)


Harry and Nikhil,

As examples/eventdev_pipeline and Tx adapter maintainers, please
review these examples/eventdev_pipeline changes.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
  2018-09-05 13:45 [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support Pavan Nikhilesh
  2018-09-17 14:17 ` Jerin Jacob
@ 2018-09-19  2:54 ` Rao, Nikhil
  2018-09-19 10:42   ` Pavan Nikhilesh
  2018-09-21  7:44 ` Rao, Nikhil
  2018-09-24 10:12 ` [dpdk-dev] [PATCH v2] " Pavan Nikhilesh
  3 siblings, 1 reply; 11+ messages in thread
From: Rao, Nikhil @ 2018-09-19  2:54 UTC (permalink / raw)
  To: Pavan Nikhilesh, jerin.jacob, harry.van.haaren, anoob.joseph
  Cc: dev, Rao, Nikhil

On 9/5/2018 7:15 PM, Pavan Nikhilesh wrote:
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> ---
>   This patch depends on the following series:
>   http://patches.dpdk.org/project/dpdk/list/?series=1121
> 
>   examples/eventdev_pipeline/main.c             |  62 ++--
>   examples/eventdev_pipeline/pipeline_common.h  |  31 +-
>   .../pipeline_worker_generic.c                 | 273 +++++-------------
>   .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
>   4 files changed, 186 insertions(+), 310 deletions(-)
> 
> diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
> index 700bc696f..95531150b 100644
> --- a/examples/eventdev_pipeline/main.c
> +++ b/examples/eventdev_pipeline/main.c
> 
>
</snip>

>   static void
>   do_capability_setup(uint8_t eventdev_id)
>   {
> +	int ret;
>   	uint16_t i;
> -	uint8_t mt_unsafe = 0;
> +	uint8_t generic_pipeline = 0;
>   	uint8_t burst = 0;
> 
>   	RTE_ETH_FOREACH_DEV(i) {
> -		struct rte_eth_dev_info dev_info;
> -		memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
> -
> -		rte_eth_dev_info_get(i, &dev_info);
> -		/* Check if it is safe ask worker to tx. */
> -		mt_unsafe |= !(dev_info.tx_offload_capa &
> -				DEV_TX_OFFLOAD_MT_LOCKFREE);
> +		uint32_t caps = 0;
> +
> +		ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
> +		if (ret)
> +			rte_exit(EXIT_FAILURE,
> +				"Invalid capability for Tx adptr port %d\n", i);
> +		generic_pipeline |= !(caps &
> +				RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
>   	}
> 
>   	struct rte_event_dev_info eventdev_info;
> @@ -406,10 +386,10 @@ do_capability_setup(uint8_t eventdev_id)
>   	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
>   		0;
> 
> -	if (mt_unsafe)
> +	if (generic_pipeline)
>   		set_worker_generic_setup_data(&fdata->cap, burst);
>   	else
> -		set_worker_tx_setup_data(&fdata->cap, burst);
> +		set_worker_tx_enq_setup_data(&fdata->cap, burst);
>   }

The generic_pipeline flag is set here and therefore, aren't the 
subsequent checks in generic_opt_check() and worker_tx_enq_opt_check() 
redundant ?

Thanks,
Nikhil

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
  2018-09-19  2:54 ` Rao, Nikhil
@ 2018-09-19 10:42   ` Pavan Nikhilesh
  2018-09-20  2:18     ` Rao, Nikhil
  0 siblings, 1 reply; 11+ messages in thread
From: Pavan Nikhilesh @ 2018-09-19 10:42 UTC (permalink / raw)
  To: Rao, Nikhil, jerin.jacob, harry.van.haaren, anoob.joseph; +Cc: dev

On Wed, Sep 19, 2018 at 08:24:01AM +0530, Rao, Nikhil wrote:
> On 9/5/2018 7:15 PM, Pavan Nikhilesh wrote:
> > Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > ---
> >   This patch depends on the following series:
> >   http://patches.dpdk.org/project/dpdk/list/?series=1121
> >
> >   examples/eventdev_pipeline/main.c             |  62 ++--
> >   examples/eventdev_pipeline/pipeline_common.h  |  31 +-
> >   .../pipeline_worker_generic.c                 | 273 +++++-------------
> >   .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
> >   4 files changed, 186 insertions(+), 310 deletions(-)
> >
> > diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
> > index 700bc696f..95531150b 100644
> > --- a/examples/eventdev_pipeline/main.c
> > +++ b/examples/eventdev_pipeline/main.c
> >
> >
> </snip>
>
> >   static void
> >   do_capability_setup(uint8_t eventdev_id)
> >   {
> > +     int ret;
> >       uint16_t i;
> > -     uint8_t mt_unsafe = 0;
> > +     uint8_t generic_pipeline = 0;
> >       uint8_t burst = 0;
> >
> >       RTE_ETH_FOREACH_DEV(i) {
> > -             struct rte_eth_dev_info dev_info;
> > -             memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
> > -
> > -             rte_eth_dev_info_get(i, &dev_info);
> > -             /* Check if it is safe ask worker to tx. */
> > -             mt_unsafe |= !(dev_info.tx_offload_capa &
> > -                             DEV_TX_OFFLOAD_MT_LOCKFREE);
> > +             uint32_t caps = 0;
> > +
> > +             ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
> > +             if (ret)
> > +                     rte_exit(EXIT_FAILURE,
> > +                             "Invalid capability for Tx adptr port %d\n", i);
> > +             generic_pipeline |= !(caps &
> > +                             RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
> >       }
> >
> >       struct rte_event_dev_info eventdev_info;
> > @@ -406,10 +386,10 @@ do_capability_setup(uint8_t eventdev_id)
> >       burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
> >               0;
> >
> > -     if (mt_unsafe)
> > +     if (generic_pipeline)
> >               set_worker_generic_setup_data(&fdata->cap, burst);
> >       else
> > -             set_worker_tx_setup_data(&fdata->cap, burst);
> > +             set_worker_tx_enq_setup_data(&fdata->cap, burst);
> >   }
>
> The generic_pipeline flag is set here and therefore, aren't the
> subsequent checks in generic_opt_check() and worker_tx_enq_opt_check()
> redundant ?

The checks inside generic_opt_check and worker_tx_enq_opt_check are still
required, as different eventdevs (opdl, DSW, dpaa, sw, octeontx) might have
different capabilities.

The generic_pipeline flag is just used to decide the pipeline to use based on
the ethdev connected to eventdev.

The original idea was to have support to add different pipelines based on
combination of capability between eventdev and ethdev.

>
> Thanks,
> Nikhil

Regards,
Pavan.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
  2018-09-19 10:42   ` Pavan Nikhilesh
@ 2018-09-20  2:18     ` Rao, Nikhil
  0 siblings, 0 replies; 11+ messages in thread
From: Rao, Nikhil @ 2018-09-20  2:18 UTC (permalink / raw)
  To: Pavan Nikhilesh, jerin.jacob, harry.van.haaren, anoob.joseph
  Cc: dev, Rao, Nikhil

On 9/19/2018 4:12 PM, Pavan Nikhilesh wrote:
> On Wed, Sep 19, 2018 at 08:24:01AM +0530, Rao, Nikhil wrote:
>> On 9/5/2018 7:15 PM, Pavan Nikhilesh wrote:
>>> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
>>> ---
>>>    This patch depends on the following series:
>>>    http://patches.dpdk.org/project/dpdk/list/?series=1121
>>>
>>>    examples/eventdev_pipeline/main.c             |  62 ++--
>>>    examples/eventdev_pipeline/pipeline_common.h  |  31 +-
>>>    .../pipeline_worker_generic.c                 | 273 +++++-------------
>>>    .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
>>>    4 files changed, 186 insertions(+), 310 deletions(-)
>>>
>>> diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
>>> index 700bc696f..95531150b 100644
>>> --- a/examples/eventdev_pipeline/main.c
>>> +++ b/examples/eventdev_pipeline/main.c
>>>
>>>
>> </snip>
>>
>>>    static void
>>>    do_capability_setup(uint8_t eventdev_id)
>>>    {
>>> +     int ret;
>>>        uint16_t i;
>>> -     uint8_t mt_unsafe = 0;
>>> +     uint8_t generic_pipeline = 0;
>>>        uint8_t burst = 0;
>>>
>>>        RTE_ETH_FOREACH_DEV(i) {
>>> -             struct rte_eth_dev_info dev_info;
>>> -             memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
>>> -
>>> -             rte_eth_dev_info_get(i, &dev_info);
>>> -             /* Check if it is safe ask worker to tx. */
>>> -             mt_unsafe |= !(dev_info.tx_offload_capa &
>>> -                             DEV_TX_OFFLOAD_MT_LOCKFREE);
>>> +             uint32_t caps = 0;
>>> +
>>> +             ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
>>> +             if (ret)
>>> +                     rte_exit(EXIT_FAILURE,
>>> +                             "Invalid capability for Tx adptr port %d\n", i);
>>> +             generic_pipeline |= !(caps &
>>> +                             RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
>>>        }
>>>
>>>        struct rte_event_dev_info eventdev_info;
>>> @@ -406,10 +386,10 @@ do_capability_setup(uint8_t eventdev_id)
>>>        burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
>>>                0;
>>>
>>> -     if (mt_unsafe)
>>> +     if (generic_pipeline)
>>>                set_worker_generic_setup_data(&fdata->cap, burst);
>>>        else
>>> -             set_worker_tx_setup_data(&fdata->cap, burst);
>>> +             set_worker_tx_enq_setup_data(&fdata->cap, burst);
>>>    }
>>
>> The generic_pipeline flag is set here and therefore, aren't the
>> subsequent checks in generic_opt_check() and worker_tx_enq_opt_check()
>> redundant ?
> 
> The checks inside generic_opt_check, worker_tx_enq_opt_check are still required
> as different eventdevs opdl, DSW, dpaa, sw, octeontx might have different
> capabilities.

I should have mentioned in the previous reply that the checks I was
referring to were the checks for the
RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT flag.

Also, on second reading, I noticed these checks use
RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT, not
RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT.

Thanks,
Nikhil

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
  2018-09-05 13:45 [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support Pavan Nikhilesh
  2018-09-17 14:17 ` Jerin Jacob
  2018-09-19  2:54 ` Rao, Nikhil
@ 2018-09-21  7:44 ` Rao, Nikhil
  2018-09-24  9:51   ` Pavan Nikhilesh
  2018-09-24 10:12 ` [dpdk-dev] [PATCH v2] " Pavan Nikhilesh
  3 siblings, 1 reply; 11+ messages in thread
From: Rao, Nikhil @ 2018-09-21  7:44 UTC (permalink / raw)
  To: Pavan Nikhilesh, jerin.jacob, harry.van.haaren, anoob.joseph
  Cc: dev, Rao, Nikhil

On 9/5/2018 7:15 PM, Pavan Nikhilesh wrote:
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> ---
>   This patch depends on the following series:
>   http://patches.dpdk.org/project/dpdk/list/?series=1121
> 
>   examples/eventdev_pipeline/main.c             |  62 ++--
>   examples/eventdev_pipeline/pipeline_common.h  |  31 +-
>   .../pipeline_worker_generic.c                 | 273 +++++-------------
>   .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
>   4 files changed, 186 insertions(+), 310 deletions(-)
> 
> --- a/examples/eventdev_pipeline/pipeline_worker_generic.c
> +++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
> @@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
>   	return 0;
>   }
> 

>   static void
> -init_rx_adapter(uint16_t nb_ports)
> +init_adapters(uint16_t nb_ports)
>   {
>   	int i;
>   	int ret;
> +	uint8_t tx_port_id = 0;
>   	uint8_t evdev_id = 0;
>   	struct rte_event_dev_info dev_info;
> 
>   	ret = rte_event_dev_info_get(evdev_id, &dev_info);
> 
> -	struct rte_event_port_conf rx_p_conf = {
> +	struct rte_event_port_conf adptr_p_conf = {
>   		.dequeue_depth = 8,
>   		.enqueue_depth = 8,
>   		.new_event_threshold = 1200,
>   	};
> 

We should restore the dequeue_depth to 128 for the port config passed to
the Tx adapter. Doing so takes the performance from 5.8 mpps to 11.7
mpps on my test setup (test setup uses the SW PMD). Restoring
enqueue_depth and new_event_threshold (64 and 4096 respectively) had
no noticeable effect.

Thanks,
Nikhil

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support
  2018-09-21  7:44 ` Rao, Nikhil
@ 2018-09-24  9:51   ` Pavan Nikhilesh
  0 siblings, 0 replies; 11+ messages in thread
From: Pavan Nikhilesh @ 2018-09-24  9:51 UTC (permalink / raw)
  To: Rao, Nikhil, jerin.jacob, harry.van.haaren, anoob.joseph; +Cc: dev

On Fri, Sep 21, 2018 at 01:14:53PM +0530, Rao, Nikhil wrote:
> On 9/5/2018 7:15 PM, Pavan Nikhilesh wrote:
> > Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > ---
> >   This patch depends on the following series:
> >   http://patches.dpdk.org/project/dpdk/list/?series=1121
> >
> >   examples/eventdev_pipeline/main.c             |  62 ++--
> >   examples/eventdev_pipeline/pipeline_common.h  |  31 +-
> >   .../pipeline_worker_generic.c                 | 273 +++++-------------
> >   .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
> >   4 files changed, 186 insertions(+), 310 deletions(-)
> >
> > --- a/examples/eventdev_pipeline/pipeline_worker_generic.c
> > +++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
> > @@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
> >       return 0;
> >   }
> >
>
> >   static void
> > -init_rx_adapter(uint16_t nb_ports)
> > +init_adapters(uint16_t nb_ports)
> >   {
> >       int i;
> >       int ret;
> > +     uint8_t tx_port_id = 0;
> >       uint8_t evdev_id = 0;
> >       struct rte_event_dev_info dev_info;
> >
> >       ret = rte_event_dev_info_get(evdev_id, &dev_info);
> >
> > -     struct rte_event_port_conf rx_p_conf = {
> > +     struct rte_event_port_conf adptr_p_conf = {
> >               .dequeue_depth = 8,
> >               .enqueue_depth = 8,
> >               .new_event_threshold = 1200,
> >       };
> >
>
> We should restore the dequeue_depth to 128 for the port config passed to
> the Tx adapter. Doing so takes the performance from 5.8 mpps to 11.7
> mpps on my test setup (test setup uses the SW PMD). Restoring
> enqueue_depth and new_event_threshold (64 and 4096 respectively) had
> no noticeable effect.

We cap the above values at the maximums reported by the driver:

        if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
                adptr_p_conf.dequeue_depth =
                        dev_info.max_event_port_dequeue_depth;
        if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
                adptr_p_conf.enqueue_depth =
                        dev_info.max_event_port_enqueue_depth;

Still, I missed setting them to the configurable defaults used for worker
ports, as below:

        struct rte_event_port_conf adptr_p_conf = {
                .dequeue_depth = cdata.worker_cq_depth,
                .enqueue_depth = 64,
                .new_event_threshold = 1200,
        };

I will send the v2 soon
>
> Thanks,
> Nikhil

Thanks,
Pavan.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [dpdk-dev] [PATCH v2] examples/eventdev_pipeline: add Tx adapter support
  2018-09-05 13:45 [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support Pavan Nikhilesh
                   ` (2 preceding siblings ...)
  2018-09-21  7:44 ` Rao, Nikhil
@ 2018-09-24 10:12 ` Pavan Nikhilesh
  2018-09-26 12:56   ` Rao, Nikhil
  3 siblings, 1 reply; 11+ messages in thread
From: Pavan Nikhilesh @ 2018-09-24 10:12 UTC (permalink / raw)
  To: jerin.jacob, nikhil.rao, harry.van.haaren, anoob.joseph
  Cc: dev, Pavan Nikhilesh

Redo the worker pipelines and offload transmission to service cores
seamlessly through Tx adapter.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
 v2 Changes:
 - Updated enqueue,dequeue depth thresholds.
 - remove redundant capability checks.

 examples/eventdev_pipeline/main.c             |  88 +++---
 examples/eventdev_pipeline/pipeline_common.h  |  31 +-
 .../pipeline_worker_generic.c                 | 268 +++++-------------
 .../eventdev_pipeline/pipeline_worker_tx.c    | 156 +++++-----
 4 files changed, 207 insertions(+), 336 deletions(-)

diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
index 700bc696f..92e08bc0c 100644
--- a/examples/eventdev_pipeline/main.c
+++ b/examples/eventdev_pipeline/main.c
@@ -26,20 +26,6 @@ core_in_use(unsigned int lcore_id) {
 		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
 }

-static void
-eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
-			void *userdata)
-{
-	int port_id = (uintptr_t) userdata;
-	unsigned int _sent = 0;
-
-	do {
-		/* Note: hard-coded TX queue */
-		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
-					  unsent - _sent);
-	} while (_sent != unsent);
-}
-
 /*
  * Parse the coremask given as argument (hexadecimal string) and fill
  * the global configuration (core role and core count) with the parsed
@@ -263,6 +249,7 @@ parse_app_args(int argc, char **argv)
 static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
+	struct rte_eth_rxconf rx_conf;
 	static const struct rte_eth_conf port_conf_default = {
 		.rxmode = {
 			.mq_mode = ETH_MQ_RX_RSS,
@@ -291,6 +278,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
 		port_conf.txmode.offloads |=
 			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+	rx_conf = dev_info.default_rxconf;
+	rx_conf.offloads = port_conf.rxmode.offloads;

 	port_conf.rx_adv_conf.rss_conf.rss_hf &=
 		dev_info.flow_type_rss_offloads;
@@ -311,7 +300,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	/* Allocate and set up 1 RX queue per Ethernet port. */
 	for (q = 0; q < rx_rings; q++) {
 		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
-				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+				rte_eth_dev_socket_id(port), &rx_conf,
+				mbuf_pool);
 		if (retval < 0)
 			return retval;
 	}
@@ -350,7 +340,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 static int
 init_ports(uint16_t num_ports)
 {
-	uint16_t portid, i;
+	uint16_t portid;

 	if (!cdata.num_mbuf)
 		cdata.num_mbuf = 16384 * num_ports;
@@ -367,36 +357,26 @@ init_ports(uint16_t num_ports)
 			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
 					portid);

-	RTE_ETH_FOREACH_DEV(i) {
-		void *userdata = (void *)(uintptr_t) i;
-		fdata->tx_buf[i] =
-			rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
-		if (fdata->tx_buf[i] == NULL)
-			rte_panic("Out of memory\n");
-		rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
-		rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
-						   eth_tx_buffer_retry,
-						   userdata);
-	}
-
 	return 0;
 }

 static void
 do_capability_setup(uint8_t eventdev_id)
 {
+	int ret;
 	uint16_t i;
-	uint8_t mt_unsafe = 0;
+	uint8_t generic_pipeline = 0;
 	uint8_t burst = 0;

 	RTE_ETH_FOREACH_DEV(i) {
-		struct rte_eth_dev_info dev_info;
-		memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
-
-		rte_eth_dev_info_get(i, &dev_info);
-		/* Check if it is safe ask worker to tx. */
-		mt_unsafe |= !(dev_info.tx_offload_capa &
-				DEV_TX_OFFLOAD_MT_LOCKFREE);
+		uint32_t caps = 0;
+
+		ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"Invalid capability for Tx adptr port %d\n", i);
+		generic_pipeline |= !(caps &
+				RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	struct rte_event_dev_info eventdev_info;
@@ -406,21 +386,42 @@ do_capability_setup(uint8_t eventdev_id)
 	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
 		0;

-	if (mt_unsafe)
+	if (generic_pipeline)
 		set_worker_generic_setup_data(&fdata->cap, burst);
 	else
-		set_worker_tx_setup_data(&fdata->cap, burst);
+		set_worker_tx_enq_setup_data(&fdata->cap, burst);
 }

 static void
 signal_handler(int signum)
 {
+	static uint8_t once;
+	uint16_t portid;
+
 	if (fdata->done)
 		rte_exit(1, "Exiting on signal %d\n", signum);
-	if (signum == SIGINT || signum == SIGTERM) {
+	if ((signum == SIGINT || signum == SIGTERM) && !once) {
 		printf("\n\nSignal %d received, preparing to exit...\n",
 				signum);
+		if (cdata.dump_dev)
+			rte_event_dev_dump(0, stdout);
+		once = 1;
 		fdata->done = 1;
+		rte_smp_wmb();
+
+		RTE_ETH_FOREACH_DEV(portid) {
+			rte_event_eth_rx_adapter_stop(portid);
+			rte_event_eth_tx_adapter_stop(portid);
+			rte_eth_dev_stop(portid);
+		}
+
+		rte_eal_mp_wait_lcore();
+
+		RTE_ETH_FOREACH_DEV(portid) {
+			rte_eth_dev_close(portid);
+		}
+
+		rte_event_dev_close(0);
 	}
 	if (signum == SIGTSTP)
 		rte_event_dev_dump(0, stdout);
@@ -499,7 +500,7 @@ main(int argc, char **argv)
 	if (worker_data == NULL)
 		rte_panic("rte_calloc failed\n");

-	int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
+	int dev_id = fdata->cap.evdev_setup(worker_data);
 	if (dev_id < 0)
 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

@@ -524,8 +525,8 @@ main(int argc, char **argv)

 		if (fdata->tx_core[lcore_id])
 			printf(
-				"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
-				__func__, lcore_id, cons_data.port_id);
+				"[%s()] lcore %d executing NIC Tx\n",
+				__func__, lcore_id);

 		if (fdata->sched_core[lcore_id])
 			printf("[%s()] lcore %d executing scheduler\n",
@@ -555,9 +556,6 @@ main(int argc, char **argv)

 	rte_eal_mp_wait_lcore();

-	if (cdata.dump_dev)
-		rte_event_dev_dump(dev_id, stdout);
-
 	if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
 			(uint64_t)-ENOTSUP)) {
 		printf("\nPort Workload distribution:\n");
diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h
index 9703396f8..a6cc912fb 100644
--- a/examples/eventdev_pipeline/pipeline_common.h
+++ b/examples/eventdev_pipeline/pipeline_common.h
@@ -16,6 +16,7 @@
 #include <rte_ethdev.h>
 #include <rte_eventdev.h>
 #include <rte_event_eth_rx_adapter.h>
+#include <rte_event_eth_tx_adapter.h>
 #include <rte_service.h>
 #include <rte_service_component.h>

@@ -23,38 +24,30 @@
 #define BATCH_SIZE 16
 #define MAX_NUM_CORE 64

-struct cons_data {
-	uint8_t dev_id;
-	uint8_t port_id;
-	uint8_t release;
-} __rte_cache_aligned;
-
 struct worker_data {
 	uint8_t dev_id;
 	uint8_t port_id;
 } __rte_cache_aligned;

 typedef int (*worker_loop)(void *);
-typedef int (*consumer_loop)(void);
 typedef void (*schedule_loop)(unsigned int);
-typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
-typedef void (*rx_adapter_setup)(uint16_t nb_ports);
+typedef int (*eventdev_setup)(struct worker_data *);
+typedef void (*adapter_setup)(uint16_t nb_ports);
 typedef void (*opt_check)(void);

 struct setup_data {
 	worker_loop worker;
-	consumer_loop consumer;
 	schedule_loop scheduler;
 	eventdev_setup evdev_setup;
-	rx_adapter_setup adptr_setup;
+	adapter_setup adptr_setup;
 	opt_check check_opt;
 };

 struct fastpath_data {
 	volatile int done;
-	uint32_t tx_lock;
 	uint32_t evdev_service_id;
 	uint32_t rxadptr_service_id;
+	uint32_t txadptr_service_id;
 	bool rx_single;
 	bool tx_single;
 	bool sched_single;
@@ -62,7 +55,6 @@ struct fastpath_data {
 	unsigned int tx_core[MAX_NUM_CORE];
 	unsigned int sched_core[MAX_NUM_CORE];
 	unsigned int worker_core[MAX_NUM_CORE];
-	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
 	struct setup_data cap;
 } __rte_cache_aligned;

@@ -88,6 +80,8 @@ struct config_data {
 	int16_t next_qid[MAX_NUM_STAGES+2];
 	int16_t qid[MAX_NUM_STAGES];
 	uint8_t rx_adapter_id;
+	uint8_t tx_adapter_id;
+	uint8_t tx_queue_id;
 	uint64_t worker_lcore_mask;
 	uint64_t rx_lcore_mask;
 	uint64_t tx_lcore_mask;
@@ -99,8 +93,6 @@ struct port_link {
 	uint8_t priority;
 };

-struct cons_data cons_data;
-
 struct fastpath_data *fdata;
 struct config_data cdata;

@@ -142,12 +134,11 @@ schedule_devices(unsigned int lcore_id)
 		}
 	}

-	if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
-			 rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
-		fdata->cap.consumer();
-		rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+	if (fdata->tx_core[lcore_id]) {
+		rte_service_run_iter_on_app_lcore(fdata->txadptr_service_id,
+				!fdata->tx_single);
 	}
 }

 void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
-void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
+void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
index 2215e9ebe..169064949 100644
--- a/examples/eventdev_pipeline/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
 	return 0;
 }

-static __rte_always_inline int
-consumer(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packet;
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	int i;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	do {
-		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-				&packet, 1, 0);
-
-		if (n == 0) {
-			RTE_ETH_FOREACH_DEV(i)
-				rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
-			return 0;
-		}
-		if (start_time == 0)
-			last_time = start_time = rte_get_timer_cycles();
-
-		received++;
-		uint8_t outport = packet.mbuf->port;
-
-		exchange_mac(packet.mbuf);
-		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-				packet.mbuf);
-
-		if (cons_data.release)
-			rte_event_enqueue_burst(dev_id, port_id,
-								&packet, n);
-
-		/* Print out mpps every 1<22 packets */
-		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-			const uint64_t now = rte_get_timer_cycles();
-			const uint64_t total_ms = (now - start_time) / freq_khz;
-			const uint64_t delta_ms = (now - last_time) / freq_khz;
-			uint64_t delta_pkts = received - last_pkts;
-
-			printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
-					"avg %.3f mpps [current %.3f mpps]\n",
-					__func__,
-					received,
-					total_ms,
-					received / (total_ms * 1000.0),
-					delta_pkts / (delta_ms * 1000.0));
-			last_pkts = received;
-			last_time = now;
-		}
-
-		cdata.num_packets--;
-		if (cdata.num_packets <= 0)
-			fdata->done = 1;
-	/* Be stuck in this loop if single. */
-	} while (!fdata->done && fdata->tx_single);
-
-	return 0;
-}
-
-static __rte_always_inline int
-consumer_burst(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packets[BATCH_SIZE];
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	unsigned int i, j;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	do {
-		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-				packets, RTE_DIM(packets), 0);
-
-		if (n == 0) {
-			RTE_ETH_FOREACH_DEV(j)
-				rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
-			return 0;
-		}
-		if (start_time == 0)
-			last_time = start_time = rte_get_timer_cycles();
-
-		received += n;
-		for (i = 0; i < n; i++) {
-			uint8_t outport = packets[i].mbuf->port;
-
-			exchange_mac(packets[i].mbuf);
-			rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-					packets[i].mbuf);
-
-			packets[i].op = RTE_EVENT_OP_RELEASE;
-		}
-
-		if (cons_data.release) {
-			uint16_t nb_tx;
-
-			nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-								packets, n);
-			while (nb_tx < n)
-				nb_tx += rte_event_enqueue_burst(dev_id,
-						port_id, packets + nb_tx,
-						n - nb_tx);
-		}
-
-		/* Print out mpps every 1<22 packets */
-		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-			const uint64_t now = rte_get_timer_cycles();
-			const uint64_t total_ms = (now - start_time) / freq_khz;
-			const uint64_t delta_ms = (now - last_time) / freq_khz;
-			uint64_t delta_pkts = received - last_pkts;
-
-			printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
-					"avg %.3f mpps [current %.3f mpps]\n",
-					received,
-					total_ms,
-					received / (total_ms * 1000.0),
-					delta_pkts / (delta_ms * 1000.0));
-			last_pkts = received;
-			last_time = now;
-		}
-
-		cdata.num_packets -= n;
-		if (cdata.num_packets <= 0)
-			fdata->done = 1;
-	/* Be stuck in this loop if single. */
-	} while (!fdata->done && fdata->tx_single);
-
-	return 0;
-}
-
 static int
-setup_eventdev_generic(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+setup_eventdev_generic(struct worker_data *worker_data)
 {
 	const uint8_t dev_id = 0;
 	/* +1 stages is for a SINGLE_LINK TX stage */
 	const uint8_t nb_queues = cdata.num_stages + 1;
-	/* + 1 is one port for consumer */
-	const uint8_t nb_ports = cdata.num_workers + 1;
+	const uint8_t nb_ports = cdata.num_workers;
 	struct rte_event_dev_config config = {
 			.nb_event_queues = nb_queues,
 			.nb_event_ports = nb_ports,
@@ -285,11 +145,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 			.nb_atomic_flows = 1024,
 		.nb_atomic_order_sequences = 1024,
 	};
-	struct rte_event_port_conf tx_p_conf = {
-			.dequeue_depth = 128,
-			.enqueue_depth = 128,
-			.new_event_threshold = 4096,
-	};
 	struct rte_event_queue_conf tx_q_conf = {
 			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
 			.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
@@ -297,7 +152,6 @@ setup_eventdev_generic(struct cons_data *cons_data,

 	struct port_link worker_queues[MAX_NUM_STAGES];
 	uint8_t disable_implicit_release;
-	struct port_link tx_queue;
 	unsigned int i;

 	int ret, ndev = rte_event_dev_count();
@@ -314,7 +168,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);

 	wkr_p_conf.disable_implicit_release = disable_implicit_release;
-	tx_p_conf.disable_implicit_release = disable_implicit_release;

 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
@@ -372,8 +225,7 @@ setup_eventdev_generic(struct cons_data *cons_data,
 		printf("%d: error creating qid %d\n", __LINE__, i);
 		return -1;
 	}
-	tx_queue.queue_id = i;
-	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+	cdata.tx_queue_id = i;

 	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
 		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
@@ -403,26 +255,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 		w->port_id = i;
 	}

-	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-	/* port for consumer, linked to TX queue */
-	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
-		printf("Error setting up port %d\n", i);
-		return -1;
-	}
-	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
-				&tx_queue.priority, 1) != 1) {
-		printf("%d: error creating link for port %d\n",
-				__LINE__, i);
-		return -1;
-	}
-	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i,
-					.release = disable_implicit_release };
-
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
 	if (ret != -ESRCH && ret != 0) {
@@ -431,76 +263,107 @@ setup_eventdev_generic(struct cons_data *cons_data,
 	}
 	rte_service_runstate_set(fdata->evdev_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}

 	return dev_id;
 }

 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
 	int i;
 	int ret;
+	uint8_t tx_port_id = 0;
 	uint8_t evdev_id = 0;
 	struct rte_event_dev_info dev_info;

 	ret = rte_event_dev_info_get(evdev_id, &dev_info);

-	struct rte_event_port_conf rx_p_conf = {
-		.dequeue_depth = 8,
-		.enqueue_depth = 8,
-		.new_event_threshold = 1200,
+	struct rte_event_port_conf adptr_p_conf = {
+		.dequeue_depth = cdata.worker_cq_depth,
+		.enqueue_depth = 64,
+		.new_event_threshold = 4096,
 	};

-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		adptr_p_conf.dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		adptr_p_conf.enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;

 	/* Create one adapter for all the ethernet ports. */
 	ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
-			&rx_p_conf);
+			&adptr_p_conf);
 	if (ret)
 		rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
 				cdata.rx_adapter_id);

+	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+			&adptr_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+				cdata.tx_adapter_id);
+
 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 	memset(&queue_conf, 0, sizeof(queue_conf));
 	queue_conf.ev.sched_type = cdata.queue_type;
 	queue_conf.ev.queue_id = cdata.qid[0];

 	for (i = 0; i < nb_ports; i++) {
-		uint32_t cap;
-
-		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
-		if (ret)
-			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
-
 		ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
 				-1, &queue_conf);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
 					"Failed to add queues to Rx adapter");
+
+		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+				-1);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Tx adapter");
 	}

+	ret = rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
+			&tx_port_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE,
+				"Failed to get Tx adapter port id");
+	ret = rte_event_port_link(evdev_id, tx_port_id, &cdata.tx_queue_id,
+			NULL, 1);
+	if (ret != 1)
+		rte_exit(EXIT_FAILURE,
+				"Unable to link Tx adapter port to Tx queue");
+
 	ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
 				&fdata->rxadptr_service_id);
 	if (ret != -ESRCH && ret != 0) {
 		rte_exit(EXIT_FAILURE,
-			"Error getting the service ID for sw eventdev\n");
+			"Error getting the service ID for Rx adapter\n");
 	}
 	rte_service_runstate_set(fdata->rxadptr_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);

+	ret = rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
+				&fdata->txadptr_service_id);
+	if (ret != -ESRCH && ret != 0) {
+		rte_exit(EXIT_FAILURE,
+			"Error getting the service ID for Tx adapter\n");
+	}
+	rte_service_runstate_set(fdata->txadptr_service_id, 1);
+	rte_service_set_runstate_mapped_check(fdata->txadptr_service_id, 0);
+
 	ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
 	if (ret)
 		rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
 				cdata.rx_adapter_id);
+
+	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+				cdata.tx_adapter_id);
+
+	if (rte_event_dev_start(evdev_id) < 0)
+		rte_exit(EXIT_FAILURE, "Error starting eventdev");
 }

 static void
@@ -510,6 +373,7 @@ generic_opt_check(void)
 	int ret;
 	uint32_t cap = 0;
 	uint8_t rx_needed = 0;
+	uint8_t sched_needed = 0;
 	struct rte_event_dev_info eventdev_info;

 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -519,6 +383,8 @@ generic_opt_check(void)
 				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
 		rte_exit(EXIT_FAILURE,
 				"Event dev doesn't support all type queues\n");
+	sched_needed = !(eventdev_info.event_dev_cap &
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

 	RTE_ETH_FOREACH_DEV(i) {
 		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
@@ -531,9 +397,8 @@ generic_opt_check(void)

 	if (cdata.worker_lcore_mask == 0 ||
 			(rx_needed && cdata.rx_lcore_mask == 0) ||
-			cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
-				&& !(eventdev_info.event_dev_cap &
-					RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+			(cdata.tx_lcore_mask == 0) ||
+			(sched_needed && cdata.sched_lcore_mask == 0)) {
 		printf("Core part of pipeline was not assigned any cores. "
 			"This will stall the pipeline, please check core masks "
 			"(use -h for details on setting core masks):\n"
@@ -545,23 +410,24 @@ generic_opt_check(void)
 		rte_exit(-1, "Fix core masks\n");
 	}

-	if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+	if (!sched_needed)
 		memset(fdata->sched_core, 0,
 				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!rx_needed)
+		memset(fdata->rx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 void
 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
 {
 	if (burst) {
-		caps->consumer = consumer_burst;
 		caps->worker = worker_generic_burst;
 	} else {
-		caps->consumer = consumer;
 		caps->worker = worker_generic;
 	}

-	caps->adptr_setup = init_rx_adapter;
+	caps->adptr_setup = init_adapters;
 	caps->scheduler = schedule_devices;
 	caps->evdev_setup = setup_eventdev_generic;
 	caps->check_opt = generic_opt_check;
diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
index 3dbde92df..85eb075fc 100644
--- a/examples/eventdev_pipeline/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
 }

 static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
+worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
-	exchange_mac(mbuf);
-	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+	exchange_mac(ev->mbuf);
+	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
 		rte_pause();
 }

@@ -64,15 +65,15 @@ worker_do_tx_single(void *arg)
 		received++;

 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			worker_tx_pkt(ev.mbuf);
+			worker_tx_pkt(dev, port, &ev);
 			tx++;
-			continue;
+		} else {
+			work();
+			ev.queue_id++;
+			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			worker_event_enqueue(dev, port, &ev);
+			fwd++;
 		}
-		work();
-		ev.queue_id++;
-		worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		worker_event_enqueue(dev, port, &ev);
-		fwd++;
 	}

 	if (!cdata.quiet)
@@ -100,14 +101,14 @@ worker_do_tx_single_atq(void *arg)
 		received++;

 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			worker_tx_pkt(ev.mbuf);
+			worker_tx_pkt(dev, port, &ev);
 			tx++;
-			continue;
+		} else {
+			work();
+			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			worker_event_enqueue(dev, port, &ev);
+			fwd++;
 		}
-		work();
-		worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		worker_event_enqueue(dev, port, &ev);
-		fwd++;
 	}

 	if (!cdata.quiet)
@@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

-				worker_tx_pkt(ev[i].mbuf);
+				worker_tx_pkt(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				tx++;

@@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

-				worker_tx_pkt(ev[i].mbuf);
+				worker_tx_pkt(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				tx++;
 			} else
@@ -232,7 +233,7 @@ worker_do_tx(void *arg)

 		if (cq_id >= lst_qid) {
 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				worker_tx_pkt(ev.mbuf);
+				worker_tx_pkt(dev, port, &ev);
 				tx++;
 				continue;
 			}
@@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg)

 		if (cq_id == lst_qid) {
 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				worker_tx_pkt(ev.mbuf);
+				worker_tx_pkt(dev, port, &ev);
 				tx++;
 				continue;
 			}
@@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg)

 			if (cq_id >= lst_qid) {
 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-					worker_tx_pkt(ev[i].mbuf);
+					worker_tx_pkt(dev, port, &ev[i]);
 					tx++;
 					ev[i].op = RTE_EVENT_OP_RELEASE;
 					continue;
@@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg)

 			if (cq_id == lst_qid) {
 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-					worker_tx_pkt(ev[i].mbuf);
+					worker_tx_pkt(dev, port, &ev[i]);
 					tx++;
 					ev[i].op = RTE_EVENT_OP_RELEASE;
 					continue;
@@ -413,10 +414,8 @@ worker_do_tx_burst_atq(void *arg)
 }

 static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
 {
-	RTE_SET_USED(cons_data);
 	uint8_t i;
 	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
 	const uint8_t dev_id = 0;
@@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data,
 	}
 	rte_service_runstate_set(fdata->evdev_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}
+
+	if (rte_event_dev_start(dev_id) < 0)
+		rte_exit(EXIT_FAILURE, "Error starting eventdev");

 	return dev_id;
 }
@@ -602,7 +600,7 @@ service_rx_adapter(void *arg)
 }

 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
 	int i;
 	int ret;
@@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports)
 	ret = rte_event_dev_info_get(evdev_id, &dev_info);
 	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);

-	struct rte_event_port_conf rx_p_conf = {
-		.dequeue_depth = 8,
-		.enqueue_depth = 8,
-		.new_event_threshold = 1200,
+	struct rte_event_port_conf adptr_p_conf = {
+		.dequeue_depth = cdata.worker_cq_depth,
+		.enqueue_depth = 64,
+		.new_event_threshold = 4096,
 	};

-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
+	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		adptr_p_conf.dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		adptr_p_conf.enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;

 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 	memset(&queue_conf, 0, sizeof(queue_conf));
@@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports)
 		uint32_t cap;
 		uint32_t service_id;

-		ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
+				&adptr_p_conf);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-					"failed to create rx adapter[%d]",
-					cdata.rx_adapter_id);
+					"failed to create rx adapter[%d]", i);

 		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
 		if (ret)
@@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports)
 			rte_exit(EXIT_FAILURE,
 					"Failed to add queues to Rx adapter");

-
 		/* Producer needs to be scheduled. */
 		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
 			ret = rte_event_eth_rx_adapter_service_id_get(i,
@@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports)
 		ret = rte_event_eth_rx_adapter_start(i);
 		if (ret)
 			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-					cdata.rx_adapter_id);
+					i);
 	}

+	/* We already know that Tx adapter has INTERNAL port cap*/
+	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+			&adptr_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+				cdata.tx_adapter_id);
+
+	for (i = 0; i < nb_ports; i++) {
+		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+				-1);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Tx adapter");
+	}
+
+	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+				cdata.tx_adapter_id);
+
 	if (adptr_services->nb_rx_adptrs) {
 		struct rte_service_spec service;

@@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports)
 				&fdata->rxadptr_service_id);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-				"Rx adapter[%d] service register failed",
-				cdata.rx_adapter_id);
+				"Rx adapter service register failed");

 		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
 		rte_service_component_runstate_set(fdata->rxadptr_service_id,
@@ -708,23 +725,19 @@ init_rx_adapter(uint16_t nb_ports)
 		rte_free(adptr_services);
 	}

-	if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
-			(dev_info.event_dev_cap &
+	if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
 			 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
 		fdata->cap.scheduler = NULL;
-
-	if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
-		memset(fdata->sched_core, 0,
-				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 static void
-worker_tx_opt_check(void)
+worker_tx_enq_opt_check(void)
 {
 	int i;
 	int ret;
 	uint32_t cap = 0;
 	uint8_t rx_needed = 0;
+	uint8_t sched_needed = 0;
 	struct rte_event_dev_info eventdev_info;

 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -734,32 +747,38 @@ worker_tx_opt_check(void)
 				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
 		rte_exit(EXIT_FAILURE,
 				"Event dev doesn't support all type queues\n");
+	sched_needed = !(eventdev_info.event_dev_cap &
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

 	RTE_ETH_FOREACH_DEV(i) {
 		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
+				"failed to get event rx adapter capabilities");
 		rx_needed |=
 			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	if (cdata.worker_lcore_mask == 0 ||
 			(rx_needed && cdata.rx_lcore_mask == 0) ||
-			(cdata.sched_lcore_mask == 0 &&
-			 !(eventdev_info.event_dev_cap &
-				 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+			(sched_needed && cdata.sched_lcore_mask == 0)) {
 		printf("Core part of pipeline was not assigned any cores. "
 			"This will stall the pipeline, please check core masks "
 			"(use -h for details on setting core masks):\n"
-			"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
-			"\n\tworkers: %"PRIu64"\n",
-			cdata.rx_lcore_mask, cdata.tx_lcore_mask,
-			cdata.sched_lcore_mask,
-			cdata.worker_lcore_mask);
+			"\trx: %"PRIu64"\n\tsched: %"PRIu64
+			"\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
+			cdata.sched_lcore_mask, cdata.worker_lcore_mask);
 		rte_exit(-1, "Fix core masks\n");
 	}
+
+	if (!sched_needed)
+		memset(fdata->sched_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!rx_needed)
+		memset(fdata->rx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+
+	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
 }

 static worker_loop
@@ -821,18 +840,15 @@ get_worker_multi_stage(bool burst)
 }

 void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
 {
 	if (cdata.num_stages == 1)
 		caps->worker = get_worker_single_stage(burst);
 	else
 		caps->worker = get_worker_multi_stage(burst);

-	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
-	caps->check_opt = worker_tx_opt_check;
-	caps->consumer = NULL;
+	caps->check_opt = worker_tx_enq_opt_check;
 	caps->scheduler = schedule_devices;
-	caps->evdev_setup = setup_eventdev_worker_tx;
-	caps->adptr_setup = init_rx_adapter;
+	caps->evdev_setup = setup_eventdev_worker_tx_enq;
+	caps->adptr_setup = init_adapters;
 }
--
2.19.0

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH v2] examples/eventdev_pipeline: add Tx adapter support
  2018-09-24 10:12 ` [dpdk-dev] [PATCH v2] " Pavan Nikhilesh
@ 2018-09-26 12:56   ` Rao, Nikhil
  2018-09-28 11:15     ` Jerin Jacob
  0 siblings, 1 reply; 11+ messages in thread
From: Rao, Nikhil @ 2018-09-26 12:56 UTC (permalink / raw)
  To: Pavan Nikhilesh, jerin.jacob, harry.van.haaren, anoob.joseph
  Cc: dev, Rao, Nikhil

On 9/24/2018 3:42 PM, Pavan Nikhilesh wrote:
> Redo the worker pipelines and offload transmission to service cores
> seamlessly through Tx adapter.
> 
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> ---
>   v2 Changes:
>   - Updated enqueue,dequeue depth thresholds.
>   - remove redundant capability checks.

Reviewed-by: Nikhil Rao <nikhil.rao@intel.com>
Tested-by: Nikhil Rao <nikhil.rao@intel.com>

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH v2] examples/eventdev_pipeline: add Tx adapter support
  2018-09-26 12:56   ` Rao, Nikhil
@ 2018-09-28 11:15     ` Jerin Jacob
  2018-10-03  8:47       ` Rao, Nikhil
  0 siblings, 1 reply; 11+ messages in thread
From: Jerin Jacob @ 2018-09-28 11:15 UTC (permalink / raw)
  To: Rao, Nikhil; +Cc: Pavan Nikhilesh, harry.van.haaren, anoob.joseph, dev

-----Original Message-----
> Date: Wed, 26 Sep 2018 18:26:37 +0530
> From: "Rao, Nikhil" <nikhil.rao@intel.com>
> To: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>,
>  jerin.jacob@caviumnetworks.com, harry.van.haaren@intel.com,
>  anoob.joseph@caviumnetworks.com
> CC: dev@dpdk.org, "Rao, Nikhil" <nikhil.rao@intel.com>
> Subject: Re: [PATCH v2] examples/eventdev_pipeline: add Tx adapter support
> User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:60.0) Gecko/20100101
>  Thunderbird/60.0
> 
> On 9/24/2018 3:42 PM, Pavan Nikhilesh wrote:
> > Redo the worker pipelines and offload transmission to service cores
> > seamlessly through Tx adapter.
> > 
> > Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > ---
> >   v2 Changes:
> >   - Updated enqueue,dequeue depth thresholds.
> >   - remove redundant capability checks.
> 
> Reviewed-by: Nikhil Rao <nikhil.rao@intel.com>
> Tested-by: Nikhil Rao <nikhil.rao@intel.com>


Applied to dpdk-next-eventdev/master. Thanks.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [PATCH v2] examples/eventdev_pipeline: add Tx adapter support
  2018-09-28 11:15     ` Jerin Jacob
@ 2018-10-03  8:47       ` Rao, Nikhil
  0 siblings, 0 replies; 11+ messages in thread
From: Rao, Nikhil @ 2018-10-03  8:47 UTC (permalink / raw)
  To: Pavan Nikhilesh; +Cc: Van Haaren, Harry, anoob.joseph, dev, Jerin Jacob

> -----Original Message-----
> From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> Sent: Friday, September 28, 2018 4:46 PM
> To: Rao, Nikhil <nikhil.rao@intel.com>
> Cc: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>; Van Haaren,
> Harry <harry.van.haaren@intel.com>; anoob.joseph@caviumnetworks.com;
> dev@dpdk.org
> Subject: Re: [PATCH v2] examples/eventdev_pipeline: add Tx adapter support
> 
> -----Original Message-----
> > Date: Wed, 26 Sep 2018 18:26:37 +0530
> > From: "Rao, Nikhil" <nikhil.rao@intel.com>
> > To: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>,
> >  jerin.jacob@caviumnetworks.com, harry.van.haaren@intel.com,
> > anoob.joseph@caviumnetworks.com
> > CC: dev@dpdk.org, "Rao, Nikhil" <nikhil.rao@intel.com>
> > Subject: Re: [PATCH v2] examples/eventdev_pipeline: add Tx adapter
> > support
> > User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:60.0)
> > Gecko/20100101
> >  Thunderbird/60.0
> >
> > On 9/24/2018 3:42 PM, Pavan Nikhilesh wrote:
> > > Redo the worker pipelines and offload transmission to service cores
> > > seamlessly through Tx adapter.
> > >
> > > Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > > ---
> > >   v2 Changes:
> > >   - Updated enqueue,dequeue depth thresholds.
> > >   - remove redundant capability checks.
> >
> > Reviewed-by: Nikhil Rao <nikhil.rao@intel.com>
> > Tested-by: Nikhil Rao <nikhil.rao@intel.com>
> 
> 
> Applied to dpdk-next-eventdev/master. Thanks.
> 
Hi Pavan,

A couple of questions:

1. For the generic pipeline case with the SW PMD, the Tx adapter port's default dequeue depth value causes a performance regression from ~12 Mpps to 7 Mpps. Although the -c command line argument also applies to the Tx adapter's port, the packet flow to the Tx adapter port is of a different nature than the flow to the worker ports. How about adding the dequeue depth for the Tx adapter port as a separate command line argument with a default value of 128?

2. The -n parameter and the stats for the generic pipeline that were previously available in the app do not work. Do you plan to add this functionality?

Thanks,
Nikhil

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2018-10-03  8:47 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-09-05 13:45 [dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support Pavan Nikhilesh
2018-09-17 14:17 ` Jerin Jacob
2018-09-19  2:54 ` Rao, Nikhil
2018-09-19 10:42   ` Pavan Nikhilesh
2018-09-20  2:18     ` Rao, Nikhil
2018-09-21  7:44 ` Rao, Nikhil
2018-09-24  9:51   ` Pavan Nikhilesh
2018-09-24 10:12 ` [dpdk-dev] [PATCH v2] " Pavan Nikhilesh
2018-09-26 12:56   ` Rao, Nikhil
2018-09-28 11:15     ` Jerin Jacob
2018-10-03  8:47       ` Rao, Nikhil

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).