DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH v1] eventdev: change packet enqueue buffer in RX adapter to circular buffer
@ 2021-08-27  9:42 Ganapati Kundapura
  2021-08-30  9:41 ` Jerin Jacob
  0 siblings, 1 reply; 3+ messages in thread
From: Ganapati Kundapura @ 2021-08-27  9:42 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk; +Cc: dev

RX adapter user memove() to move unprocessed events to the beginning of
the packet enqueue buffer. The use memmove() was found to consume good
amount of CPU cycles (about 20%).

This patch removes the use of memove() while implementina a circular
buffer to avoid copying of data. With this change RX adapter is able
to fill the buffer of 16384 events.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
---
 lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++-------
 1 file changed, 68 insertions(+), 16 deletions(-)

diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
index 13dfb28..7c94c73 100644
--- a/lib/eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/eventdev/rte_event_eth_rx_adapter.c
@@ -25,7 +25,7 @@
 
 #define BATCH_SIZE		32
 #define BLOCK_CNT_THRESHOLD	10
-#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+#define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
 #define MAX_VECTOR_SIZE		1024
 #define MIN_VECTOR_SIZE		4
 #define MAX_VECTOR_NS		1E9
@@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
 	uint16_t count;
 	/* Array of events in this buffer */
 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+	/* Event enqueue happens from head */
+	uint16_t head;
+	/* New packets from rte_eth_rx_burst is enqued from tail */
+	uint16_t tail;
+	/* last element in the buffer before rollover */
+	uint16_t last;
+	uint16_t last_mask;
 };
 
 struct rte_event_eth_rx_adapter {
@@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
 
-	if (!buf->count)
+	if (!count)
 		return 0;
 
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
-					buf->events,
-					buf->count);
-	if (n != buf->count) {
-		memmove(buf->events,
-			&buf->events[n],
-			(buf->count - n) * sizeof(struct rte_event));
+					&buf->events[buf->head],
+					count);
+	if (n != count)
 		stats->rx_enq_retry++;
+
+	buf->head += n;
+
+	if (buf->last && n == count) {
+		uint16_t n1;
+
+		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
+					rx_adapter->event_port_id,
+					&buf->events[0],
+					buf->tail);
+
+		if (n1 != buf->tail)
+			stats->rx_enq_retry++;
+
+		buf->last = 0;
+		buf->head = n1;
+		buf->last_mask = 0;
+		n += n1;
 	}
 
 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
@@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 					&dev_info->rx_queue[rx_queue_id];
 	struct rte_eth_event_enqueue_buffer *buf =
 					&rx_adapter->event_enqueue_buffer;
-	struct rte_event *ev = &buf->events[buf->count];
+	uint16_t new_tail = buf->tail;
 	uint64_t event = eth_rx_queue_info->event;
 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
 	struct rte_mbuf *m = mbufs[0];
@@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 		for (i = 0; i < num; i++) {
+			struct rte_event *ev;
+
 			m = mbufs[i];
+			ev = &buf->events[new_tail];
 
 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
 				     : m->hash.rss;
@@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 			ev->flow_id = (rss & ~flow_id_mask) |
 				      (ev->flow_id & flow_id_mask);
 			ev->mbuf = m;
-			ev++;
+			new_tail++;
 		}
 	} else {
 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
@@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count,
-					&buf->events[buf->count], num,
-					dev_info->cb_arg, &dropped);
+				       buf->last |
+				       (RTE_DIM(buf->events) & ~buf->last_mask),
+				       buf->count >= BATCH_SIZE ?
+						buf->count - BATCH_SIZE : 0,
+				       &buf->events[buf->tail],
+				       num,
+				       dev_info->cb_arg,
+				       &dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 
 	buf->count += num;
+	buf->tail += num;
+}
+
+static inline bool
+rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
+{
+	uint32_t nb_req = buf->tail + BATCH_SIZE;
+
+	if (!buf->last) {
+		if (nb_req <= RTE_DIM(buf->events))
+			return true;
+
+		if (buf->head >= BATCH_SIZE) {
+			buf->last_mask = ~0;
+			buf->last = buf->tail;
+			buf->tail = 0;
+			return true;
+		}
+	}
+
+	return nb_req <= buf->head;
 }
 
 /* Enqueue packets from  <port, q>  to event buffer */
@@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
 
@@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
 	if (buf->count >= BATCH_SIZE)
 		rxa_flush_event_buffer(rx_adapter);
 
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		struct eth_device_info *dev_info;
 		uint16_t port;
 		uint16_t queue;
@@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 		 */
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
-		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+		if (!rxa_pkt_buf_available(buf)) {
 			rx_adapter->wrr_pos = wrr_pos;
 			return nb_rx;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [dpdk-dev] [PATCH v1] eventdev: change packet enqueue buffer in RX adapter to circular buffer
  2021-08-27  9:42 [dpdk-dev] [PATCH v1] eventdev: change packet enqueue buffer in RX adapter to circular buffer Ganapati Kundapura
@ 2021-08-30  9:41 ` Jerin Jacob
  2021-08-30 10:25   ` Kundapura, Ganapati
  0 siblings, 1 reply; 3+ messages in thread
From: Jerin Jacob @ 2021-08-30  9:41 UTC (permalink / raw)
  To: Ganapati Kundapura; +Cc: Jayatheerthan, Jay, dpdk-dev

On Fri, Aug 27, 2021 at 3:12 PM Ganapati Kundapura
<ganapati.kundapura@intel.com> wrote:
>
> RX adapter user memove() to move unprocessed events to the beginning of
> the packet enqueue buffer. The use memmove() was found to consume good
> amount of CPU cycles (about 20%).
>
> This patch removes the use of memove() while implementina a circular

Typo

> buffer to avoid copying of data. With this change RX adapter is able
> to fill the buffer of 16384 events.


Please change the subject to :
eventdev: rx-adapter: improve ...

>
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> ---
>  lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++-------
>  1 file changed, 68 insertions(+), 16 deletions(-)
>

>         } else {
>                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
> @@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>
>                 dropped = 0;
>                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> -                                       ETH_EVENT_BUFFER_SIZE, buf->count,
> -                                       &buf->events[buf->count], num,
> -                                       dev_info->cb_arg, &dropped);
> +                                      buf->last |
> +                                      (RTE_DIM(buf->events) & ~buf->last_mask),
> +                                      buf->count >= BATCH_SIZE ?
> +                                               buf->count - BATCH_SIZE : 0,
> +                                      &buf->events[buf->tail],
> +                                      num,
> +                                      dev_info->cb_arg,
> +                                      &dropped);


Moving this code section to a separate inline function can allow more
linewidth to consume aka
less number of lines.

Rest looks good to me.

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [dpdk-dev] [PATCH v1] eventdev: change packet enqueue buffer in RX adapter to circular buffer
  2021-08-30  9:41 ` Jerin Jacob
