DPDK patches and discussions
 help / color / mirror / Atom feed
From: Sivaprasad Tummala <sivaprasad.tummala@amd.com>
To: <david.hunt@intel.com>, <jerinj@marvell.com>,
	<harry.van.haaren@intel.com>
Cc: <dev@dpdk.org>
Subject: [RFC PATCH 3/5] eventdev: support optional dequeue callbacks
Date: Wed, 19 Apr 2023 02:54:25 -0700	[thread overview]
Message-ID: <20230419095427.563185-3-sivaprasad.tummala@amd.com> (raw)
In-Reply-To: <20230419095427.563185-1-sivaprasad.tummala@amd.com>

Add optional support for inline event processing within dequeue call.
For a dequeue callback, events dequeued from the event port were
passed them to a callback function if configured, to allow
additional processing. e.g. unpack batch of packets from each event
on dequeue, before passing back to the application.

Signed-off-by: Sivaprasad Tummala <sivaprasad.tummala@amd.com>
---
 lib/eventdev/eventdev_pmd.h      |  17 ++++
 lib/eventdev/eventdev_private.c  |  17 ++++
 lib/eventdev/rte_eventdev.c      |  78 +++++++++++++++++
 lib/eventdev/rte_eventdev.h      | 145 ++++++++++++++++++++++++++++++-
 lib/eventdev/rte_eventdev_core.h |  12 ++-
 lib/eventdev/version.map         |   6 ++
 6 files changed, 272 insertions(+), 3 deletions(-)

diff --git a/lib/eventdev/eventdev_pmd.h b/lib/eventdev/eventdev_pmd.h
index 7b12f80f57..c87e06993f 100644
--- a/lib/eventdev/eventdev_pmd.h
+++ b/lib/eventdev/eventdev_pmd.h
@@ -97,6 +97,19 @@ struct rte_eventdev_global {
 	uint8_t nb_devs;	/**< Number of devices found */
 };
 
+/**
+ * @internal
+ * Structure used to hold information about the callbacks to be called for a
+ * port on dequeue.
+ */
+struct rte_event_dequeue_callback {
+	struct rte_event_dequeue_callback *next;
+	union{
+		rte_dequeue_callback_fn dequeue;
+	} fn;
+	void *param;
+};
+
 /**
  * @internal
  * The data part, with no function pointers, associated with each device.
@@ -173,6 +186,10 @@ struct rte_eventdev {
 	/**< Pointer to PMD dequeue burst function. */
 	event_maintain_t maintain;
 	/**< Pointer to PMD port maintenance function. */
+	struct rte_event_dequeue_callback *post_dequeue_burst_cbs[RTE_EVENT_MAX_PORTS_PER_DEV];
+	/**<  User-supplied functions called from dequeue_burst to post-process
+	 * received packets before passing them to the user
+	 */
 	event_tx_adapter_enqueue_t txa_enqueue_same_dest;
 	/**< Pointer to PMD eth Tx adapter burst enqueue function with
 	 * events destined to same Eth port & Tx queue.
diff --git a/lib/eventdev/eventdev_private.c b/lib/eventdev/eventdev_private.c
index 1d3d9d357e..6d1cbdb17d 100644
--- a/lib/eventdev/eventdev_private.c
+++ b/lib/eventdev/eventdev_private.c
@@ -118,4 +118,21 @@ event_dev_fp_ops_set(struct rte_event_fp_ops *fp_op,
 	fp_op->txa_enqueue_same_dest = dev->txa_enqueue_same_dest;
 	fp_op->ca_enqueue = dev->ca_enqueue;
 	fp_op->data = dev->data->ports;
+	fp_op->ev_port.clbk = (void **)(uintptr_t)dev->post_dequeue_burst_cbs;
+	fp_op->ev_port.data = dev->data->ports;
+}
+
+uint16_t
+rte_event_dequeue_callbacks(uint8_t dev_id, uint8_t port_id,
+		struct rte_event *ev, uint16_t nb_events, void *opaque)
+{
+	static uint16_t nb_rx;
+	const struct rte_event_dequeue_callback *cb = opaque;
+
+	while (cb != NULL) {
+		nb_rx = cb->fn.dequeue(dev_id, port_id, ev,
+				nb_events, cb->param);
+		cb = cb->next;
+	}
+	return nb_rx;
 }
diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index ff77194783..0d43cb2d0a 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -38,6 +38,9 @@ static struct rte_eventdev_global eventdev_globals = {
 /* Public fastpath APIs. */
 struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS];
 
+/* spinlock for add/remove dequeue callbacks */
+static rte_spinlock_t event_dev_dequeue_cb_lock = RTE_SPINLOCK_INITIALIZER;
+
 /* Event dev north bound API implementation */
 
 uint8_t
