Date: Mon, 16 Oct 2023 16:47:22 -0700
From: Tyler Retzlaff
To: Sivaprasad Tummala
Cc: jerinjacobk@gmail.com, harry.van.haaren@intel.com,
 anatoly.burakov@intel.com, dev@dpdk.org, ferruh.yigit@amd.com,
 david.hunt@intel.com
Subject: Re: [PATCH v1 3/6] eventdev: support optional dequeue callbacks
Message-ID: <20231016234722.GB13553@linuxonhyperv3.guj3yctzbm1etfxqx2vob5hsef.xx.internal.cloudapp.net>
References: <20230419095427.563185-1-sivaprasad.tummala@amd.com>
 <20231016205715.970999-1-sivaprasad.tummala@amd.com>
 <20231016205715.970999-3-sivaprasad.tummala@amd.com>
In-Reply-To: <20231016205715.970999-3-sivaprasad.tummala@amd.com>

On Mon, Oct 16, 2023 at 01:57:12PM -0700, Sivaprasad Tummala wrote:
> Add optional support for inline event processing within the PMD dequeue
> call. When a dequeue callback is configured, events dequeued from the
> event port are passed to the callback function to allow additional
> processing, e.g. unpacking a batch of packets from each event on dequeue
> before passing them back to the application.
>
> Signed-off-by: Sivaprasad Tummala
> ---
>  lib/eventdev/eventdev_pmd.h      |  38 +++++++++++
>  lib/eventdev/eventdev_private.c  |   2 +
>  lib/eventdev/rte_eventdev.c      | 107 +++++++++++++++++++++++++++++++
>  lib/eventdev/rte_eventdev.h      |  95 +++++++++++++++++++++++++++
>  lib/eventdev/rte_eventdev_core.h |  12 +++-
>  lib/eventdev/version.map         |   3 +
>  6 files changed, 256 insertions(+), 1 deletion(-)
>
> diff --git a/lib/eventdev/eventdev_pmd.h b/lib/eventdev/eventdev_pmd.h
> index a0ee768ce7..ce067b1d5d 100644
> --- a/lib/eventdev/eventdev_pmd.h
> +++ b/lib/eventdev/eventdev_pmd.h
> @@ -97,6 +97,19 @@ struct rte_eventdev_global {
>          uint8_t nb_devs;        /**< Number of devices found */
>  };
>
> +/**
> + * @internal
> + * Structure used to hold information about the callbacks to be called for a
> + * port on dequeue.
> + */
> +struct rte_event_dequeue_callback {
> +        struct rte_event_dequeue_callback *next;
> +        union{
> +                rte_dequeue_callback_fn dequeue;
> +        } fn;
> +        void *param;
> +};
> +
>  /**
>   * @internal
>   * The data part, with no function pointers, associated with each device.
> @@ -171,6 +184,10 @@ struct rte_eventdev {
>          /**< Pointer to PMD dequeue burst function. */
>          event_maintain_t maintain;
>          /**< Pointer to PMD port maintenance function. */
> +        struct rte_event_dequeue_callback *post_dequeue_burst_cbs[RTE_EVENT_MAX_PORTS_PER_DEV];
> +        /**< User-supplied functions called from dequeue_burst to post-process
> +         * received packets before passing them to the user
> +         */
>          event_tx_adapter_enqueue_t txa_enqueue_same_dest;
>          /**< Pointer to PMD eth Tx adapter burst enqueue function with
>           *   events destined to same Eth port & Tx queue.
> @@ -245,6 +262,27 @@ rte_event_pmd_is_valid_dev(uint8_t dev_id)
>          return 1;
>  }
>
> +/**
> + * Executes all the user application registered callbacks for the specific
> + * event device.
> + *
> + * @param dev_id
> + *   Event device index.
> + * @param port_id
> + *   Event port index
> + * @param ev
> + *   Points to an array of *nb_events* objects of type *rte_event* structure
> + *   for output to be populated with the dequeued event objects.
> + * @param nb_events
> + *   number of event objects
> + *
> + * @return
> + *   The number of event objects
> + */
> +__rte_internal
> +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id,
> +                uint8_t port_id, struct rte_event ev[], uint16_t nb_events);
> +
>  /**
>   * Definitions of all functions exported by a driver through the
>   * generic structure of type *event_dev_ops* supplied in the
> diff --git a/lib/eventdev/eventdev_private.c b/lib/eventdev/eventdev_private.c
> index 017f97ccab..052c526ce0 100644
> --- a/lib/eventdev/eventdev_private.c
> +++ b/lib/eventdev/eventdev_private.c
> @@ -137,4 +137,6 @@ event_dev_fp_ops_set(struct rte_event_fp_ops *fp_op,
>          fp_op->dma_enqueue = dev->dma_enqueue;
>          fp_op->profile_switch = dev->profile_switch;
>          fp_op->data = dev->data->ports;
> +        fp_op->ev_port.clbk = (void **)(uintptr_t)dev->post_dequeue_burst_cbs;
> +        fp_op->ev_port.data = dev->data->ports;
>  }
> diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
> index 5feb4326a2..f2540a6aa8 100644
> --- a/lib/eventdev/rte_eventdev.c
> +++ b/lib/eventdev/rte_eventdev.c
> @@ -18,6 +18,7 @@
>  #include
>  #include
>  #include
> +#include
>  #include
>  #include
>  #include
> @@ -39,6 +40,9 @@ static struct rte_eventdev_global eventdev_globals = {
>  /* Public fastpath APIs. */
>  struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS];
>
> +/* spinlock for add/remove dequeue callbacks */
> +static rte_spinlock_t event_dev_dequeue_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +
>  /* Event dev north bound API implementation */
>
>  uint8_t
> @@ -884,6 +888,109 @@ rte_event_port_attr_get(uint8_t dev_id, uint8_t port_id, uint32_t attr_id,
>          return 0;
>  }
>
> +const struct rte_event_dequeue_callback *
> +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id,
> +                rte_dequeue_callback_fn fn, void *user_param)
> +{
> +        struct rte_eventdev *dev;
> +        struct rte_event_dequeue_callback *cb;
> +        struct rte_event_dequeue_callback *tail;
> +
> +        /* check input parameters */
> +        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, NULL);
> +        dev = &rte_eventdevs[dev_id];
> +        if (!is_valid_port(dev, port_id)) {
> +                RTE_EDEV_LOG_ERR("Invalid port_id=%" PRIu8, port_id);
> +                return NULL;
> +        }
> +
> +        cb = rte_zmalloc(NULL, sizeof(*cb), 0);
> +        if (cb == NULL) {
> +                rte_errno = ENOMEM;
> +                return NULL;
> +        }
> +        cb->fn.dequeue = fn;
> +        cb->param = user_param;
> +
> +        rte_spinlock_lock(&event_dev_dequeue_cb_lock);
> +        /* Add the callbacks in fifo order. */
> +        tail = rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id];
> +        if (!tail) {
> +                /* Stores to cb->fn and cb->param should complete before
> +                 * cb is visible to data plane.
> +                 */
> +                rte_atomic_store_explicit(
> +                        &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
> +                        cb, __ATOMIC_RELEASE);
                                ^^^^^^^^^^^^^^^^
rte_memory_order_release (rte_stdatomic.h)

> +        } else {
> +                while (tail->next)
> +                        tail = tail->next;
> +                /* Stores to cb->fn and cb->param should complete before
> +                 * cb is visible to data plane.
> +                 */
> +                rte_atomic_store_explicit(&tail->next, cb, __ATOMIC_RELEASE);
                                                               ^^^^
same here

> +        }
> +        rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
> +
> +        return cb;
> +}
> +
> +int
> +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id,
> +                const struct rte_event_dequeue_callback *user_cb)
> +{
> +        struct rte_eventdev *dev;
> +        struct rte_event_dequeue_callback *cb;
> +        struct rte_event_dequeue_callback **prev_cb;
> +
> +        /* Check input parameters. */
> +        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
> +        dev = &rte_eventdevs[dev_id];
> +        if (user_cb == NULL || !is_valid_port(dev, port_id))
> +                return -EINVAL;
> +
> +        rte_spinlock_lock(&event_dev_dequeue_cb_lock);
> +        prev_cb = &dev->post_dequeue_burst_cbs[port_id];
> +        for (; *prev_cb != NULL; prev_cb = &cb->next) {
> +                cb = *prev_cb;
> +                if (cb == user_cb) {
> +                        /* Remove the user cb from the callback list. */
> +                        rte_atomic_store_explicit(prev_cb, cb->next,
> +                                __ATOMIC_RELAXED);
                                   ^^^
and here

> +                        break;
> +                }
> +        }
> +        rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
> +
> +        return 0;
> +}
> +
> +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id,
> +                uint8_t port_id, struct rte_event ev[], uint16_t nb_events)
> +{
> +        struct rte_event_dequeue_callback *cb;
> +        const struct rte_event_fp_ops *fp_ops;
> +
> +        fp_ops = &rte_event_fp_ops[dev_id];
> +
> +        /* __ATOMIC_RELEASE memory order was used when the
> +         * call back was inserted into the list.
> +         * Since there is a clear dependency between loading
> +         * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> +         * not required.
> +         */
> +        cb = rte_atomic_load_explicit((void **)&fp_ops->ev_port.clbk[port_id],
                                          ^^^^^^^
needs to be __rte_atomic qualified

> +                __ATOMIC_RELAXED);
                   ^^^^
rte_memory_order
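
i.e. with the rte_stdatomic.h wrappers the call sites above would read
roughly as below. untested sketch only, and note the object being
stored/loaded also needs the qualifier on its declaration (e.g.
RTE_ATOMIC(struct rte_event_dequeue_callback *) for the per-port slot,
the next pointer and the fp_ops clbk array) instead of the casts used
in the patch:

        /* add path: publish the new callback with release semantics so the
         * stores to cb->fn/cb->param are visible before cb itself
         */
        rte_atomic_store_explicit(
                &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
                cb, rte_memory_order_release);

        /* remove path: relaxed store is fine, the list is protected by the
         * spinlock
         */
        rte_atomic_store_explicit(prev_cb, cb->next, rte_memory_order_relaxed);

        /* fast path: relaxed load, the address dependency orders the later
         * cb->fn/cb->next reads
         */
        cb = rte_atomic_load_explicit(&fp_ops->ev_port.clbk[port_id],
                rte_memory_order_relaxed);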
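
separately, on the usage described in the commit message: since the
rte_eventdev.h hunk is not quoted above, the rte_dequeue_callback_fn
prototype below is only my guess from
rte_eventdev_pmd_dequeue_callback_process() and the user_param argument,
please correct me if the actual typedef differs. rough sketch of how i
read the intended application flow:

        /* hypothetical application callback post-processing a dequeued burst */
        static uint16_t
        app_dequeue_cb(uint8_t dev_id, uint8_t port_id, struct rte_event ev[],
                        uint16_t nb_events, void *user_param)
        {
                uint16_t i;

                RTE_SET_USED(dev_id);
                RTE_SET_USED(port_id);
                RTE_SET_USED(user_param);
                /* e.g. touch/unpack the batch carried by each event */
                for (i = 0; i < nb_events; i++)
                        rte_prefetch0(ev[i].event_ptr);
                return nb_events;
        }

        /* registration after port setup, removal before teardown */
        const struct rte_event_dequeue_callback *cb =
                rte_event_add_dequeue_callback(dev_id, port_id,
                        app_dequeue_cb, NULL);
        if (cb == NULL)
                rte_exit(EXIT_FAILURE, "cannot register dequeue callback\n");
        ...
        rte_event_remove_dequeue_callback(dev_id, port_id, cb);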