@ 2021-08-30 10:25   ` Kundapura, Ganapati
  0 siblings, 0 replies; 3+ messages in thread
From: Kundapura, Ganapati @ 2021-08-30 10:25 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: Jayatheerthan, Jay, dpdk-dev



> -----Original Message-----
> From: Jerin Jacob <jerinjacobk@gmail.com>
> Sent: 30 August 2021 15:11
> To: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Cc: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; dpdk-dev
> <dev@dpdk.org>
> Subject: Re: [PATCH v1] eventdev: change packet enqueue buffer in RX
> adapter to circular buffer
> 
> On Fri, Aug 27, 2021 at 3:12 PM Ganapati Kundapura
> <ganapati.kundapura@intel.com> wrote:
> >
> > RX adapter user memove() to move unprocessed events to the beginning
> > of the packet enqueue buffer. The use memmove() was found to consume
> > good amount of CPU cycles (about 20%).
> >
> > This patch removes the use of memove() while implementina a circular
> 
> Typo
Will address in next patch
> 
> > buffer to avoid copying of data. With this change RX adapter is able
> > to fill the buffer of 16384 events.
> 
> 
> Please change the subject to :
> eventdev: rx-adapter: improve ...
Will address in next patch
> 
> >
> > Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> > ---
> >  lib/eventdev/rte_event_eth_rx_adapter.c | 84
> > ++++++++++++++++++++++++++-------
> >  1 file changed, 68 insertions(+), 16 deletions(-)
> >
> 
> >         } else {
> >                 num = rxa_create_event_vector(rx_adapter,
> > eth_rx_queue_info, @@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct
> > rte_event_eth_rx_adapter *rx_adapter,
> >
> >                 dropped = 0;
> >                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> > -                                       ETH_EVENT_BUFFER_SIZE, buf->count,
> > -                                       &buf->events[buf->count], num,
> > -                                       dev_info->cb_arg, &dropped);
> > +                                      buf->last |
> > +                                      (RTE_DIM(buf->events) & ~buf->last_mask),
> > +                                      buf->count >= BATCH_SIZE ?
> > +                                               buf->count - BATCH_SIZE : 0,
> > +                                      &buf->events[buf->tail],
> > +                                      num,
> > +                                      dev_info->cb_arg,
> > +                                      &dropped);
These are arguments to callback function dev_info->cb_fn. Passing these to another inline function,
retrieving dev_info from rx_adapter(additional parameter to inline function) and to
call the dev_info->cb_fn from inline function which is getting called only once
seems overhead just to allow more line width.
> 
> 
> Moving this code section to a separate inline function can allow more
> linewidth to consume aka less number of lines.
> 
> Rest looks good to me.

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2021-08-30 10:25 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-08-27  9:42 [dpdk-dev] [PATCH v1] eventdev: change packet enqueue buffer in RX adapter to circular buffer Ganapati Kundapura
2021-08-30  9:41 ` Jerin Jacob
2021-08-30 10:25   ` Kundapura, Ganapati

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).