@@ -860,6 +863,81 @@ rte_event_port_attr_get(uint8_t dev_id, uint8_t port_id, uint32_t attr_id,
 	return 0;
 }
 
+const struct rte_event_dequeue_callback *
+rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id,
+		rte_dequeue_callback_fn fn, void *user_param)
+{
+	struct rte_eventdev *dev;
+	struct rte_event_dequeue_callback *cb;
+	struct rte_event_dequeue_callback *tail;
+
+	/* check input parameters */
+	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, NULL);
+	dev = &rte_eventdevs[dev_id];
+	if (!is_valid_port(dev, port_id)) {
+		RTE_EDEV_LOG_ERR("Invalid port_id=%" PRIu8, port_id);
+		return NULL;
+	}
+
+	cb = rte_zmalloc(NULL, sizeof(*cb), 0);
+	if (cb == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+	cb->fn.dequeue = fn;
+	cb->param = user_param;
+
+	rte_spinlock_lock(&event_dev_dequeue_cb_lock);
+	/* Add the callbacks in fifo order. */
+	tail = rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id];
+	if (!tail) {
+		/* Stores to cb->fn and cb->param should complete before
+		 * cb is visible to data plane.
+		 */
+		__atomic_store_n(
+			&rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
+			cb, __ATOMIC_RELEASE);
+	} else {
+		while (tail->next)
+			tail = tail->next;
+		/* Stores to cb->fn and cb->param should complete before
+		 * cb is visible to data plane.
+		 */
+		__atomic_store_n(&tail->next, cb, __ATOMIC_RELEASE);
+	}
+	rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
+
+	return cb;
+}
+
+int
+rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id,
+		const struct rte_event_dequeue_callback *user_cb)
+{
+	struct rte_eventdev *dev;
+
+	/* Check input parameters. */
+	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
+	dev = &rte_eventdevs[dev_id];
+	if (user_cb == NULL || !is_valid_port(dev, port_id))
+		return -EINVAL;
+
+	rte_spinlock_lock(&event_dev_dequeue_cb_lock);
+	prev_cb = &dev->post_dequeue_burst_cbs[port_id];
+	for (; *prev_cb != NULL; prev_cb = &cb->next) {
+		cb = *prev_cb;
+		if (cb == user_cb) {
+			/* Remove the user cb from the callback list. */
+			__atomic_store_n(prev_cb, cb->next, __ATOMIC_RELAXED);
+			ret = 0;
+			break;
+		}
+	}
+	rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
+
+	return ret;
+}
+
 int
 rte_event_port_get_monitor_addr(uint8_t dev_id, uint8_t port_id,
 		struct rte_power_monitor_cond *pmc)
diff --git a/lib/eventdev/rte_eventdev.h b/lib/eventdev/rte_eventdev.h
index 841b1fb9b5..9ccd259058 100644
--- a/lib/eventdev/rte_eventdev.h
+++ b/lib/eventdev/rte_eventdev.h
@@ -948,6 +948,100 @@ void
 rte_event_port_quiesce(uint8_t dev_id, uint8_t port_id,
 		       rte_eventdev_port_flush_t release_cb, void *args);
 
+struct rte_event_dequeue_callback;
+
+/**
+ * Function type used for dequeue event processing callbacks.
+ *
+ * The callback function is called on dequeue with a burst of events that have
+ * been received on the given event port.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param[out] ev
+ *   Points to an array of *nb_events* objects of type *rte_event* structure
+ *   for output to be populated with the dequeued event objects.
+ * @param nb_events
+ *   The maximum number of event objects to dequeue, typically number of
+ *   rte_event_port_dequeue_depth() available for this port.
+ * @param opaque
+ *   Opaque pointer of event port callback related data.
+ *
+ * @return
+ *  The number of event objects returned to the user.
+ */
+typedef uint16_t (*rte_dequeue_callback_fn)(uint8_t dev_id, uint8_t port_id,
+		struct rte_event *ev, uint16_t nb_events, void *user_param);
+
+/**
+ * Add a callback to be called on event dequeue on a given event device port.
+ *
+ * This API configures a function to be called for each burst of
+ * events dequeued on a given event device port. The return value is a pointer
+ * that can be used to later remove the callback using
+ * rte_event_remove_dequeue_callback().
+ *
+ * Multiple functions are called in the order that they are added.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param fn
+ *   The callback function
+ * @param user_param
+ *   A generic pointer parameter which will be passed to each invocation of the
+ *   callback function on this event device port. Inter-thread synchronization
+ *   of any user data changes is the responsibility of the user.
+ *
+ * @return
+ *   NULL on error.
+ *   On success, a pointer value which can later be used to remove the callback.
+ */
+__rte_experimental
+const struct rte_event_dequeue_callback *
+rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id,
+		rte_dequeue_callback_fn fn, void *user_param);
+
+/**
+ * Remove a dequeue event callback from a given event device port.
+ *
+ * This API is used to removed callbacks that were added to a event device port
+ * using rte_event_add_dequeue_callback().
+ *
+ * Note: the callback is removed from the callback list but it isn't freed
+ * since the it may still be in use. The memory for the callback can be
+ * subsequently freed back by the application by calling rte_free():
+ *
+ * - Immediately - if the device is stopped, or the user knows that no
+ *   callbacks are in flight e.g. if called from the thread doing dequeue
+ *   on that port.
+ *
+ * - After a short delay - where the delay is sufficient to allow any
+ *   in-flight callbacks to complete. Alternately, the RCU mechanism can be
+ *   used to detect when data plane threads have ceased referencing the
+ *   callback memory.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param user_cb
+ *   The callback function
+ *
+ * @return
+ *   - 0: Success. Callback was removed.
+ *   - -ENODEV:  If *dev_id* is invalid.
+ *   - -EINVAL:  The port_id is out of range, or the callback
+ *               is NULL.
+ */
+__rte_experimental
+int
+rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id,
+		const struct rte_event_dequeue_callback *user_cb);
+
 /**
  * The queue depth of the port on the enqueue side
  */
