DPDK patches and discussions
* [dpdk-dev] [PATCH 0/7] Introduce event vectorization
@ 2021-02-20 22:09 pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 1/7] eventdev: introduce event vector capability pbhagavatula
                   ` (6 more replies)
  0 siblings, 7 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

In the traditional event programming model, an event is identified by a
flow-id and a uintptr_t. The flow-id uniquely identifies a given event
and determines its scheduling order based on the schedule type, while the
uintptr_t holds a single object.

Event devices also support burst mode with a configurable dequeue depth,
i.e. each dequeue call returns multiple events, and each event
might be at a different stage of the pipeline.
Having a burst of events belonging to different stages in a dequeue
burst is not only difficult to vectorize, it also increases the scheduler
overhead and the application overhead of pipelining events further.
Using event vectors we see a significant performance gain, roughly 7x in
the configuration shown in [1].

By introducing event vectorization, each event will be capable of holding
multiple uintptr_t values of the same flow, thereby allowing applications
to vectorize their pipeline and reducing the complexity of pipelining
events across multiple stages. This also reduces the complexity of handling
enqueue and dequeue on an event device.

Since event devices are transparent to the events they schedule, event
producers such as the eth_rx_adapter, crypto_adapter, etc. are responsible
for aggregating buffers of the same flow into a single vector event.

The series breaks ABI in patch [2/7], which is fixed in patch [7/7]. Patch
[7/7] can be changed in the next major release, i.e. v21.11.

The dpdk-test-eventdev application has been updated with options to test
multiple vector sizes and timeouts.

[1]
As for the performance improvement, on an ARM Cortex-A72 equivalent processor,
with the software event device (--vdev=event_sw0), a single worker core, a
single stage, and one service core for the Rx adapter, Tx adapter and scheduling:

Without event vectorization:
    ./build/app/dpdk-test-eventdev -l 7-23 -s 0x700 --vdev="event_sw0" --
         --prod_type_ethdev --nb_pkts=0 --verbose 2 --test=pipeline_queue
         --stlist=a --wlcores=20
    Port[0] using Rx adapter[0] configured
    Port[0] using Tx adapter[0] Configured
    4.728 mpps avg 4.728 mpps

With event vectorization:
    ./build/app/dpdk-test-eventdev -l 7-23 -s 0x700 --vdev="event_sw0" --
        --prod_type_ethdev --nb_pkts=0 --verbose 2 --test=pipeline_queue
        --stlist=a --wlcores=20 --enable_vector --nb_eth_queues 1
        --vector_size 256
    Port[0] using Rx adapter[0] configured
    Port[0] using Tx adapter[0] Configured
    34.383 mpps avg 34.383 mpps

Having a dedicated service core for each Rx queue and tuning the vector and
dequeue burst sizes would further improve performance.

API usage is shown below:

Configuration:

	struct rte_event_eth_rx_adapter_event_vector_config vec_conf;

	vector_pool = rte_event_vector_pool_create("vector_pool",
			nb_elem, 0, vector_size, socket_id);

	rte_event_eth_rx_adapter_create(id, event_id, &adptr_conf);
	rte_event_eth_rx_adapter_queue_add(id, eth_id, -1, &queue_conf);
	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
		vec_conf.vector_sz = vector_size;
		vec_conf.vector_timeout_ns = vector_tmo_nsec;
		vec_conf.vector_mp = vector_pool;
		rte_event_eth_rx_adapter_queue_event_vector_config(id,
				eth_id, -1, &vec_conf);
	}

Fastpath:

	num = rte_event_dequeue_burst(event_id, port_id, &ev, 1, 0);
	if (!num)
		continue;

	if (ev.event_type & RTE_EVENT_TYPE_VECTOR) {
		switch (ev.event_type) {
		case RTE_EVENT_TYPE_ETHDEV_VECTOR:
		case RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR: {
			struct rte_mbuf **mbufs;

			mbufs = ev.vec->mbufs;
			for (i = 0; i < ev.vec->nb_elem; i++) {
				/* Process mbufs[i]. */
			}
			break;
		}
		case ...
		}
	}
	...
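
When the application itself consumes a vector (rather than forwarding it to
the Tx adapter), the rte_event_vector container has to be returned to the
mempool it was allocated from, for example (the Tx adapter in this series
does the equivalent internally after transmitting the mbufs):

	/* Free the mbufs if they are not forwarded elsewhere. */
	rte_pktmbuf_free_bulk(ev.vec->mbufs, ev.vec->nb_elem);
	/* Return the vector container to its pool. */
	rte_mempool_put(rte_mempool_from_obj(ev.vec), ev.vec);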

Pavan Nikhilesh (7):
  eventdev: introduce event vector capability
  eventdev: introduce event vector Rx capability
  eventdev: introduce event vector Tx capability
  eventdev: add Rx adapter event vector support
  eventdev: add Tx adapter event vector support
  app/eventdev: add event vector mode in pipeline test
  eventdev: fix ABI breakage due to event vector

 app/test-eventdev/evt_common.h                |   4 +
 app/test-eventdev/evt_options.c               |  52 +++
 app/test-eventdev/evt_options.h               |   4 +
 app/test-eventdev/test_pipeline_atq.c         | 310 +++++++++++++--
 app/test-eventdev/test_pipeline_common.c      |  77 +++-
 app/test-eventdev/test_pipeline_common.h      |  18 +
 app/test-eventdev/test_pipeline_queue.c       | 320 +++++++++++++--
 .../prog_guide/event_ethernet_rx_adapter.rst  |  38 ++
 .../prog_guide/event_ethernet_tx_adapter.rst  |  12 +
 doc/guides/prog_guide/eventdev.rst            |  36 +-
 doc/guides/tools/testeventdev.rst             |  28 ++
 lib/librte_eventdev/eventdev_pmd.h            |  60 ++-
 .../rte_event_eth_rx_adapter.c                | 367 +++++++++++++++++-
 .../rte_event_eth_rx_adapter.h                |  93 +++++
 .../rte_event_eth_tx_adapter.c                |  66 +++-
 lib/librte_eventdev/rte_eventdev.c            |  11 +-
 lib/librte_eventdev/rte_eventdev.h            | 145 ++++++-
 lib/librte_eventdev/version.map               |   5 +
 18 files changed, 1560 insertions(+), 86 deletions(-)

--
2.17.1



* [dpdk-dev] [PATCH 1/7] eventdev: introduce event vector capability
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 2/7] eventdev: introduce event vector Rx capability pbhagavatula
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma, Ray Kinsella, Neil Horman
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Introduce the rte_event_vector data structure, which is capable of holding
multiple uintptr_t values of the same flow, thereby allowing applications
to vectorize their pipeline and reducing the complexity of pipelining
events across multiple stages.
This approach also reduces the scheduling overhead on an event device.

Add an event vector mempool create handler to create mempools based on
the best mempool ops available on a given platform.
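
A minimal usage sketch (illustrative only; nb_vectors, vector_sz, objs and
n are assumed application values) that creates a vector pool and builds an
RTE_EVENT_TYPE_CPU_VECTOR event from application objects:

	struct rte_event_vector *vec;
	struct rte_mempool *vp;
	struct rte_event ev;

	vp = rte_event_vector_pool_create("cpu_vector_pool", nb_vectors, 0,
					  vector_sz, rte_socket_id());
	if (vp == NULL)
		rte_panic("failed to create vector pool\n");

	if (rte_mempool_get(vp, (void **)&vec) == 0) {
		vec->nb_elem = n;	/* n <= vector_sz */
		memcpy(vec->ptrs, objs, n * sizeof(void *));
		ev.event_type = RTE_EVENT_TYPE_CPU_VECTOR;
		ev.vec = vec;
		/* Fill ev.op, ev.queue_id, ev.sched_type as for any event. */
	}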

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 doc/guides/prog_guide/eventdev.rst |  36 ++++++++-
 lib/librte_eventdev/rte_eventdev.h | 113 ++++++++++++++++++++++++++++-
 lib/librte_eventdev/version.map    |   3 +
 3 files changed, 149 insertions(+), 3 deletions(-)

diff --git a/doc/guides/prog_guide/eventdev.rst b/doc/guides/prog_guide/eventdev.rst
index ccde086f6..d19c91ab0 100644
--- a/doc/guides/prog_guide/eventdev.rst
+++ b/doc/guides/prog_guide/eventdev.rst
@@ -63,13 +63,45 @@ the actual event being scheduled is. The payload is a union of the following:
 * ``uint64_t u64``
 * ``void *event_ptr``
 * ``struct rte_mbuf *mbuf``
+* ``struct rte_event_vector *vec``
 
-These three items in a union occupy the same 64 bits at the end of the rte_event
+These four items in a union occupy the same 64 bits at the end of the rte_event
 structure. The application can utilize the 64 bits directly by accessing the
-u64 variable, while the event_ptr and mbuf are provided as convenience
+u64 variable, while the event_ptr, mbuf and vec are provided as convenience
 variables.  For example the mbuf pointer in the union can used to schedule a
 DPDK packet.
 
+Event Vector
+~~~~~~~~~~~~
+
+The rte_event_vector struct contains a vector of elements defined by the event
+type specified in the ``rte_event``. The event_vector structure contains the
+following data:
+
+* ``nb_elem`` - The number of elements held within the vector.
+
+Similar to ``rte_event`` the payload of event vector is also a union, allowing
+flexibility in what the actual vector is.
+
+* ``struct rte_mbuf *mbufs[0]`` - An array of mbufs.
+* ``void *ptrs[0]`` - An array of pointers.
+* ``uint64_t *u64s[0]`` - An array of uint64_t elements.
+
+The size of the event vector is related to the total number of elements it is
+configured to hold; this is achieved by making ``rte_event_vector`` a variable
+length structure.
+A helper function is provided to create a mempool that holds event vectors. It
+takes the name of the pool, the total number of required ``rte_event_vector``,
+the cache size, the number of elements in each ``rte_event_vector`` and the
+socket id.
+.. code-block:: c
+
+        rte_event_vector_pool_create("vector_pool", nb_event_vectors, cache_sz,
+                                     nb_elements_per_vector, socket_id);
+
+The function ``rte_event_vector_pool_create`` creates a mempool with the best
+available platform mempool ops.
+
 Queues
 ~~~~~~
 
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index ce1fc2ce0..ff6cb3e6a 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -212,8 +212,10 @@ extern "C" {
 
 #include <rte_common.h>
 #include <rte_config.h>
-#include <rte_memory.h>
 #include <rte_errno.h>
+#include <rte_mbuf_pool_ops.h>
+#include <rte_memory.h>
+#include <rte_mempool.h>
 
 #include "rte_eventdev_trace_fp.h"
 
@@ -913,6 +915,25 @@ rte_event_dev_stop_flush_callback_register(uint8_t dev_id,
 int
 rte_event_dev_close(uint8_t dev_id);
 
+/**
+ * Event vector structure.
+ */
+struct rte_event_vector {
+	uint64_t nb_elem : 16;
+	/**< Number of elements in this event vector. */
+	uint64_t rsvd : 48;
+	uint64_t impl_opaque;
+	union {
+		struct rte_mbuf *mbufs[0];
+		void *ptrs[0];
+		uint64_t *u64s[0];
+	} __rte_aligned(16);
+	/**< Start of the vector array union. Depending upon the event type the
+	 * vector array can be an array of mbufs or pointers or opaque u64
+	 * values.
+	 */
+};
+
 /* Scheduler type definitions */
 #define RTE_SCHED_TYPE_ORDERED          0
 /**< Ordered scheduling
@@ -986,6 +1007,21 @@ rte_event_dev_close(uint8_t dev_id);
  */
 #define RTE_EVENT_TYPE_ETH_RX_ADAPTER   0x4
 /**< The event generated from event eth Rx adapter */
+#define RTE_EVENT_TYPE_VECTOR           0x8
+/**< Indicates that event is a vector.
+ * All vector event types should be a logical OR of RTE_EVENT_TYPE_VECTOR.
+ * This simplifies the pipeline design as we can split processing the events
+ * between vector events and normal event across event types.
+ * Example:
+ *	if (ev.event_type & RTE_EVENT_TYPE_VECTOR) {
+ *		// Classify and handle vector event.
+ *	} else {
+ *		// Classify and handle event.
+ *	}
+ */
+#define RTE_EVENT_TYPE_CPU_VECTOR (RTE_EVENT_TYPE_VECTOR | RTE_EVENT_TYPE_CPU)
+/**< The event vector generated from cpu for pipelining. */
+
 #define RTE_EVENT_TYPE_MAX              0x10
 /**< Maximum number of event types */
 
@@ -1108,6 +1144,8 @@ struct rte_event {
 		/**< Opaque event pointer */
 		struct rte_mbuf *mbuf;
 		/**< mbuf pointer if dequeued event is associated with mbuf */
+		struct rte_event_vector *vec;
+		/**< Event vector pointer. */
 	};
 };
 
@@ -2023,6 +2061,79 @@ rte_event_dev_xstats_reset(uint8_t dev_id,
  */
 int rte_event_dev_selftest(uint8_t dev_id);
 
+/**
+ * Get the memory required per event vector based on number of elements per
+ * vector.
+ * This should be used to create the mempool that holds the event vectors.
+ *
+ * @param name
+ *   The name of the vector pool.
+ * @param n
+ *   The number of event vectors in the pool.
+ * @param cache_size
+ *   Size of the per-core object cache. See rte_mempool_create() for
+ *   details.
+ * @param nb_elem
+ *   The number of elements that a single event vector should be able to hold.
+ * @param socket_id
+ *   The socket identifier where the memory should be allocated. The
+ *   value can be *SOCKET_ID_ANY* if there is no NUMA constraint for the
+ *   reserved zone
+ *
+ * @return
+ *   The pointer to the new allocated mempool, on success. NULL on error
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - cache size provided is too large, or priv_size is not aligned.
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+static inline struct rte_mempool *
+rte_event_vector_pool_create(const char *name, unsigned int n,
+			     unsigned int cache_size, uint16_t nb_elem,
+			     int socket_id)
+{
+	const char *mp_ops_name;
+	struct rte_mempool *mp;
+	unsigned int elt_sz;
+	int ret;
+
+	if (!nb_elem) {
+		RTE_LOG(ERR, EVENTDEV,
+			"Invalid number of elements=%d requested\n", nb_elem);
+		rte_errno = -EINVAL;
+		return NULL;
+	}
+
+	elt_sz =
+		sizeof(struct rte_event_vector) + (nb_elem * sizeof(uintptr_t));
+	mp = rte_mempool_create_empty(name, n, elt_sz, cache_size, 0, socket_id,
+				      0);
+	if (mp == NULL)
+		return NULL;
+
+	mp_ops_name = rte_mbuf_best_mempool_ops();
+	ret = rte_mempool_set_ops_byname(mp, mp_ops_name, NULL);
+	if (ret != 0) {
+		RTE_LOG(ERR, EVENTDEV, "error setting mempool handler\n");
+		rte_mempool_free(mp);
+		rte_errno = -ret;
+		return NULL;
+	}
+
+	ret = rte_mempool_populate_default(mp);
+	if (ret < 0) {
+		rte_mempool_free(mp);
+		rte_errno = -ret;
+		return NULL;
+	}
+
+	return mp;
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_eventdev/version.map b/lib/librte_eventdev/version.map
index 3e5c09cfd..a070ef56e 100644
--- a/lib/librte_eventdev/version.map
+++ b/lib/librte_eventdev/version.map
@@ -138,6 +138,9 @@ EXPERIMENTAL {
 	__rte_eventdev_trace_port_setup;
 	# added in 20.11
 	rte_event_pmd_pci_probe_named;
+
+	#added in 21.05
+	rte_event_vector_pool_create;
 };
 
 INTERNAL {
-- 
2.17.1



* [dpdk-dev] [PATCH 2/7] eventdev: introduce event vector Rx capability
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 1/7] eventdev: introduce event vector capability pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 3/7] eventdev: introduce event vector Tx capability pbhagavatula
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma, Ray Kinsella, Neil Horman
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Introduce event ethernet Rx adapter event vector capability.

If an event eth Rx adapter has the capability of
RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR then a given Rx queue
can be configured to enable event vectorization by passing the
flag RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR to
rte_event_eth_rx_adapter_queue_conf::rx_queue_flags while configuring
Rx adapter through rte_event_eth_rx_adapter_queue_add.

The max vector size, the vector timeout and the mempool used for allocating
vector events are configured through rte_event_eth_rx_adapter_queue_add.
The element size of the elements in the vector pool should be equal to
    sizeof(struct rte_event_vector) + (vector_sz * sizeof(uintptr_t))

Application can use `rte_event_get_event_vector_memory_footprint`
to get the element size before creating the vector mempool
rte_event_eth_rx_adapter_queue_conf::vector_mp.

The Rx adapter is responsible for aggregating mbufs into a vector based on
the flow and the vector limits configured by the application, and for
enqueuing the vector event to the event queue set via
rte_event_eth_rx_adapter_queue_conf::ev::queue_id.
It should also set rte_event_vector::attr_valid and fill
rte_event_vector::port and rte_event_vector::queue.
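
A configuration sketch based on the fields added in this patch (id, dev_id,
eth_port_id, ev_queue_id, vector_sz and vector_pool are assumed application
values):

	struct rte_event_eth_rx_adapter_vector_limits limits;
	struct rte_event_eth_rx_adapter_queue_conf qconf;
	uint32_t cap;

	memset(&qconf, 0, sizeof(qconf));
	rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
		rte_event_eth_rx_adapter_vector_limits_get(dev_id, eth_port_id,
							   &limits);
		qconf.rx_queue_flags |=
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
		qconf.vector_sz = RTE_MIN(vector_sz, limits.max_sz);
		qconf.vector_timeout_ns = limits.min_timeout_ns;
		qconf.vector_mp = vector_pool; /* rte_event_vector_pool_create() */
	}
	qconf.ev.queue_id = ev_queue_id;
	rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &qconf);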

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 .../prog_guide/event_ethernet_rx_adapter.rst  | 38 +++++++++++
 .../rte_event_eth_rx_adapter.h                | 66 +++++++++++++++++++
 lib/librte_eventdev/rte_eventdev.h            | 30 ++++++++-
 lib/librte_eventdev/version.map               |  1 +
 4 files changed, 133 insertions(+), 2 deletions(-)

diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
index cb44ce0e4..735ea2439 100644
--- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
+++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
@@ -186,3 +186,41 @@ the event buffer fill level is low. The
 ``rte_event_eth_rx_adapter_cb_register()`` function allow the application
 to register a callback that selects which packets to enqueue to the event
 device.
+
+Rx event vectorization
+~~~~~~~~~~~~~~~~~~~~~~
+
+The event device and ethernet device pairs which support the capability
+``RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR`` can aggregate packets based on
+flow characteristics and generate a ``rte_event`` containing ``rte_event_vector``
+whose event type is either ``RTE_EVENT_TYPE_ETHDEV_VECTOR`` or
+``RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR``.
+The aggregation size and timeout are configurable at a queue level and the
+maximum, minimum vector sizes and timeouts vary based on the device capability
+and can be queried using ``rte_event_eth_rx_adapter_vector_limits_get``.
+The Rx adapter additionally might include useful data such as ethernet device
+port and queue identifier in the ``rte_event_vector::port`` and
+``rte_event_vector::queue`` and mark ``rte_event_vector::attr_valid`` as true.
+
+A loop processing ``rte_event_vector`` containing mbufs is shown below.
+
+.. code-block:: c
+
+        num = rte_event_dequeue_burst(event_dev, event_port, &ev, 1, 0);
+        if (!num)
+                continue;
+
+        switch (ev.event_type) {
+        case RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR:
+        case RTE_EVENT_TYPE_ETHDEV_VECTOR: {
+                struct rte_mbuf **mbufs;
+
+                mbufs = ev.vec->mbufs;
+                for (i = 0; i < ev.vec->nb_elem; i++) {
+                        // Process each mbuf.
+                }
+                break;
+        }
+        case ...
+        ...
+        }
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
index 21bb1e54c..4bdb38f08 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -92,6 +92,10 @@ extern "C" {
 /**< This flag indicates the flow identifier is valid
  * @see rte_event_eth_rx_adapter_queue_conf::rx_queue_flags
  */
+#define RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR	0x2
+/**< This flag indicates that mbufs arriving on the queue need to be vectorized
+ * @see rte_event_eth_rx_adapter_queue_conf::rx_queue_flags
+ */
 
 /**
  * Adapter configuration structure that the adapter configuration callback
@@ -167,6 +171,33 @@ struct rte_event_eth_rx_adapter_queue_conf {
 	 * The event adapter sets ev.event_type to RTE_EVENT_TYPE_ETHDEV in the
 	 * enqueued event.
 	 */
+	uint16_t vector_sz;
+	/**<
+	 * Indicates the maximum number of mbufs to combine and form a vector.
+	 * Should be within
+	 * @see rte_event_eth_rx_adapter_vector_limits::min_vector_sz
+	 * @see rte_event_eth_rx_adapter_vector_limits::max_vector_sz
+	 * Valid when RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR flag is set in
+	 * @see rte_event_eth_rx_adapter_queue_conf::rx_queue_flags
+	 */
+	uint64_t vector_timeout_ns;
+	/**<
+	 * Indicates the maximum number of nanoseconds to wait for receiving
+	 * mbufs. Should be within vectorization limits of the
+	 * adapter
+	 * @see rte_event_eth_rx_adapter_vector_limits::min_vector_ns
+	 * @see rte_event_eth_rx_adapter_vector_limits::max_vector_ns
+	 * Valid when RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR flag is set in
+	 * @see rte_event_eth_rx_adapter_queue_conf::rx_queue_flags
+	 */
+	struct rte_mempool *vector_mp;
+	/**<
+	 * Indicates the mempool that should be used for allocating
+	 * rte_event_vector container.
+	 * Should be created by using `rte_event_vector_pool_create`.
+	 * Valid when RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR flag is set in
+	 * @see rte_event_eth_rx_adapter_queue_conf::rx_queue_flags.
+	 */
 };
 
 /**
@@ -199,6 +230,20 @@ struct rte_event_eth_rx_adapter_stats {
 	/**< Received packet count for interrupt mode Rx queues */
 };
 
+/**
+ * A structure used to retrieve eth rx adapter vector limits.
+ */
+struct rte_event_eth_rx_adapter_vector_limits {
+	uint16_t min_sz;
+	/**< Minimum vector limit configurable. */
+	uint16_t max_sz;
+	/**< Maximum vector limit configurable. */
+	uint64_t min_timeout_ns;
+	/**< Minimum vector timeout configurable. */
+	uint64_t max_timeout_ns;
+	/**< Maximum vector timeout configurable. */
+};
+
 /**
  *
  * Callback function invoked by the SW adapter before it continues
@@ -467,6 +512,27 @@ int rte_event_eth_rx_adapter_cb_register(uint8_t id, uint16_t eth_dev_id,
 					 rte_event_eth_rx_adapter_cb_fn cb_fn,
 					 void *cb_arg);
 
+/**
+ * Retrieve vector limits for a given event dev and eth dev pair.
+ * @see rte_event_eth_rx_adapter_vector_limits
+ *
+ * @param dev_id
+ *  Event device identifier.
+ * @param eth_port_id
+ *  Port identifier of the ethernet device.
+ * @param [out] limits
+ *  A pointer to rte_event_eth_rx_adapter_vector_limits structure that has to
+ * be filled.
+ *
+ * @return
+ *  - 0: Success.
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int rte_event_eth_rx_adapter_vector_limits_get(
+	uint8_t dev_id, uint16_t eth_port_id,
+	struct rte_event_eth_rx_adapter_vector_limits *limits);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index ff6cb3e6a..1cf3efa2d 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -919,9 +919,27 @@ rte_event_dev_close(uint8_t dev_id);
  * Event vector structure.
  */
 struct rte_event_vector {
-	uint64_t nb_elem : 16;
+	uint16_t nb_elem;
 	/**< Number of elements in this event vector. */
-	uint64_t rsvd : 48;
+	uint16_t rsvd : 15;
+	uint16_t attr_valid : 1;
+	/**< Indicates that the below union attributes have valid information.
+	 */
+	union {
+		/* Used by Rx adapter.
+		 * Indicates that all the elements in this vector belong to the same
+		 * port and queue pair when originating from Rx adapter, valid
+		 * only when event type is ETHDEV_VECTOR or
+		 * ETH_RX_ADAPTER_VECTOR.
+		 */
+		struct {
+			uint16_t port;
+			/* Ethernet device port id. */
+			uint16_t queue;
+			/* Ethernet device queue id. */
+		};
+	};
+	/**< Union to hold common attributes of the vector array. */
 	uint64_t impl_opaque;
 	union {
 		struct rte_mbuf *mbufs[0];
@@ -1019,8 +1037,14 @@ struct rte_event_vector {
  *		// Classify and handle event.
  *	}
  */
+#define RTE_EVENT_TYPE_ETHDEV_VECTOR                                           \
+	(RTE_EVENT_TYPE_VECTOR | RTE_EVENT_TYPE_ETHDEV)
+/**< The event vector generated from ethdev subsystem */
 #define RTE_EVENT_TYPE_CPU_VECTOR (RTE_EVENT_TYPE_VECTOR | RTE_EVENT_TYPE_CPU)
 /**< The event vector generated from cpu for pipelining. */
+#define RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR                                   \
+	(RTE_EVENT_TYPE_VECTOR | RTE_EVENT_TYPE_ETH_RX_ADAPTER)
+/**< The event vector generated from eth Rx adapter. */
 
 #define RTE_EVENT_TYPE_MAX              0x10
 /**< Maximum number of event types */
@@ -1165,6 +1189,8 @@ struct rte_event {
  * @see struct rte_event_eth_rx_adapter_queue_conf::ev
  * @see struct rte_event_eth_rx_adapter_queue_conf::rx_queue_flags
  */
+#define RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR	0x8
+/**< Adapter supports event vectorization per ethdev. */
 
 /**
  * Retrieve the event device's ethdev Rx adapter capabilities for the
diff --git a/lib/librte_eventdev/version.map b/lib/librte_eventdev/version.map
index a070ef56e..34c1c830e 100644
--- a/lib/librte_eventdev/version.map
+++ b/lib/librte_eventdev/version.map
@@ -141,6 +141,7 @@ EXPERIMENTAL {
 
 	#added in 21.05
 	rte_event_vector_pool_create;
+	rte_event_eth_rx_adapter_vector_limits_get;
 };
 
 INTERNAL {
-- 
2.17.1



* [dpdk-dev] [PATCH 3/7] eventdev: introduce event vector Tx capability
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 1/7] eventdev: introduce event vector capability pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 2/7] eventdev: introduce event vector Rx capability pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 4/7] eventdev: add Rx adapter event vector support pbhagavatula
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Introduce event vector transmit capability for the event eth
Tx adapter.

The capability indicates that the Tx adapter is capable of
transmitting event vectors.
When rte_event_vector::attr_valid is set, the Tx adapter should
transmit all the packets to the rte_event_vector::port using
rte_event_vector::queue.
If rte_event_vector::attr_valid is not set then the Tx adapter
should peek into each mbuf to get the destination port and queue
pair.
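
An application-side sketch (txq, txa_ev_queue, dev_id and ev_port are assumed
values) that hints the Tx adapter that an entire vector is destined to a
single ethdev port and queue pair:

	struct rte_event ev;

	vec->attr_valid = 1;
	vec->port = eth_port_id;
	vec->queue = txq;

	ev.op = RTE_EVENT_OP_NEW;
	ev.event_type = RTE_EVENT_TYPE_CPU_VECTOR;
	ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
	ev.queue_id = txa_ev_queue;
	ev.vec = vec;
	rte_event_enqueue_burst(dev_id, ev_port, &ev, 1);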

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 doc/guides/prog_guide/event_ethernet_tx_adapter.rst | 12 ++++++++++++
 lib/librte_eventdev/rte_eventdev.h                  |  8 +++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/doc/guides/prog_guide/event_ethernet_tx_adapter.rst b/doc/guides/prog_guide/event_ethernet_tx_adapter.rst
index a8c13e136..87277dcaf 100644
--- a/doc/guides/prog_guide/event_ethernet_tx_adapter.rst
+++ b/doc/guides/prog_guide/event_ethernet_tx_adapter.rst
@@ -164,3 +164,15 @@ The  ``rte_event_eth_tx_adapter_stats_get()`` function reports counters defined
 in struct ``rte_event_eth_tx_adapter_stats``. The counter values are the sum of
 the counts from the eventdev PMD callback if the callback is supported, and
 the counts maintained by the service function, if one exists.
+
+Tx event vectorization
+~~~~~~~~~~~~~~~~~~~~~~
+
+The event device and ethernet device pairs which support the capability
+``RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR`` can process event vectors of mbufs.
+Additionally, the application can provide a hint to the Tx adapter that all the
+mbufs are destined to the same ethernet port and queue by setting the bit
+``rte_event_vector::attr_valid`` and filling ``rte_event_vector::port`` and
+``rte_event_vector::queue``.
+If ``rte_event_vector::attr_valid`` is not set then the Tx adapter should peek
+into each mbuf and transmit it to the requested ethernet port and queue pair.
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index 1cf3efa2d..c817c29e6 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -926,11 +926,13 @@ struct rte_event_vector {
 	/**< Indicates that the below union attributes have valid information.
 	 */
 	union {
-		/* Used by Rx adapter.
+		/* Used by Rx/Tx adapter.
 		 * Indicates that all the elements in this vector belong to the same
 		 * port and queue pair when originating from Rx adapter, valid
 		 * only when event type is ETHDEV_VECTOR or
 		 * ETH_RX_ADAPTER_VECTOR.
+		 * Can also be used to indicate to the Tx adapter the destination
+		 * port and queue of the mbufs in the vector.
 		 */
 		struct {
 			uint16_t port;
@@ -1287,6 +1289,10 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 #define RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT	0x1
 /**< This flag is sent when the PMD supports a packet transmit callback
  */
+#define RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR	0x2
+/**< Indicates that the Tx adapter is capable of handling event vector of
+ * mbufs.
+ */
 
 /**
  * Retrieve the event device's eth Tx adapter capabilities
-- 
2.17.1



* [dpdk-dev] [PATCH 4/7] eventdev: add Rx adapter event vector support
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
                   ` (2 preceding siblings ...)
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 3/7] eventdev: introduce event vector Tx capability pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 5/7] eventdev: add Tx " pbhagavatula
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Add event vector support for the event eth Rx adapter. The implementation
creates vector flows based on the port and queue identifiers of the
received mbufs.
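
A worker-side sketch of consuming the vectors produced by the SW Rx adapter
(process_pkt() is a placeholder for application processing):

	if (ev.event_type == RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR &&
	    ev.vec->attr_valid) {
		/* All mbufs in this vector came from the same Rx port/queue. */
		uint16_t rx_port = ev.vec->port;
		uint16_t rx_queue = ev.vec->queue;
		uint16_t i;

		for (i = 0; i < ev.vec->nb_elem; i++)
			process_pkt(ev.vec->mbufs[i], rx_port, rx_queue);
	}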

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 lib/librte_eventdev/eventdev_pmd.h            |  31 +-
 .../rte_event_eth_rx_adapter.c                | 305 +++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c            |   6 +-
 3 files changed, 324 insertions(+), 18 deletions(-)

diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
index 7eb9a7739..60bfaebc0 100644
--- a/lib/librte_eventdev/eventdev_pmd.h
+++ b/lib/librte_eventdev/eventdev_pmd.h
@@ -69,9 +69,10 @@ extern "C" {
 	} \
 } while (0)
 
-#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
-		((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
-			(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
+#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP                                        \
+	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |                     \
+	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |                         \
+	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
 
 #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
 		RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
@@ -645,6 +646,27 @@ typedef int (*eventdev_eth_rx_adapter_stats_reset)
  */
 typedef int (*eventdev_selftest)(void);
 
+struct rte_event_eth_rx_adapter_vector_limits;
+/**
+ * Get event vector limits for a given event device, ethernet device pair.
+ *
+ * @param dev
+ *   Event device pointer
+ *
+ * @param eth_dev
+ *   Ethernet device pointer
+ *
+ * @param[out] limits
+ *   Pointer to the limits structure to be filled.
+ *
+ * @return
+ *   - 0: Success.
+ *   - <0: Error code returned by the driver function.
+ */
+typedef int (*eventdev_eth_rx_adapter_vector_limits_get_t)(
+	const struct rte_eventdev *dev, const struct rte_eth_dev *eth_dev,
+	struct rte_event_eth_rx_adapter_vector_limits *limits);
+
 typedef uint32_t rte_event_pmd_selftest_seqn_t;
 extern int rte_event_pmd_selftest_seqn_dynfield_offset;
 
@@ -1067,6 +1089,9 @@ struct rte_eventdev_ops {
 	/**< Get ethernet Rx stats */
 	eventdev_eth_rx_adapter_stats_reset eth_rx_adapter_stats_reset;
 	/**< Reset ethernet Rx stats */
+	eventdev_eth_rx_adapter_vector_limits_get_t
+		eth_rx_adapter_vector_limits_get;
+	/**< Get event vector limits for the Rx adapter */
 
 	eventdev_timer_adapter_caps_get_t timer_adapter_caps_get;
 	/**< Get timer adapter capabilities */
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index d8c635e99..a1990637f 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -26,6 +26,10 @@
 #define BATCH_SIZE		32
 #define BLOCK_CNT_THRESHOLD	10
 #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+#define MAX_VECTOR_SIZE		1024
+#define MIN_VECTOR_SIZE		4
+#define MAX_VECTOR_NS		1E9
+#define MIN_VECTOR_NS		1E5
 
 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
@@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
 	uint16_t eth_rx_qid;
 };
 
+struct eth_rx_vector_data {
+	TAILQ_ENTRY(eth_rx_vector_data) next;
+	uint16_t port;
+	uint16_t queue;
+	uint16_t max_vector_count;
+	uint64_t event;
+	uint64_t ts;
+	uint64_t vector_timeout_ticks;
+	struct rte_mempool *vector_pool;
+	struct rte_event_vector *vector_ev;
+} __rte_cache_aligned;
+
+TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
+
 /* Instance per adapter */
 struct rte_eth_event_enqueue_buffer {
 	/* Count of events in this buffer */
@@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
 	uint32_t wrr_pos;
 	/* Event burst buffer */
 	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+	/* Vector enable flag */
+	uint8_t ena_vector;
+	/* Timestamp of previous vector expiry list traversal */
+	uint64_t prev_expiry_ts;
+	/* Minimum ticks to wait before traversing expiry list */
+	uint64_t vector_tmo_ticks;
+	/* vector list */
+	struct eth_rx_vector_data_list vector_list;
 	/* Per adapter stats */
 	struct rte_event_eth_rx_adapter_stats stats;
 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
@@ -198,9 +224,11 @@ struct eth_device_info {
 struct eth_rx_queue_info {
 	int queue_enabled;	/* True if added */
 	int intr_enabled;
+	uint8_t ena_vector;
 	uint16_t wt;		/* Polling weight */
 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
 	uint64_t event;
+	struct eth_rx_vector_data vector_data;
 };
 
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
@@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
 
+	if (!buf->count)
+		return 0;
+
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
 					buf->events,
@@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	return n;
 }
 
+static inline uint16_t
+rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_rx_queue_info *queue_info,
+			struct rte_eth_event_enqueue_buffer *buf,
+			struct rte_mbuf **mbufs, uint16_t num)
+{
+	struct rte_event *ev = &buf->events[buf->count];
+	struct eth_rx_vector_data *vec;
+	uint16_t filled, space, sz;
+
+	filled = 0;
+	vec = &queue_info->vector_data;
+	while (num) {
+		if (vec->vector_ev == NULL) {
+			if (rte_mempool_get(vec->vector_pool,
+					    (void **)&vec->vector_ev) < 0) {
+				rte_pktmbuf_free_bulk(mbufs, num);
+				return 0;
+			}
+			vec->vector_ev->nb_elem = 0;
+			vec->vector_ev->port = vec->port;
+			vec->vector_ev->queue = vec->queue;
+			vec->vector_ev->attr_valid = true;
+			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+		} else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+			/* Event ready. */
+			ev->event = vec->event;
+			ev->vec = vec->vector_ev;
+			ev++;
+			filled++;
+			vec->vector_ev = NULL;
+			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+			if (rte_mempool_get(vec->vector_pool,
+					    (void **)&vec->vector_ev) < 0) {
+				rte_pktmbuf_free_bulk(mbufs, num);
+				return 0;
+			}
+			vec->vector_ev->nb_elem = 0;
+			vec->vector_ev->port = vec->port;
+			vec->vector_ev->queue = vec->queue;
+			vec->vector_ev->attr_valid = true;
+			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+		}
+
+		space = vec->max_vector_count - vec->vector_ev->nb_elem;
+		sz = num > space ? space : num;
+		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
+		       sizeof(void *) * sz);
+		vec->vector_ev->nb_elem += sz;
+		num -= sz;
+		mbufs += sz;
+		vec->ts = rte_rdtsc();
+	}
+
+	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+		ev->event = vec->event;
+		ev->vec = vec->vector_ev;
+		ev++;
+		filled++;
+		vec->vector_ev = NULL;
+		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+	}
+
+	return filled;
+}
+
 static inline void
 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
@@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 
-	for (i = 0; i < num; i++) {
-		m = mbufs[i];
-
-		rss = do_rss ?
-			rxa_do_softrss(m, rx_adapter->rss_key_be) :
-			m->hash.rss;
-		ev->event = event;
-		ev->flow_id = (rss & ~flow_id_mask) |
-				(ev->flow_id & flow_id_mask);
-		ev->mbuf = m;
-		ev++;
+	if (!eth_rx_queue_info->ena_vector) {
+		for (i = 0; i < num; i++) {
+			m = mbufs[i];
+
+			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
+				     : m->hash.rss;
+			ev->event = event;
+			ev->flow_id = (rss & ~flow_id_mask) |
+				      (ev->flow_id & flow_id_mask);
+			ev->mbuf = m;
+			ev++;
+		}
+	} else {
+		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
+					      buf, mbufs, num);
 	}
 
-	if (dev_info->cb_fn) {
+	if (num && dev_info->cb_fn) {
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count, ev,
-					num, dev_info->cb_arg, &dropped);
+					ETH_EVENT_BUFFER_SIZE, buf->count,
+					&buf->events[buf->count], num,
+					dev_info->cb_arg, &dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 	return nb_rx;
 }
 
+static void
+rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = arg;
+	struct rte_eth_event_enqueue_buffer *buf =
+		&rx_adapter->event_enqueue_buffer;
+	struct rte_event *ev;
+
+	if (buf->count)
+		rxa_flush_event_buffer(rx_adapter);
+
+	if (vec->vector_ev->nb_elem == 0)
+		return;
+	ev = &buf->events[buf->count];
+
+	/* Event ready. */
+	ev->event = vec->event;
+	ev->vec = vec->vector_ev;
+	buf->count++;
+
+	vec->vector_ev = NULL;
+	vec->ts = 0;
+}
+
 static int
 rxa_service_func(void *args)
 {
@@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
 		return 0;
 	}
 
+	if (rx_adapter->ena_vector) {
+		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
+		    rx_adapter->vector_tmo_ticks) {
+			struct eth_rx_vector_data *vec;
+
+			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
+
+				if (elapsed_time >= vec->vector_timeout_ticks) {
+					rxa_vector_expire(vec, rx_adapter);
+					TAILQ_REMOVE(&rx_adapter->vector_list,
+						     vec, next);
+				}
+			}
+			rx_adapter->prev_expiry_ts = rte_rdtsc();
+		}
+	}
+
 	stats = &rx_adapter->stats;
 	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
 	stats->rx_packets += rxa_poll(rx_adapter);
@@ -1640,6 +1784,28 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 }
 
+static void
+rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
+		    uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
+		    uint16_t port_id)
+{
+#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
+	struct eth_rx_vector_data *vector_data;
+	uint32_t flow_id;
+
+	vector_data = &queue_info->vector_data;
+	vector_data->max_vector_count = vector_count;
+	vector_data->port = port_id;
+	vector_data->queue = qid;
+	vector_data->vector_pool = mp;
+	vector_data->vector_timeout_ticks =
+		NSEC2TICK(vector_ns, rte_get_timer_hz());
+	vector_data->ts = 0;
+	flow_id = queue_info->event & 0xFFFFF;
+	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
+	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
+}
+
 static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
@@ -1716,6 +1882,25 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	} else
 		qi_ev->flow_id = 0;
 
+	if (conf->rx_queue_flags &
+	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
+		queue_info->ena_vector = 1;
+		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+		rxa_set_vector_data(queue_info, conf->vector_sz,
+				    conf->vector_timeout_ns, conf->vector_mp,
+				    rx_queue_id, dev_info->dev->data->port_id);
+		rx_adapter->ena_vector = 1;
+		rx_adapter->vector_tmo_ticks =
+			rx_adapter->vector_tmo_ticks
+				? RTE_MIN(queue_info->vector_data
+						  .vector_timeout_ticks,
+					  rx_adapter->vector_tmo_ticks)
+				: queue_info->vector_data.vector_timeout_ticks;
+		rx_adapter->vector_tmo_ticks <<= 1;
+		TAILQ_INIT(&rx_adapter->vector_list);
+		rx_adapter->prev_expiry_ts = 0;
+	}
+
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
 		rx_adapter->num_rx_polled += !pollq;
@@ -2054,6 +2239,7 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
 	struct eth_device_info *dev_info;
+	struct rte_event_eth_rx_adapter_vector_limits limits;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -2081,6 +2267,48 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
 		return -EINVAL;
 	}
 
+	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
+	    (queue_conf->rx_queue_flags &
+	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
+		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
+				 eth_dev_id, id);
+		return -EINVAL;
+	}
+
+	if (queue_conf->rx_queue_flags &
+	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
+		ret = rte_event_eth_rx_adapter_vector_limits_get(
+			rx_adapter->eventdev_id, eth_dev_id, &limits);
+		if (ret < 0) {
+			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
+					 " eth port: %" PRIu16
+					 " adapter id: %" PRIu8,
+					 eth_dev_id, id);
+			return -EINVAL;
+		}
+		if (queue_conf->vector_sz < limits.min_sz ||
+		    queue_conf->vector_sz > limits.max_sz ||
+		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
+		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
+		    queue_conf->vector_mp == NULL) {
+			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
+					 " eth port: %" PRIu16
+					 " adapter id: %" PRIu8,
+					 eth_dev_id, id);
+			return -EINVAL;
+		}
+		if (queue_conf->vector_mp->elt_size <
+		    (sizeof(struct rte_event_vector) +
+		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
+			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
+					 " eth port: %" PRIu16
+					 " adapter id: %" PRIu8,
+					 eth_dev_id, id);
+			return -EINVAL;
+		}
+	}
+
 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
 		(rx_queue_id != -1)) {
 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
@@ -2143,6 +2371,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
 	return 0;
 }
 
+static int
+rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+	limits->max_sz = MAX_VECTOR_SIZE;
+	limits->min_sz = MIN_VECTOR_SIZE;
+	limits->max_timeout_ns = MAX_VECTOR_NS;
+	limits->min_timeout_ns = MIN_VECTOR_NS;
+
+	return 0;
+}
+
 int
 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 				int32_t rx_queue_id)
@@ -2263,6 +2502,44 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 	return ret;
 }
 
+int
+rte_event_eth_rx_adapter_vector_limits_get(
+	uint8_t dev_id, uint16_t eth_port_id,
+	struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+	struct rte_eventdev *dev;
+	uint32_t cap;
+	int ret;
+
+	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
+
+	if (limits == NULL)
+		return -EINVAL;
+
+	dev = &rte_eventdevs[dev_id];
+
+	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
+				 "eth port %" PRIu16,
+				 dev_id, eth_port_id);
+		return ret;
+	}
+
+	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
+		RTE_FUNC_PTR_OR_ERR_RET(
+			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
+			-ENOTSUP);
+		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
+			dev, &rte_eth_devices[eth_port_id], limits);
+	} else {
+		ret = rxa_sw_vector_limits(limits);
+	}
+
+	return ret;
+}
+
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index b57363f80..2e6e367e0 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->eth_rx_adapter_caps_get ?
 				(*dev->dev_ops->eth_rx_adapter_caps_get)(dev,
-- 
2.17.1



* [dpdk-dev] [PATCH 5/7] eventdev: add Tx adapter event vector support
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
                   ` (3 preceding siblings ...)
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 4/7] eventdev: add Rx adapter event vector support pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 6/7] app/eventdev: add event vector mode in pipeline test pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 7/7] eventdev: fix ABI breakage due to event vector pbhagavatula
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Add event vector support for the event eth Tx adapter. The implementation
receives events from the single linked queue and, based on
rte_event_vector::attr_valid, transmits the vector of mbufs to the given
port and queue pair.
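
An application-side sketch of handing a vector of mbufs to the service based
Tx adapter (txa_ev_queue, dev_id and ev_port are assumed values; the adapter
returns the vector container to its mempool after transmission):

	ev.op = RTE_EVENT_OP_FORWARD;
	ev.event_type = RTE_EVENT_TYPE_CPU_VECTOR;
	ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
	ev.queue_id = txa_ev_queue;	/* queue serviced by the Tx adapter */
	ev.vec = vec;	/* vec->attr_valid/port/queue optionally set */
	rte_event_enqueue_burst(dev_id, ev_port, &ev, 1);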

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 .../rte_event_eth_tx_adapter.c                | 66 ++++++++++++++++---
 lib/librte_eventdev/rte_eventdev.c            |  5 +-
 2 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_tx_adapter.c b/lib/librte_eventdev/rte_event_eth_tx_adapter.c
index 5b4c42dcf..b50be74a8 100644
--- a/lib/librte_eventdev/rte_event_eth_tx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_tx_adapter.c
@@ -510,6 +510,47 @@ txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
 	stats->tx_dropped += unsent - sent;
 }
 
+static uint16_t
+txa_process_event_vector(struct txa_service_data *txa,
+			 struct rte_event_vector *vec)
+{
+	struct txa_service_queue_info *tqi;
+	uint16_t port, queue, nb_tx = 0;
+	struct rte_mbuf **mbufs;
+	int i;
+
+	mbufs = (struct rte_mbuf **)vec->mbufs;
+	if (vec->attr_valid) {
+		port = vec->port;
+		queue = vec->queue;
+		tqi = txa_service_queue(txa, port, queue);
+		if (unlikely(tqi == NULL || !tqi->added)) {
+			rte_pktmbuf_free_bulk(mbufs, vec->nb_elem);
+			rte_mempool_put(rte_mempool_from_obj(vec), vec);
+			return 0;
+		}
+		for (i = 0; i < vec->nb_elem; i++) {
+			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
+						   mbufs[i]);
+		}
+	} else {
+		for (i = 0; i < vec->nb_elem; i++) {
+			port = mbufs[i]->port;
+			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
+			tqi = txa_service_queue(txa, port, queue);
+			if (unlikely(tqi == NULL || !tqi->added)) {
+				rte_pktmbuf_free(mbufs[i]);
+				continue;
+			}
+			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
+						   mbufs[i]);
+		}
+	}
+	rte_mempool_put(rte_mempool_from_obj(vec), vec);
+
+	return nb_tx;
+}
+
 static void
 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
 	uint32_t n)
@@ -522,22 +563,27 @@ txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
 
 	nb_tx = 0;
 	for (i = 0; i < n; i++) {
-		struct rte_mbuf *m;
 		uint16_t port;
 		uint16_t queue;
 		struct txa_service_queue_info *tqi;
 
-		m = ev[i].mbuf;
-		port = m->port;
-		queue = rte_event_eth_tx_adapter_txq_get(m);
+		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
+			struct rte_mbuf *m;
 
-		tqi = txa_service_queue(txa, port, queue);
-		if (unlikely(tqi == NULL || !tqi->added)) {
-			rte_pktmbuf_free(m);
-			continue;
-		}
+			m = ev[i].mbuf;
+			port = m->port;
+			queue = rte_event_eth_tx_adapter_txq_get(m);
 
-		nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
+			tqi = txa_service_queue(txa, port, queue);
+			if (unlikely(tqi == NULL || !tqi->added)) {
+				rte_pktmbuf_free(m);
+				continue;
+			}
+
+			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
+		} else {
+			nb_tx += txa_process_event_vector(txa, ev[i].vec);
+		}
 	}
 
 	stats->tx_packets += nb_tx;
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index 2e6e367e0..b2e85ce67 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -196,7 +196,10 @@ rte_event_eth_tx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
 	if (caps == NULL)
 		return -EINVAL;
 
-	*caps = 0;
+	if (dev->dev_ops->eth_tx_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->eth_tx_adapter_caps_get ?
 			(*dev->dev_ops->eth_tx_adapter_caps_get)(dev,
-- 
2.17.1



* [dpdk-dev] [PATCH 6/7] app/eventdev: add event vector mode in pipeline test
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
                   ` (4 preceding siblings ...)
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 5/7] eventdev: add Tx " pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 7/7] eventdev: fix ABI breakage due to event vector pbhagavatula
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Add event vector support in the pipeline tests. By default this mode
is disabled; it can be enabled by using the option --enable_vector.
example:
	dpdk-test-eventdev -l 7-23 -s 0xff00 -- --prod_type_ethdev
	--nb_pkts=0 --verbose 2 --test=pipeline_atq --stlist=a
	--wlcores=20-23  --enable_vector

Additional options to configure the vector size and vector timeout are
also implemented and can be used by specifying --vector_size and
--vector_tmo_ns.
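
For example (option values are illustrative):
	dpdk-test-eventdev -l 7-23 -s 0xff00 -- --prod_type_ethdev
	--nb_pkts=0 --verbose 2 --test=pipeline_atq --stlist=a
	--wlcores=20-23 --enable_vector --vector_size 512
	--vector_tmo_ns 100000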

This patch also adds a new option to set the number of Rx queues
configured per event eth rx adapter.
example:
	dpdk-test-eventdev -l 7-23 -s 0xff00 -- --prod_type_ethdev
	--nb_pkts=0 --verbose 2 --test=pipeline_atq --stlist=a
	--wlcores=20-23  --nb_eth_queues 4

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 app/test-eventdev/evt_common.h           |   4 +
 app/test-eventdev/evt_options.c          |  52 ++++
 app/test-eventdev/evt_options.h          |   4 +
 app/test-eventdev/test_pipeline_atq.c    | 310 ++++++++++++++++++++--
 app/test-eventdev/test_pipeline_common.c |  69 ++++-
 app/test-eventdev/test_pipeline_common.h |  18 ++
 app/test-eventdev/test_pipeline_queue.c  | 320 +++++++++++++++++++++--
 doc/guides/tools/testeventdev.rst        |  28 ++
 8 files changed, 751 insertions(+), 54 deletions(-)

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index a1da1cf11..0e228258e 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -58,16 +58,20 @@ struct evt_options {
 	uint8_t sched_type_list[EVT_MAX_STAGES];
 	uint16_t mbuf_sz;
 	uint16_t wkr_deq_dep;
+	uint16_t vector_size;
+	uint16_t eth_queues;
 	uint32_t nb_flows;
 	uint32_t tx_first;
 	uint32_t max_pkt_sz;
 	uint32_t deq_tmo_nsec;
 	uint32_t q_priority:1;
 	uint32_t fwd_latency:1;
+	uint32_t ena_vector : 1;
 	uint64_t nb_pkts;
 	uint64_t nb_timers;
 	uint64_t expiry_nsec;
 	uint64_t max_tmo_nsec;
+	uint64_t vector_tmo_nsec;
 	uint64_t timer_tick_nsec;
 	uint64_t optm_timer_tick_nsec;
 	enum evt_prod_type prod_type;
diff --git a/app/test-eventdev/evt_options.c b/app/test-eventdev/evt_options.c
index 0d04ea9f8..0d5540574 100644
--- a/app/test-eventdev/evt_options.c
+++ b/app/test-eventdev/evt_options.c
@@ -34,6 +34,9 @@ evt_options_default(struct evt_options *opt)
 	opt->max_tmo_nsec = 1E5;  /* 100000ns ~100us */
 	opt->expiry_nsec = 1E4;   /* 10000ns ~10us */
 	opt->prod_type = EVT_PROD_TYPE_SYNT;
+	opt->eth_queues = 1;
+	opt->vector_size = 64;
+	opt->vector_tmo_nsec = 100E3;
 }
 
 typedef int (*option_parser_t)(struct evt_options *opt,
@@ -257,6 +260,43 @@ evt_parse_max_pkt_sz(struct evt_options *opt, const char *arg)
 	return ret;
 }
 
+static int
+evt_parse_ena_vector(struct evt_options *opt, const char *arg __rte_unused)
+{
+	opt->ena_vector = 1;
+	return 0;
+}
+
+static int
+evt_parse_vector_size(struct evt_options *opt, const char *arg)
+{
+	int ret;
+
+	ret = parser_read_uint16(&(opt->vector_size), arg);
+
+	return ret;
+}
+
+static int
+evt_parse_vector_tmo_ns(struct evt_options *opt, const char *arg)
+{
+	int ret;
+
+	ret = parser_read_uint64(&(opt->vector_tmo_nsec), arg);
+
+	return ret;
+}
+
+static int
+evt_parse_eth_queues(struct evt_options *opt, const char *arg)
+{
+	int ret;
+
+	ret = parser_read_uint16(&(opt->eth_queues), arg);
+
+	return ret;
+}
+
 static void
 usage(char *program)
 {
@@ -289,6 +329,10 @@ usage(char *program)
 		"\t--expiry_nsec      : event timer expiry ns.\n"
 		"\t--mbuf_sz          : packet mbuf size.\n"
 		"\t--max_pkt_sz       : max packet size.\n"
+		"\t--nb_eth_queues    : number of ethernet Rx queues.\n"
+		"\t--enable_vector    : enable event vectorization.\n"
+		"\t--vector_size      : Max vector size.\n"
+		"\t--vector_tmo_ns    : Max vector timeout in nanoseconds\n"
 		);
 	printf("available tests:\n");
 	evt_test_dump_names();
@@ -360,6 +404,10 @@ static struct option lgopts[] = {
 	{ EVT_EXPIRY_NSEC,         1, 0, 0 },
 	{ EVT_MBUF_SZ,             1, 0, 0 },
 	{ EVT_MAX_PKT_SZ,          1, 0, 0 },
+	{ EVT_NB_ETH_QUEUES,       1, 0, 0 },
+	{ EVT_ENA_VECTOR,          0, 0, 0 },
+	{ EVT_VECTOR_SZ,           1, 0, 0 },
+	{ EVT_VECTOR_TMO,          1, 0, 0 },
 	{ EVT_HELP,                0, 0, 0 },
 	{ NULL,                    0, 0, 0 }
 };
@@ -394,6 +442,10 @@ evt_opts_parse_long(int opt_idx, struct evt_options *opt)
 		{ EVT_EXPIRY_NSEC, evt_parse_expiry_nsec},
 		{ EVT_MBUF_SZ, evt_parse_mbuf_sz},
 		{ EVT_MAX_PKT_SZ, evt_parse_max_pkt_sz},
+		{ EVT_NB_ETH_QUEUES, evt_parse_eth_queues},
+		{ EVT_ENA_VECTOR, evt_parse_ena_vector},
+		{ EVT_VECTOR_SZ, evt_parse_vector_size},
+		{ EVT_VECTOR_TMO, evt_parse_vector_tmo_ns},
 	};
 
 	for (i = 0; i < RTE_DIM(parsermap); i++) {
diff --git a/app/test-eventdev/evt_options.h b/app/test-eventdev/evt_options.h
index 748e54fae..1cea2a3e1 100644
--- a/app/test-eventdev/evt_options.h
+++ b/app/test-eventdev/evt_options.h
@@ -42,6 +42,10 @@
 #define EVT_EXPIRY_NSEC          ("expiry_nsec")
 #define EVT_MBUF_SZ              ("mbuf_sz")
 #define EVT_MAX_PKT_SZ           ("max_pkt_sz")
+#define EVT_NB_ETH_QUEUES        ("nb_eth_queues")
+#define EVT_ENA_VECTOR           ("enable_vector")
+#define EVT_VECTOR_SZ            ("vector_size")
+#define EVT_VECTOR_TMO           ("vector_tmo_ns")
 #define EVT_HELP                 ("help")
 
 void evt_options_default(struct evt_options *opt);
diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c
index 0872b25b5..84dd4f44e 100644
--- a/app/test-eventdev/test_pipeline_atq.c
+++ b/app/test-eventdev/test_pipeline_atq.c
@@ -15,6 +15,8 @@ pipeline_atq_nb_event_queues(struct evt_options *opt)
 	return rte_eth_dev_count_avail();
 }
 
+typedef int (*pipeline_atq_worker_t)(void *arg);
+
 static __rte_noinline int
 pipeline_atq_worker_single_stage_tx(void *arg)
 {
@@ -113,6 +115,112 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
 	return 0;
 }
 
+static __rte_noinline int
+pipeline_atq_worker_single_stage_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+		vector_sz = ev.vec->nb_elem;
+		pipeline_event_tx_vector(dev, port, &ev);
+		w->processed_pkts += vector_sz;
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_atq_worker_single_stage_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		vector_sz = ev.vec->nb_elem;
+		ev.queue_id = tx_queue[ev.vec->port];
+		ev.vec->queue = 0;
+		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+		pipeline_event_enqueue(dev, port, &ev);
+		w->processed_pkts += vector_sz;
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+		vector_sz = 0;
+		for (i = 0; i < nb_rx; i++) {
+			vector_sz += ev[i].vec->nb_elem;
+			ev[i].vec->queue = 0;
+		}
+
+		pipeline_event_tx_burst(dev, port, ev, nb_rx);
+		w->processed_pkts += vector_sz;
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		vector_sz = 0;
+		for (i = 0; i < nb_rx; i++) {
+			ev[i].queue_id = tx_queue[ev[i].vec->port];
+			ev[i].vec->queue = 0;
+			vector_sz += ev[i].vec->nb_elem;
+			pipeline_fwd_event_vector(&ev[i],
+						  RTE_SCHED_TYPE_ATOMIC);
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		w->processed_pkts += vector_sz;
+	}
+
+	return 0;
+}
+
 static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx(void *arg)
 {
@@ -245,6 +353,147 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 	return 0;
 }
 
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_INIT;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.sub_event_type % nb_stages;
+
+		if (cq_id == last_queue) {
+			vector_sz = ev.vec->nb_elem;
+			pipeline_event_tx_vector(dev, port, &ev);
+			w->processed_pkts += vector_sz;
+			continue;
+		}
+
+		ev.sub_event_type++;
+		pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.sub_event_type % nb_stages;
+
+		if (cq_id == last_queue) {
+			ev.queue_id = tx_queue[ev.vec->port];
+			ev.vec->queue = 0;
+			vector_sz = ev.vec->nb_elem;
+			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+			w->processed_pkts += vector_sz;
+		} else {
+			ev.sub_event_type++;
+			pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			cq_id = ev[i].sub_event_type % nb_stages;
+
+			if (cq_id == last_queue) {
+				vector_sz = ev[i].vec->nb_elem;
+				pipeline_event_tx_vector(dev, port, &ev[i]);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts += vector_sz;
+				continue;
+			}
+
+			ev[i].sub_event_type++;
+			pipeline_fwd_event_vector(&ev[i],
+						  sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			cq_id = ev[i].sub_event_type % nb_stages;
+
+			if (cq_id == last_queue) {
+				vector_sz = ev[i].vec->nb_elem;
+				ev[i].queue_id = tx_queue[ev[i].vec->port];
+				ev[i].vec->queue = 0;
+				pipeline_fwd_event_vector(
+					&ev[i], RTE_SCHED_TYPE_ATOMIC);
+				w->processed_pkts += vector_sz;
+			} else {
+				ev[i].sub_event_type++;
+				pipeline_fwd_event_vector(
+					&ev[i], sched_type_list[cq_id]);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
@@ -253,27 +502,36 @@ worker_wrapper(void *arg)
 	const bool burst = evt_has_burst_mode(w->dev_id);
 	const bool internal_port = w->t->internal_port;
 	const uint8_t nb_stages = opt->nb_stages;
-	RTE_SET_USED(opt);
+	/* vector / burst / internal_port */
+	const pipeline_atq_worker_t
+	pipeline_atq_worker_single_stage[2][2][2] = {
+		[0][0][0] = pipeline_atq_worker_single_stage_fwd,
+		[0][0][1] = pipeline_atq_worker_single_stage_tx,
+		[0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
+		[0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
+		[1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
+		[1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
+		[1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
+		[1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
+	};
+	const pipeline_atq_worker_t
+	pipeline_atq_worker_multi_stage[2][2][2] = {
+		[0][0][0] = pipeline_atq_worker_multi_stage_fwd,
+		[0][0][1] = pipeline_atq_worker_multi_stage_tx,
+		[0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
+		[0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
+		[1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
+		[1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
+		[1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
+		[1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
+	};
 
-	if (nb_stages == 1) {
-		if (!burst && internal_port)
-			return pipeline_atq_worker_single_stage_tx(arg);
-		else if (!burst && !internal_port)
-			return pipeline_atq_worker_single_stage_fwd(arg);
-		else if (burst && internal_port)
-			return pipeline_atq_worker_single_stage_burst_tx(arg);
-		else if (burst && !internal_port)
-			return pipeline_atq_worker_single_stage_burst_fwd(arg);
-	} else {
-		if (!burst && internal_port)
-			return pipeline_atq_worker_multi_stage_tx(arg);
-		else if (!burst && !internal_port)
-			return pipeline_atq_worker_multi_stage_fwd(arg);
-		if (burst && internal_port)
-			return pipeline_atq_worker_multi_stage_burst_tx(arg);
-		else if (burst && !internal_port)
-			return pipeline_atq_worker_multi_stage_burst_fwd(arg);
-	}
+	if (nb_stages == 1)
+		return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
+							[internal_port])(arg);
+	else
+		return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
+						       [internal_port])(arg);
 
 	rte_panic("invalid worker\n");
 }
@@ -290,7 +548,7 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	int ret;
 	int nb_ports;
 	int nb_queues;
-	uint8_t queue;
+	uint8_t queue, is_prod;
 	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
 	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
 	uint8_t nb_worker_queues = 0;
@@ -330,15 +588,19 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
 
 		if (!t->internal_port) {
+			is_prod = false;
 			RTE_ETH_FOREACH_DEV(prod) {
 				if (queue == tx_evqueue_id[prod]) {
 					q_conf.event_queue_cfg =
 						RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
-				} else {
-					queue_arr[nb_worker_queues] = queue;
-					nb_worker_queues++;
+					is_prod = true;
+					break;
 				}
 			}
+			if (!is_prod) {
+				queue_arr[nb_worker_queues] = queue;
+				nb_worker_queues++;
+			}
 		}
 
 		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
index b47d76743..89f73be86 100644
--- a/app/test-eventdev/test_pipeline_common.c
+++ b/app/test-eventdev/test_pipeline_common.c
@@ -36,6 +36,12 @@ pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues)
 	evt_dump_queue_priority(opt);
 	evt_dump_sched_type_list(opt);
 	evt_dump_producer_type(opt);
+	evt_dump("nb_eth_rx_queues", "%d", opt->eth_queues);
+	evt_dump("event_vector", "%d", opt->ena_vector);
+	if (opt->ena_vector) {
+		evt_dump("vector_size", "%d", opt->vector_size);
+		evt_dump("vector_tmo_ns", "%" PRIu64, opt->vector_tmo_nsec);
+	}
 }
 
 static inline uint64_t
@@ -163,7 +169,7 @@ pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues)
 int
 pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 {
-	uint16_t i;
+	uint16_t i, j;
 	int ret;
 	uint8_t nb_queues = 1;
 	struct test_pipeline *t = evt_test_priv(test);
@@ -210,6 +216,16 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 		if (!(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT))
 			t->internal_port = 0;
 
+		ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id, i, &caps);
+		if (ret != 0) {
+			evt_err("failed to get event rx adapter[%d] caps", i);
+			return ret;
+		}
+
+		if (!(caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT))
+			local_port_conf.rxmode.offloads |=
+				DEV_RX_OFFLOAD_RSS_HASH;
+
 		ret = rte_eth_dev_info_get(i, &dev_info);
 		if (ret != 0) {
 			evt_err("Error during getting device (port %u) info: %s\n",
@@ -236,19 +252,22 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 				local_port_conf.rx_adv_conf.rss_conf.rss_hf);
 		}
 
-		if (rte_eth_dev_configure(i, nb_queues, nb_queues,
-					&local_port_conf)
-				< 0) {
+		if (rte_eth_dev_configure(i, opt->eth_queues, nb_queues,
+					  &local_port_conf) < 0) {
 			evt_err("Failed to configure eth port [%d]", i);
 			return -EINVAL;
 		}
 
-		if (rte_eth_rx_queue_setup(i, 0, NB_RX_DESC,
-				rte_socket_id(), &rx_conf, t->pool) < 0) {
-			evt_err("Failed to setup eth port [%d] rx_queue: %d.",
+		for (j = 0; j < opt->eth_queues; j++) {
+			if (rte_eth_rx_queue_setup(i, j, NB_RX_DESC,
+						   rte_socket_id(), &rx_conf,
+						   t->pool) < 0) {
+				evt_err("Failed to setup eth port [%d] rx_queue: %d.",
-					i, 0);
+					i, j);
-			return -EINVAL;
+				return -EINVAL;
+			}
 		}
+
 		if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC,
 					rte_socket_id(), NULL) < 0) {
 			evt_err("Failed to setup eth port [%d] tx_queue: %d.",
@@ -310,11 +329,24 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 {
 	int ret = 0;
 	uint16_t prod;
+	struct rte_mempool *vector_pool = NULL;
 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 
 	memset(&queue_conf, 0,
 			sizeof(struct rte_event_eth_rx_adapter_queue_conf));
 	queue_conf.ev.sched_type = opt->sched_type_list[0];
+	if (opt->ena_vector) {
+		unsigned int nb_elem = (opt->pool_sz / opt->vector_size) << 1;
+
+		nb_elem = nb_elem ? nb_elem : 1;
+		vector_pool = rte_event_vector_pool_create(
+			"vector_pool", nb_elem, 0, opt->vector_size,
+			opt->socket_id);
+		if (vector_pool == NULL) {
+			evt_err("failed to create event vector pool");
+			return -ENOMEM;
+		}
+	}
 	RTE_ETH_FOREACH_DEV(prod) {
 		uint32_t cap;
 
@@ -326,6 +358,19 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 					opt->dev_id);
 			return ret;
 		}
+		if (opt->ena_vector) {
+			if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
+				queue_conf.vector_sz = opt->vector_size;
+				queue_conf.vector_timeout_ns =
+					opt->vector_tmo_nsec;
+				queue_conf.rx_queue_flags |=
+				RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
+				queue_conf.vector_mp = vector_pool;
+			} else {
+				evt_err("Rx adapter doesn't support event vector");
+				return -EINVAL;
+			}
+		}
 		queue_conf.ev.queue_id = prod * stride;
 		ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id,
 				&prod_conf);
@@ -378,6 +423,14 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
 			return ret;
 		}
 
+		if (opt->ena_vector) {
+			if (!(cap &
+			      RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR)) {
+				evt_err("Tx adapter doesn't support event vector");
+				return -EINVAL;
+			}
+		}
+
 		ret = rte_event_eth_tx_adapter_create(consm, opt->dev_id,
 				&port_conf);
 		if (ret) {
diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index 6e73c6ab2..800a90616 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -101,6 +101,14 @@ pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
 	ev->sched_type = sched;
 }
 
+static __rte_always_inline void
+pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
+{
+	ev->event_type = RTE_EVENT_TYPE_CPU_VECTOR;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = sched;
+}
+
 static __rte_always_inline void
 pipeline_event_tx(const uint8_t dev, const uint8_t port,
 		struct rte_event * const ev)
@@ -110,6 +118,16 @@ pipeline_event_tx(const uint8_t dev, const uint8_t port,
 		rte_pause();
 }
 
+static __rte_always_inline void
+pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
+			 struct rte_event *const ev)
+{
+	ev->vec->queue = 0;
+
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+		rte_pause();
+}
+
 static __rte_always_inline void
 pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
 		struct rte_event *ev, const uint16_t nb_rx)
diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
index 9a9febb19..f6cc3e358 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,6 +15,8 @@ pipeline_queue_nb_event_queues(struct evt_options *opt)
 	return (eth_count * opt->nb_stages) + eth_count;
 }
 
+typedef int (*pipeline_queue_worker_t)(void *arg);
+
 static __rte_noinline int
 pipeline_queue_worker_single_stage_tx(void *arg)
 {
@@ -126,6 +128,125 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 	return 0;
 }
 
+static __rte_noinline int
+pipeline_queue_worker_single_stage_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+			vector_sz = ev.vec->nb_elem;
+			pipeline_event_tx_vector(dev, port, &ev);
+			w->processed_pkts += vector_sz;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_queue_worker_single_stage_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		ev.queue_id = tx_queue[ev.vec->port];
+		ev.vec->queue = 0;
+		vector_sz = ev.vec->nb_elem;
+		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+		pipeline_event_enqueue(dev, port, &ev);
+		w->processed_pkts += vector_sz;
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+				vector_sz = ev[i].vec->nb_elem;
+				pipeline_event_tx_vector(dev, port, &ev[i]);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts += vector_sz;
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event_vector(
+					&ev[i], RTE_SCHED_TYPE_ATOMIC);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		vector_sz = 0;
+		for (i = 0; i < nb_rx; i++) {
+			ev[i].queue_id = tx_queue[ev[i].vec->port];
+			ev[i].vec->queue = 0;
+			vector_sz += ev[i].vec->nb_elem;
+			pipeline_fwd_event_vector(&ev[i],
+						  RTE_SCHED_TYPE_ATOMIC);
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		w->processed_pkts += vector_sz;
+	}
+
+	return 0;
+}
 
 static __rte_noinline int
 pipeline_queue_worker_multi_stage_tx(void *arg)
@@ -267,6 +388,151 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 	return 0;
 }
 
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (ev.queue_id == tx_queue[ev.vec->port]) {
+			vector_sz = ev.vec->nb_elem;
+			pipeline_event_tx_vector(dev, port, &ev);
+			w->processed_pkts += vector_sz;
+			continue;
+		}
+
+		ev.queue_id++;
+		pipeline_fwd_event_vector(&ev, cq_id != last_queue
+						       ? sched_type_list[cq_id]
+						       : RTE_SCHED_TYPE_ATOMIC);
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id == last_queue) {
+			vector_sz = ev.vec->nb_elem;
+			ev.queue_id = tx_queue[ev.vec->port];
+			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+			w->processed_pkts += vector_sz;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
+				vector_sz = ev[i].vec->nb_elem;
+				pipeline_event_tx_vector(dev, port, &ev[i]);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts += vector_sz;
+				continue;
+			}
+
+			ev[i].queue_id++;
+			pipeline_fwd_event_vector(
+				&ev[i], cq_id != last_queue
+						? sched_type_list[cq_id]
+						: RTE_SCHED_TYPE_ATOMIC);
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
+{
+	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t vector_sz;
+
+	while (!t->done) {
+		uint16_t nb_rx =
+			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id == last_queue) {
+				ev[i].queue_id = tx_queue[ev[i].vec->port];
+				vector_sz = ev[i].vec->nb_elem;
+				pipeline_fwd_event_vector(
+					&ev[i], RTE_SCHED_TYPE_ATOMIC);
+				w->processed_pkts += vector_sz;
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event_vector(
+					&ev[i], sched_type_list[cq_id]);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
@@ -275,29 +541,39 @@ worker_wrapper(void *arg)
 	const bool burst = evt_has_burst_mode(w->dev_id);
 	const bool internal_port = w->t->internal_port;
 	const uint8_t nb_stages = opt->nb_stages;
-	RTE_SET_USED(opt);
-
-	if (nb_stages == 1) {
-		if (!burst && internal_port)
-			return pipeline_queue_worker_single_stage_tx(arg);
-		else if (!burst && !internal_port)
-			return pipeline_queue_worker_single_stage_fwd(arg);
-		else if (burst && internal_port)
-			return pipeline_queue_worker_single_stage_burst_tx(arg);
-		else if (burst && !internal_port)
-			return pipeline_queue_worker_single_stage_burst_fwd(
-					arg);
-	} else {
-		if (!burst && internal_port)
-			return pipeline_queue_worker_multi_stage_tx(arg);
-		else if (!burst && !internal_port)
-			return pipeline_queue_worker_multi_stage_fwd(arg);
-		else if (burst && internal_port)
-			return pipeline_queue_worker_multi_stage_burst_tx(arg);
-		else if (burst && !internal_port)
-			return pipeline_queue_worker_multi_stage_burst_fwd(arg);
+	/* vector / burst / internal_port */
+	const pipeline_queue_worker_t
+	pipeline_queue_worker_single_stage[2][2][2] = {
+		[0][0][0] = pipeline_queue_worker_single_stage_fwd,
+		[0][0][1] = pipeline_queue_worker_single_stage_tx,
+		[0][1][0] = pipeline_queue_worker_single_stage_burst_fwd,
+		[0][1][1] = pipeline_queue_worker_single_stage_burst_tx,
+		[1][0][0] = pipeline_queue_worker_single_stage_fwd_vector,
+		[1][0][1] = pipeline_queue_worker_single_stage_tx_vector,
+		[1][1][0] = pipeline_queue_worker_single_stage_burst_fwd_vector,
+		[1][1][1] = pipeline_queue_worker_single_stage_burst_tx_vector,
+	};
+	const pipeline_queue_worker_t
+	pipeline_queue_worker_multi_stage[2][2][2] = {
+		[0][0][0] = pipeline_queue_worker_multi_stage_fwd,
+		[0][0][1] = pipeline_queue_worker_multi_stage_tx,
+		[0][1][0] = pipeline_queue_worker_multi_stage_burst_fwd,
+		[0][1][1] = pipeline_queue_worker_multi_stage_burst_tx,
+		[1][0][0] = pipeline_queue_worker_multi_stage_fwd_vector,
+		[1][0][1] = pipeline_queue_worker_multi_stage_tx_vector,
+		[1][1][0] = pipeline_queue_worker_multi_stage_burst_fwd_vector,
+		[1][1][1] = pipeline_queue_worker_multi_stage_burst_tx_vector,
+	};
+
+	if (nb_stages == 1)
+		return (pipeline_queue_worker_single_stage[opt->ena_vector]
+							  [burst]
+							  [internal_port])(arg);
+	else
+		return (pipeline_queue_worker_multi_stage[opt->ena_vector]
+							 [burst]
+							 [internal_port])(arg);
 
-	}
 	rte_panic("invalid worker\n");
 }
 
diff --git a/doc/guides/tools/testeventdev.rst b/doc/guides/tools/testeventdev.rst
index ad1788a3d..691cf706e 100644
--- a/doc/guides/tools/testeventdev.rst
+++ b/doc/guides/tools/testeventdev.rst
@@ -158,6 +158,26 @@ The following are the application command-line options:
        Set max packet mbuf size. Can be used configure Rx/Tx scatter gather.
        Only applicable for `pipeline_atq` and `pipeline_queue` tests.
 
+* ``--nb_eth_queues``
+
+       Configure the number of Rx queues per ethernet port.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
+* ``--enable_vector``
+
+       Enable event vector for Rx/Tx adapters.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
+* ``--vector_size``
+
+       Vector size to configure for the Rx adapter.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
+* ``--vector_tmo_ns``
+
+       Vector timeout in nanoseconds to configure for the Rx adapter.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
 
 Eventdev Tests
 --------------
@@ -607,6 +627,10 @@ Supported application command line options are following::
         --worker_deq_depth
         --prod_type_ethdev
         --deq_tmo_nsec
+        --nb_eth_queues
+        --enable_vector
+        --vector_size
+        --vector_tmo_ns
 
 
 .. Note::
@@ -699,6 +723,10 @@ Supported application command line options are following::
         --worker_deq_depth
         --prod_type_ethdev
         --deq_tmo_nsec
+        --nb_eth_queues
+        --enable_vector
+        --vector_size
+        --vector_tmo_ns
 
 
 .. Note::
-- 
2.17.1



* [dpdk-dev] [PATCH 7/7] eventdev: fix ABI breakage due to event vector
  2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
                   ` (5 preceding siblings ...)
  2021-02-20 22:09 ` [dpdk-dev] [PATCH 6/7] app/eventdev: add event vector mode in pipeline test pbhagavatula
@ 2021-02-20 22:09 ` pbhagavatula
  6 siblings, 0 replies; 8+ messages in thread
From: pbhagavatula @ 2021-02-20 22:09 UTC (permalink / raw)
  To: jerinj, jay.jayatheerthan, erik.g.carrillo, abhinandan.gujjar,
	timothy.mcdaniel, hemant.agrawal, harry.van.haaren,
	mattias.ronnblom, liang.j.ma, Ray Kinsella, Neil Horman
  Cc: dev, Pavan Nikhilesh

From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Fix the ABI breakage caused by the event vector configuration by moving
the vector configuration into a new structure and adding a separate
function to enable the vector configuration on a given ethernet device
and queue pair.
The new structure and function can be merged back into the queue
configuration in v21.11.
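
With this split, the vector size, timeout and mempool are no longer part
of struct rte_event_eth_rx_adapter_queue_conf; they are applied per Rx
queue after rte_event_eth_rx_adapter_queue_add() through the new call.
A minimal sketch of the resulting sequence is shown below; id, eth_id,
queue_conf, vector_size, vector_tmo_ns and vector_pool stand in for
values the application already holds and are not defined by this patch:

	struct rte_event_eth_rx_adapter_event_vector_config vec_conf;

	/* The queue is added with only the vector flag set; size, timeout
	 * and mempool are supplied afterwards via the new config call.
	 */
	queue_conf.rx_queue_flags |=
		RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
	rte_event_eth_rx_adapter_queue_add(id, eth_id, -1, &queue_conf);

	vec_conf.vector_sz = vector_size;
	vec_conf.vector_timeout_ns = vector_tmo_ns;
	vec_conf.vector_mp = vector_pool;
	rte_event_eth_rx_adapter_queue_event_vector_config(id, eth_id, -1,
							   &vec_conf);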

Fixes: 44c81670cf0a ("eventdev: introduce event vector Rx capability")

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 app/test-eventdev/test_pipeline_common.c      |  16 +-
 lib/librte_eventdev/eventdev_pmd.h            |  29 +++
 .../rte_event_eth_rx_adapter.c                | 168 ++++++++++++------
 .../rte_event_eth_rx_adapter.h                |  27 +++
 lib/librte_eventdev/version.map               |   1 +
 5 files changed, 184 insertions(+), 57 deletions(-)

diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
index 89f73be86..9aeefdd5f 100644
--- a/app/test-eventdev/test_pipeline_common.c
+++ b/app/test-eventdev/test_pipeline_common.c
@@ -331,6 +331,7 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 	uint16_t prod;
 	struct rte_mempool *vector_pool = NULL;
 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
+	struct rte_event_eth_rx_adapter_event_vector_config vec_conf;

 	memset(&queue_conf, 0,
 			sizeof(struct rte_event_eth_rx_adapter_queue_conf));
@@ -360,12 +361,8 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 		}
 		if (opt->ena_vector) {
 			if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
-				queue_conf.vector_sz = opt->vector_size;
-				queue_conf.vector_timeout_ns =
-					opt->vector_tmo_nsec;
 				queue_conf.rx_queue_flags |=
 				RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
-				queue_conf.vector_mp = vector_pool;
 			} else {
 				evt_err("Rx adapter doesn't support event vector");
 				return -EINVAL;
@@ -385,6 +382,17 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 			return ret;
 		}

+		if (opt->ena_vector) {
+			vec_conf.vector_sz = opt->vector_size;
+			vec_conf.vector_timeout_ns = opt->vector_tmo_nsec;
+			vec_conf.vector_mp = vector_pool;
+			if (rte_event_eth_rx_adapter_queue_event_vector_config(
+				    prod, prod, -1, &vec_conf) < 0) {
+				evt_err("Failed to configure event vectorization for Rx adapter");
+				return -EINVAL;
+			}
+		}
+
 		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
 			uint32_t service_id = -1U;

diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
index 60bfaebc0..d79dfd612 100644
--- a/lib/librte_eventdev/eventdev_pmd.h
+++ b/lib/librte_eventdev/eventdev_pmd.h
@@ -667,6 +667,32 @@ typedef int (*eventdev_eth_rx_adapter_vector_limits_get_t)(
 	const struct rte_eventdev *dev, const struct rte_eth_dev *eth_dev,
 	struct rte_event_eth_rx_adapter_vector_limits *limits);

+struct rte_event_eth_rx_adapter_event_vector_config;
+/**
+ * Enable event vector on a given Rx queue of an ethernet device belonging
+ * to the Rx adapter.
+ *
+ * @param dev
+ *   Event device pointer
+ *
+ * @param eth_dev
+ *   Ethernet device pointer
+ *
+ * @param rx_queue_id
+ *   The Rx queue identifier
+ *
+ * @param config
+ *   Pointer to the event vector configuration structure.
+ *
+ * @return
+ *   - 0: Success.
+ *   - <0: Error code returned by the driver function.
+ */
+typedef int (*eventdev_eth_rx_adapter_event_vector_config_t)(
+	const struct rte_eventdev *dev, const struct rte_eth_dev *eth_dev,
+	int32_t rx_queue_id,
+	const struct rte_event_eth_rx_adapter_event_vector_config *config);
+
 typedef uint32_t rte_event_pmd_selftest_seqn_t;
 extern int rte_event_pmd_selftest_seqn_dynfield_offset;

@@ -1092,6 +1118,9 @@ struct rte_eventdev_ops {
 	eventdev_eth_rx_adapter_vector_limits_get_t
 		eth_rx_adapter_vector_limits_get;
 	/**< Get event vector limits for the Rx adapter */
+	eventdev_eth_rx_adapter_event_vector_config_t
+		eth_rx_adapter_event_vector_config;
+	/**< Configure Rx adapter with event vector */

 	eventdev_timer_adapter_caps_get_t timer_adapter_caps_get;
 	/**< Get timer adapter capabilities */
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index a1990637f..c71990078 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -1882,25 +1882,6 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	} else
 		qi_ev->flow_id = 0;

-	if (conf->rx_queue_flags &
-	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
-		queue_info->ena_vector = 1;
-		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
-		rxa_set_vector_data(queue_info, conf->vector_sz,
-				    conf->vector_timeout_ns, conf->vector_mp,
-				    rx_queue_id, dev_info->dev->data->port_id);
-		rx_adapter->ena_vector = 1;
-		rx_adapter->vector_tmo_ticks =
-			rx_adapter->vector_tmo_ticks
-				? RTE_MIN(queue_info->vector_data
-						  .vector_timeout_ticks,
-					  rx_adapter->vector_tmo_ticks)
-				: queue_info->vector_data.vector_timeout_ticks;
-		rx_adapter->vector_tmo_ticks <<= 1;
-		TAILQ_INIT(&rx_adapter->vector_list);
-		rx_adapter->prev_expiry_ts = 0;
-	}
-
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
 		rx_adapter->num_rx_polled += !pollq;
@@ -1926,6 +1907,44 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 }

+static void
+rxa_sw_event_vector_configure(
+	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
+	int rx_queue_id,
+	const struct rte_event_eth_rx_adapter_event_vector_config *config)
+{
+	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
+	struct eth_rx_queue_info *queue_info;
+	struct rte_event *qi_ev;
+
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i < nb_rx_queues; i++)
+			rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
+						      config);
+		return;
+	}
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	qi_ev = (struct rte_event *)&queue_info->event;
+	queue_info->ena_vector = 1;
+	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+	rxa_set_vector_data(queue_info, config->vector_sz,
+			    config->vector_timeout_ns, config->vector_mp,
+			    rx_queue_id, dev_info->dev->data->port_id);
+	rx_adapter->ena_vector = 1;
+	rx_adapter->vector_tmo_ticks =
+		rx_adapter->vector_tmo_ticks ?
+			      RTE_MIN(config->vector_timeout_ns << 1,
+				      rx_adapter->vector_tmo_ticks) :
+			      config->vector_timeout_ns << 1;
+	rx_adapter->prev_expiry_ts = 0;
+	TAILQ_INIT(&rx_adapter->vector_list);
+}
+
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
 		int rx_queue_id,
@@ -2239,7 +2258,6 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
 	struct eth_device_info *dev_info;
-	struct rte_event_eth_rx_adapter_vector_limits limits;

 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -2276,39 +2294,6 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
 		return -EINVAL;
 	}

-	if (queue_conf->rx_queue_flags &
-	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
-		ret = rte_event_eth_rx_adapter_vector_limits_get(
-			rx_adapter->eventdev_id, eth_dev_id, &limits);
-		if (ret < 0) {
-			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
-					 " eth port: %" PRIu16
-					 " adapter id: %" PRIu8,
-					 eth_dev_id, id);
-			return -EINVAL;
-		}
-		if (queue_conf->vector_sz < limits.min_sz ||
-		    queue_conf->vector_sz > limits.max_sz ||
-		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
-		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
-		    queue_conf->vector_mp == NULL) {
-			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
-					 " eth port: %" PRIu16
-					 " adapter id: %" PRIu8,
-					 eth_dev_id, id);
-			return -EINVAL;
-		}
-		if (queue_conf->vector_mp->elt_size <
-		    (sizeof(struct rte_event_vector) +
-		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
-			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
-					 " eth port: %" PRIu16
-					 " adapter id: %" PRIu8,
-					 eth_dev_id, id);
-			return -EINVAL;
-		}
-	}
-
 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
 		(rx_queue_id != -1)) {
 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
@@ -2502,6 +2487,83 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 	return ret;
 }

+int
+rte_event_eth_rx_adapter_queue_event_vector_config(
+	uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
+	struct rte_event_eth_rx_adapter_event_vector_config *config)
+{
+	struct rte_event_eth_rx_adapter_vector_limits limits;
+	struct rte_event_eth_rx_adapter *rx_adapter;
+	struct rte_eventdev *dev;
+	uint32_t cap;
+	int ret;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
+
+	rx_adapter = rxa_id_to_adapter(id);
+	if ((rx_adapter == NULL) || (config == NULL))
+		return -EINVAL;
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
+						eth_dev_id, &cap);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
+				 " eth port %" PRIu16,
+				 id, eth_dev_id);
+		return ret;
+	}
+
+	if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) {
+		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
+				 eth_dev_id, id);
+		return -EINVAL;
+	}
+
+	ret = rte_event_eth_rx_adapter_vector_limits_get(
+		rx_adapter->eventdev_id, eth_dev_id, &limits);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("Failed to get vector limits edev %" PRIu8
+				 " eth port %" PRIu16,
+				 rx_adapter->eventdev_id, eth_dev_id);
+		return ret;
+	}
+
+	if (config->vector_sz < limits.min_sz ||
+	    config->vector_sz > limits.max_sz ||
+	    config->vector_timeout_ns < limits.min_timeout_ns ||
+	    config->vector_timeout_ns > limits.max_timeout_ns ||
+	    config->vector_mp == NULL) {
+		RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
+				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
+				 eth_dev_id, id);
+		return -EINVAL;
+	}
+	if (config->vector_mp->elt_size <
+	    (sizeof(struct rte_event_vector) +
+	     (sizeof(uintptr_t) * config->vector_sz))) {
+		RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
+				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
+				 eth_dev_id, id);
+		return -EINVAL;
+	}
+
+	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
+		RTE_FUNC_PTR_OR_ERR_RET(
+			*dev->dev_ops->eth_rx_adapter_event_vector_config,
+			-ENOTSUP);
+		ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
+			dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
+	} else {
+		rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
+					      rx_queue_id, config);
+	}
+
+	return ret;
+}
+
 int
 rte_event_eth_rx_adapter_vector_limits_get(
 	uint8_t dev_id, uint16_t eth_port_id,
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
index 4bdb38f08..ceef6d565 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -171,6 +171,9 @@ struct rte_event_eth_rx_adapter_queue_conf {
 	 * The event adapter sets ev.event_type to RTE_EVENT_TYPE_ETHDEV in the
 	 * enqueued event.
 	 */
+};
+
+struct rte_event_eth_rx_adapter_event_vector_config {
 	uint16_t vector_sz;
 	/**<
 	 * Indicates the maximum number for mbufs to combine and form a vector.
@@ -418,6 +421,30 @@ int rte_event_eth_rx_adapter_queue_add(uint8_t id,
 int rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 				       int32_t rx_queue_id);

+/**
+ * Configure event vectorization for a given ethernet device queue that has
+ * been added to an event eth Rx adapter.
+ *
+ * @param id
+ *  The identifier of the ethernet Rx event adapter.
+ *
+ * @param eth_dev_id
+ *  The identifier of the ethernet device.
+ *
+ * @param rx_queue_id
+ *  Ethernet device receive queue index.
+ *  If rx_queue_id is -1, then all Rx queues configured for the ethernet device
+ *  are configured with event vectorization.
+ *
+ * @param config
+ *  Pointer to the event vector configuration structure.
+ *
+ * @return
+ *  - 0: Success, Receive queue configured correctly.
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int rte_event_eth_rx_adapter_queue_event_vector_config(
+	uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
+	struct rte_event_eth_rx_adapter_event_vector_config *config);
+
 /**
  * Start ethernet Rx event adapter
  *
diff --git a/lib/librte_eventdev/version.map b/lib/librte_eventdev/version.map
index 34c1c830e..902df0ae3 100644
--- a/lib/librte_eventdev/version.map
+++ b/lib/librte_eventdev/version.map
@@ -142,6 +142,7 @@ EXPERIMENTAL {
 	#added in 21.05
 	rte_event_vector_pool_create;
 	rte_event_eth_rx_adapter_vector_limits_get;
+	rte_event_eth_rx_adapter_queue_event_vector_config;
 };

 INTERNAL {
--
2.17.1



Thread overview: 8+ messages
2021-02-20 22:09 [dpdk-dev] [PATCH 0/7] Introduce event vectorization pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 1/7] eventdev: introduce event vector capability pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 2/7] eventdev: introduce event vector Rx capability pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 3/7] eventdev: introduce event vector Tx capability pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 4/7] eventdev: add Rx adapter event vector support pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 5/7] eventdev: add Tx " pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 6/7] app/eventdev: add event vector mode in pipeline test pbhagavatula
2021-02-20 22:09 ` [dpdk-dev] [PATCH 7/7] eventdev: fix ABI breakage due to event vector pbhagavatula
