From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga06.intel.com (mga06.intel.com [134.134.136.31]) by dpdk.org (Postfix) with ESMTP id 6B875201 for ; Tue, 6 Dec 2016 00:30:49 +0100 (CET) Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga104.jf.intel.com with ESMTP; 05 Dec 2016 15:30:47 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.33,750,1477983600"; d="scan'208";a="39228905" Received: from fmsmsx106.amr.corp.intel.com ([10.18.124.204]) by fmsmga005.fm.intel.com with ESMTP; 05 Dec 2016 15:30:47 -0800 Received: from fmsmsx117.amr.corp.intel.com (10.18.116.17) by FMSMSX106.amr.corp.intel.com (10.18.124.204) with Microsoft SMTP Server (TLS) id 14.3.248.2; Mon, 5 Dec 2016 15:30:47 -0800 Received: from fmsmsx108.amr.corp.intel.com ([169.254.9.109]) by fmsmsx117.amr.corp.intel.com ([169.254.3.14]) with mapi id 14.03.0248.002; Mon, 5 Dec 2016 15:30:47 -0800 From: "Eads, Gage" To: Jerin Jacob CC: "dev@dpdk.org" , "Richardson, Bruce" , "Van Haaren, Harry" , "hemant.agrawal@nxp.com" Thread-Topic: [RFC PATCH] eventdev: add buffered enqueue and flush APIs Thread-Index: AQHSTOGyfjQDFE4CfUa/lzWOR3jpZaD6BXIt Date: Mon, 5 Dec 2016 23:30:46 +0000 Message-ID: <60DABA4C-E3E8-4768-B2E4-BB97C6421A50@intel.com> References: <1480707956-17187-1-git-send-email-gage.eads@intel.com> <1480707956-17187-2-git-send-email-gage.eads@intel.com>, <20161202211847.GA14577@localhost.localdomain> In-Reply-To: <20161202211847.GA14577@localhost.localdomain> Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: quoted-printable MIME-Version: 1.0 Subject: Re: [dpdk-dev] [RFC PATCH] eventdev: add buffered enqueue and flush APIs X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 05 Dec 2016 23:30:50 -0000 > On Dec 3, 2016, at 5:18 AM, Jerin Jacob = wrote: >=20 >> On Fri, Dec 02, 2016 at 01:45:56PM -0600, Gage Eads wrote: >> This commit adds buffered enqueue functionality to the eventdev API. >> It is conceptually similar to the ethdev API's tx buffering, however >> with a smaller API surface and no dropping of events. >=20 > Hello Gage, > Different implementation may have different strategies to hold the buffer= s. A benefit of inlining the buffering logic in the header is that we avoid th= e overhead of entering the PMD for what is a fairly simple operation (commo= n case: add event to an array, increment counter). If we make this implemen= tation-defined (i.e. use PMD callbacks), we lose that benefit. > and some does not need to hold the buffers if it is DDR backed. Though DDR-backed hardware doesn't need to buffer in software, doing so wou= ld reduce the software overhead of enqueueing. Compared to N individual cal= ls to enqueue, buffering N events then calling enqueue burst once can benef= it from amortized (or parallelized) PMD-specific bookkeeping and error-chec= king across the set of events, and will definitely benefit from the amortiz= ed function call overhead and better I-cache behavior. (Essentially this is= VPP from the fd.io project.) This should result in higher overall event th= roughout (agnostic of the underlying device). I'm skeptical that other buffering strategies would emerge, but I can only = speculate on Cavium/NXP/etc. NPU software. > IHMO, This may not be the candidate for common code. I guess you can move= this > to driver side and abstract under SW driver's enqueue_burst. >=20 I don't think that will work without adding a flush API, otherwise we could= have indefinitely buffered events. I see three ways forward: - The proposed approach - Add the proposed functions but make them implementation-specific. - Require the application to write its own buffering logic (i.e. no API cha= nge) Thanks, Gage >=20 >>=20 >> Signed-off-by: Gage Eads >> --- >> lib/librte_eventdev/rte_eventdev.c | 29 ++++++++++ >> lib/librte_eventdev/rte_eventdev.h | 106 +++++++++++++++++++++++++++++++= ++++++ >> 2 files changed, 135 insertions(+) >>=20 >> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rt= e_eventdev.c >> index 17ce5c3..564573f 100644 >> --- a/lib/librte_eventdev/rte_eventdev.c >> +++ b/lib/librte_eventdev/rte_eventdev.c >> @@ -219,6 +219,7 @@ >> uint16_t *links_map; >> uint8_t *ports_dequeue_depth; >> uint8_t *ports_enqueue_depth; >> + struct rte_eventdev_enqueue_buffer *port_buffers; >> unsigned int i; >>=20 >> EDEV_LOG_DEBUG("Setup %d ports on device %u", nb_ports, >> @@ -272,6 +273,19 @@ >> "nb_ports %u", nb_ports); >> return -(ENOMEM); >> } >> + >> + /* Allocate memory to store port enqueue buffers */ >> + dev->data->port_buffers =3D >> + rte_zmalloc_socket("eventdev->port_buffers", >> + sizeof(dev->data->port_buffers[0]) * nb_ports, >> + RTE_CACHE_LINE_SIZE, dev->data->socket_id); >> + if (dev->data->port_buffers =3D=3D NULL) { >> + dev->data->nb_ports =3D 0; >> + EDEV_LOG_ERR("failed to get memory for port enq" >> + " buffers, nb_ports %u", nb_ports); >> + return -(ENOMEM); >> + } >> + >> } else if (dev->data->ports !=3D NULL && nb_ports !=3D 0) {/* re-conf= ig */ >> RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP); >>=20 >> @@ -279,6 +293,7 @@ >> ports_dequeue_depth =3D dev->data->ports_dequeue_depth; >> ports_enqueue_depth =3D dev->data->ports_enqueue_depth; >> links_map =3D dev->data->links_map; >> + port_buffers =3D dev->data->port_buffers; >>=20 >> for (i =3D nb_ports; i < old_nb_ports; i++) >> (*dev->dev_ops->port_release)(ports[i]); >> @@ -324,6 +339,17 @@ >> return -(ENOMEM); >> } >>=20 >> + /* Realloc memory to store port enqueue buffers */ >> + port_buffers =3D rte_realloc(dev->data->port_buffers, >> + sizeof(dev->data->port_buffers[0]) * nb_ports, >> + RTE_CACHE_LINE_SIZE); >> + if (port_buffers =3D=3D NULL) { >> + dev->data->nb_ports =3D 0; >> + EDEV_LOG_ERR("failed to realloc mem for port enq" >> + " buffers, nb_ports %u", nb_ports); >> + return -(ENOMEM); >> + } >> + >> if (nb_ports > old_nb_ports) { >> uint8_t new_ps =3D nb_ports - old_nb_ports; >>=20 >> @@ -336,12 +362,15 @@ >> memset(links_map + >> (old_nb_ports * RTE_EVENT_MAX_QUEUES_PER_DEV), >> 0, sizeof(ports_enqueue_depth[0]) * new_ps); >> + memset(port_buffers + old_nb_ports, 0, >> + sizeof(port_buffers[0]) * new_ps); >> } >>=20 >> dev->data->ports =3D ports; >> dev->data->ports_dequeue_depth =3D ports_dequeue_depth; >> dev->data->ports_enqueue_depth =3D ports_enqueue_depth; >> dev->data->links_map =3D links_map; >> + dev->data->port_buffers =3D port_buffers; >> } else if (dev->data->ports !=3D NULL && nb_ports =3D=3D 0) { >> RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP); >>=20 >> diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rt= e_eventdev.h >> index 778d6dc..3f24342 100644 >> --- a/lib/librte_eventdev/rte_eventdev.h >> +++ b/lib/librte_eventdev/rte_eventdev.h >> @@ -246,6 +246,7 @@ >> #include >> #include >> #include >> +#include >>=20 >> #define EVENTDEV_NAME_SKELETON_PMD event_skeleton >> /**< Skeleton event device PMD name */ >> @@ -965,6 +966,26 @@ typedef uint16_t (*event_dequeue_burst_t)(void *por= t, struct rte_event ev[], >> #define RTE_EVENTDEV_NAME_MAX_LEN (64) >> /**< @internal Max length of name of event PMD */ >>=20 >> +#define RTE_EVENT_BUF_MAX 16 >> +/**< Maximum number of events in an enqueue buffer. */ >> + >> +/** >> + * @internal >> + * An enqueue buffer for each port. >> + * >> + * The reason this struct is in the header is for inlining the function= calls >> + * to enqueue, as doing a function call per packet would incur signific= ant >> + * performance overhead. >> + * >> + * \see rte_event_enqueue_buffer(), rte_event_enqueue_buffer_flush() >> + */ >> +struct rte_eventdev_enqueue_buffer { >> + /**> Count of events in this buffer */ >> + uint16_t count; >> + /**> Array of events in this buffer */ >> + struct rte_event events[RTE_EVENT_BUF_MAX]; >> +} __rte_cache_aligned; >> + >> /** >> * @internal >> * The data part, with no function pointers, associated with each device= . >> @@ -983,6 +1004,8 @@ struct rte_eventdev_data { >> /**< Number of event ports. */ >> void **ports; >> /**< Array of pointers to ports. */ >> + struct rte_eventdev_enqueue_buffer *port_buffers; >> + /**< Array of port enqueue buffers. */ >> uint8_t *ports_dequeue_depth; >> /**< Array of port dequeue depth. */ >> uint8_t *ports_enqueue_depth; >> @@ -1132,6 +1155,89 @@ struct rte_eventdev { >> } >>=20 >> /** >> + * Flush the enqueue buffer of the event port specified by *port_id*, i= n the >> + * event device specified by *dev_id*. >> + * >> + * This function attempts to flush as many of the buffered events as po= ssible, >> + * and returns the number of flushed events. Any unflushed events remai= n in >> + * the buffer. >> + * >> + * @param dev_id >> + * The identifier of the device. >> + * @param port_id >> + * The identifier of the event port. >> + * >> + * @return >> + * The number of event objects actually flushed to the event device. >> + * >> + * \see rte_event_enqueue_buffer(), rte_event_enqueue_burst() >> + * \see rte_event_port_enqueue_depth() >> + */ >> +static inline int >> +rte_event_enqueue_buffer_flush(uint8_t dev_id, uint8_t port_id) >> +{ >> + struct rte_eventdev *dev =3D &rte_eventdevs[dev_id]; >> + struct rte_eventdev_enqueue_buffer *buf =3D >> + &dev->data->port_buffers[port_id]; >> + int n; >> + >> + n =3D rte_event_enqueue_burst(dev_id, port_id, buf->events, buf->co= unt); >> + >> + if (n !=3D buf->count) >> + memmove(buf->events, &buf->events[n], buf->count - n); >> + >> + buf->count -=3D n; >> + >> + return n; >> +} >> + >> +/** >> + * Buffer an event object supplied in *rte_event* structure for future >> + * enqueueing on an event device designated by its *dev_id* through the= event >> + * port specified by *port_id*. >> + * >> + * This function takes a single event and buffers it for later enqueuin= g to the >> + * queue specified in the event structure. If the buffer is full, the >> + * function will attempt to flush the buffer before buffering the event= . >> + * If the flush operation fails, the previously buffered events remain = in the >> + * buffer and an error is returned to the user to indicate that *ev* wa= s not >> + * buffered. >> + * >> + * @param dev_id >> + * The identifier of the device. >> + * @param port_id >> + * The identifier of the event port. >> + * @param ev >> + * Pointer to struct rte_event >> + * >> + * @return >> + * - 0 on success >> + * - <0 on failure. Failure can occur if the event port's output queue= is >> + * backpressured, for instance. >> + * >> + * \see rte_event_enqueue_buffer_flush(), rte_event_enqueue_burst() >> + * \see rte_event_port_enqueue_depth() >> + */ >> +static inline int >> +rte_event_enqueue_buffer(uint8_t dev_id, uint8_t port_id, struct rte_ev= ent *ev) >> +{ >> + struct rte_eventdev *dev =3D &rte_eventdevs[dev_id]; >> + struct rte_eventdev_enqueue_buffer *buf =3D >> + &dev->data->port_buffers[port_id]; >> + int ret; >> + >> + /* If necessary, flush the enqueue buffer to make space for ev. */ >> + if (buf->count =3D=3D RTE_EVENT_BUF_MAX) { >> + ret =3D rte_event_enqueue_buffer_flush(dev_id, port_id); >> + if (ret =3D=3D 0) >> + return -ENOSPC; >> + } >> + >> + rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event)= ); >> + return 0; >> +} >> + >> +/** >> * Converts nanoseconds to *wait* value for rte_event_dequeue() >> * >> * If the device is configured with RTE_EVENT_DEV_CFG_PER_DEQUEUE_WAIT f= lag then >> --=20 >> 1.9.1 >>=20