@@ -2133,6 +2227,34 @@ rte_event_enqueue_forward_burst(uint8_t dev_id, uint8_t port_id,
 					 fp_ops->enqueue_forward_burst);
 }
 
+/**
+ * @internal
+ * Helper routine for rte_event_dequeue_burst().
+ * Should be called at exit from PMD's rte_event_dequeue() implementation.
+ * Does necessary post-processing - invokes dequeue callbacks if any, etc.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param[out] ev
+ *   Points to an array of *nb_events* objects of type *rte_event* structure
+ *   for output to be populated with the dequeued event objects.
+ * @param nb_events
+ *   The maximum number of event objects to dequeue, typically number of
+ *   rte_event_port_dequeue_depth() available for this port.
+ * @param opaque
+ *   Opaque pointer of event port callback related data.
+ *
+ * @return
+ *  The number of event objects actually dequeued from the port. The return
+ *  value can be less than the value of the *nb_events* parameter when the
+ *  event port's queue is not full.
+ */
+__rte_experimental
+uint16_t rte_event_dequeue_callbacks(uint8_t dev_id, uint8_t port_id,
+		struct rte_event *ev, uint16_t nb_events, void *opaque);
+
 /**
  * Dequeue a burst of events objects or an event object from the event port
  * designated by its *event_port_id*, on an event device designated
@@ -2205,6 +2327,7 @@ rte_event_dequeue_burst(uint8_t dev_id, uint8_t port_id, struct rte_event ev[],
 {
 	const struct rte_event_fp_ops *fp_ops;
 	void *port;
+	uint16_t nb_rx;
 
 	fp_ops = &rte_event_fp_ops[dev_id];
 	port = fp_ops->data[port_id];
@@ -2226,10 +2349,28 @@ rte_event_dequeue_burst(uint8_t dev_id, uint8_t port_id, struct rte_event ev[],
 	 * requests nb_events as const one
 	 */
 	if (nb_events == 1)
-		return (fp_ops->dequeue)(port, ev, timeout_ticks);
+		nb_rx = fp_ops->dequeue(port, ev, timeout_ticks);
 	else
-		return (fp_ops->dequeue_burst)(port, ev, nb_events,
+		nb_rx = fp_ops->dequeue_burst(port, ev, nb_events,
 					       timeout_ticks);
+
+	{
+		void *cb;
+
+		/* __ATOMIC_RELEASE memory order was used when the
+		 * call back was inserted into the list.
+		 * Since there is a clear dependency between loading
+		 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
+		 * not required.
+		 */
+		cb = __atomic_load_n((void **)&fp_ops->ev_port.clbk[port_id],
+				__ATOMIC_RELAXED);
+		if (unlikely(cb != NULL))
+			nb_rx = rte_event_dequeue_callbacks(dev_id, port_id,
+							ev, nb_rx, cb);
+	}
+
+	return nb_rx;
 }
 
 #define RTE_EVENT_DEV_MAINT_OP_FLUSH          (1 << 0)
diff --git a/lib/eventdev/rte_eventdev_core.h b/lib/eventdev/rte_eventdev_core.h
index c328bdbc82..b364ecc2a5 100644
--- a/lib/eventdev/rte_eventdev_core.h
+++ b/lib/eventdev/rte_eventdev_core.h
@@ -42,6 +42,14 @@ typedef uint16_t (*event_crypto_adapter_enqueue_t)(void *port,
 						   uint16_t nb_events);
 /**< @internal Enqueue burst of events on crypto adapter */
 
+struct rte_eventdev_port_data {
+	void **data;
+	/**< points to array of internal port data pointers */
+	void **clbk;
+	/**< points to array of port callback data pointers */
+};
+/**< @internal Structure used to hold opaque eventdev port data. */
+
 struct rte_event_fp_ops {
 	void **data;
 	/**< points to array of internal port data pointers */
@@ -65,7 +73,9 @@ struct rte_event_fp_ops {
 	/**< PMD Tx adapter enqueue same destination function. */
 	event_crypto_adapter_enqueue_t ca_enqueue;
 	/**< PMD Crypto adapter enqueue function. */
-	uintptr_t reserved[6];
+	struct rte_eventdev_port_data ev_port;
+	/**< Eventdev port data. */
+	uintptr_t reserved[3];
 } __rte_cache_aligned;
 
 extern struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS];
diff --git a/lib/eventdev/version.map b/lib/eventdev/version.map
index 89068a5713..8ce54f5017 100644
--- a/lib/eventdev/version.map
+++ b/lib/eventdev/version.map
@@ -131,6 +131,12 @@ EXPERIMENTAL {
 	rte_event_eth_tx_adapter_runtime_params_init;
 	rte_event_eth_tx_adapter_runtime_params_set;
 	rte_event_timer_remaining_ticks_get;
+
+	# added in 23.07
+	rte_event_dequeue_callbacks
+	rte_event_add_dequeue_callback
+	rte_event_remove_dequeue_callback
+	rte_event_port_get_monitor_addr
 };
 
 INTERNAL {
-- 
2.34.1


  parent reply	other threads:[~2023-04-19  9:55 UTC|newest]

Thread overview: 33+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-04-19  9:54 [RFC PATCH 1/5] eventdev: add power monitoring API on event port Sivaprasad Tummala
2023-04-19  9:54 ` [RFC PATCH 2/5] event/sw: support power monitor Sivaprasad Tummala
2023-04-19  9:54 ` Sivaprasad Tummala [this message]
2023-04-24 16:06   ` [RFC PATCH 3/5] eventdev: support optional dequeue callbacks Ferruh Yigit
2023-05-17 14:22   ` Burakov, Anatoly
2023-04-19  9:54 ` [RFC PATCH 4/5] power: add eventdev support for power management Sivaprasad Tummala
2023-05-17 14:43   ` Burakov, Anatoly
2023-05-24 12:34     ` Tummala, Sivaprasad
2023-04-19  9:54 ` [RFC PATCH 5/5] examples/eventdev_p: add eventdev " Sivaprasad Tummala
2023-04-19 10:15 ` [RFC PATCH 1/5] eventdev: add power monitoring API on event port Jerin Jacob
2023-04-24 16:06   ` Ferruh Yigit
2023-04-25  4:09     ` Jerin Jacob
2023-05-02 11:19       ` Ferruh Yigit
2023-05-03  7:58         ` Jerin Jacob
2023-05-03  8:13           ` Ferruh Yigit
2023-05-03  8:26             ` Jerin Jacob
2023-05-03 15:11               ` Tummala, Sivaprasad
2023-04-25  6:19     ` Mattias Rönnblom
2023-05-02 10:43       ` Ferruh Yigit
2023-05-17 14:48 ` Burakov, Anatoly
2023-10-16 20:57 ` [PATCH v1 1/6] " Sivaprasad Tummala
2023-10-16 20:57   ` [PATCH v1 2/6] event/sw: support power monitor Sivaprasad Tummala
2023-10-16 23:41     ` Tyler Retzlaff
2023-10-16 20:57   ` [PATCH v1 3/6] eventdev: support optional dequeue callbacks Sivaprasad Tummala
2023-10-16 23:47     ` Tyler Retzlaff
2023-10-16 20:57   ` [PATCH v1 4/6] event/sw: " Sivaprasad Tummala
2023-10-16 20:57   ` [PATCH v1 5/6] power: add eventdev support for power management Sivaprasad Tummala
2023-10-16 23:51     ` Tyler Retzlaff
2023-10-17  3:03       ` Tummala, Sivaprasad
2023-10-17  3:22     ` Jerin Jacob
2023-10-18  7:08       ` Tummala, Sivaprasad
2023-10-18  7:13         ` Jerin Jacob
2023-10-16 20:57   ` [PATCH v1 6/6] examples/eventdev_p: add eventdev " Sivaprasad Tummala

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230419095427.563185-3-sivaprasad.tummala@amd.com \
    --to=sivaprasad.tummala@amd.com \
    --cc=david.hunt@intel.com \
    --cc=dev@dpdk.org \
    --cc=harry.van.haaren@intel.com \
    --cc=jerinj@marvell.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).