DPDK patches and discussions
* [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer
@ 2021-08-30 13:06 Ganapati Kundapura
  2021-09-06  8:11 ` Jerin Jacob
  0 siblings, 1 reply; 2+ messages in thread
From: Ganapati Kundapura @ 2021-08-30 13:06 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk; +Cc: dev, ganapati.kundapura

v2:
* Fixed typo in commit message
* Changed subject line

v1:
The Rx adapter uses memmove() to move unprocessed events to the beginning of
the packet enqueue buffer. The use of memmove() was found to consume a
significant amount of CPU cycles (about 20%).

This patch removes the use of memmove() by implementing a circular
buffer, which avoids copying the data. With this change the Rx adapter is
able to fill a buffer of 16384 events.
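
The idea, in simplified form (an illustrative sketch only, not the adapter
code itself; the struct, function and buffer-size names below are made up
for the example):

#include <stdint.h>

#include <rte_eventdev.h>

#define EVT_BUF_SIZE (6 * 32)	/* same size as ETH_EVENT_BUFFER_SIZE below */

struct circ_evt_buf {
	struct rte_event events[EVT_BUF_SIZE];
	uint16_t head;	/* next event to hand to the event device */
	uint16_t tail;	/* next free slot for rte_eth_rx_burst() output */
	uint16_t last;	/* events valid before the wrap point, 0 if linear */
};

/* Flush without memmove(): drain [head, wrap point), then the wrapped part. */
static uint16_t
circ_flush(struct circ_evt_buf *b, uint8_t dev_id, uint8_t port_id)
{
	uint16_t count = b->last ? b->last - b->head : b->tail - b->head;
	uint16_t n;

	if (count == 0)
		return 0;

	n = rte_event_enqueue_new_burst(dev_id, port_id,
					&b->events[b->head], count);
	b->head += n;

	if (b->last && n == count) {	/* linear part fully drained: wrap */
		uint16_t n1 = rte_event_enqueue_new_burst(dev_id, port_id,
						&b->events[0], b->tail);
		b->last = 0;
		b->head = n1;
		n += n1;
	}

	return n;
}

The actual implementation below additionally keeps the existing count field
for the flush threshold and a last_mask that adjusts the buffer size reported
to the Rx callback when the buffer has wrapped.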

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
---
 lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++-------
 1 file changed, 68 insertions(+), 16 deletions(-)

diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
index 13dfb28..7c94c73 100644
--- a/lib/eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/eventdev/rte_event_eth_rx_adapter.c
@@ -25,7 +25,7 @@
 
 #define BATCH_SIZE		32
 #define BLOCK_CNT_THRESHOLD	10
-#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+#define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
 #define MAX_VECTOR_SIZE		1024
 #define MIN_VECTOR_SIZE		4
 #define MAX_VECTOR_NS		1E9
@@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
 	uint16_t count;
 	/* Array of events in this buffer */
 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+	/* Event enqueue happens from head */
+	uint16_t head;
+	/* New packets from rte_eth_rx_burst() are enqueued from the tail */
+	uint16_t tail;
+	/* last element in the buffer before rollover */
+	uint16_t last;
+	uint16_t last_mask;
 };
 
 struct rte_event_eth_rx_adapter {
@@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
 
-	if (!buf->count)
+	if (!count)
 		return 0;
 
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
-					buf->events,
-					buf->count);
-	if (n != buf->count) {
-		memmove(buf->events,
-			&buf->events[n],
-			(buf->count - n) * sizeof(struct rte_event));
+					&buf->events[buf->head],
+					count);
+	if (n != count)
 		stats->rx_enq_retry++;
+
+	buf->head += n;
+
+	if (buf->last && n == count) {
+		uint16_t n1;
+
+		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
+					rx_adapter->event_port_id,
+					&buf->events[0],
+					buf->tail);
+
+		if (n1 != buf->tail)
+			stats->rx_enq_retry++;
+
+		buf->last = 0;
+		buf->head = n1;
+		buf->last_mask = 0;
+		n += n1;
 	}
 
 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
@@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 					&dev_info->rx_queue[rx_queue_id];
 	struct rte_eth_event_enqueue_buffer *buf =
 					&rx_adapter->event_enqueue_buffer;
-	struct rte_event *ev = &buf->events[buf->count];
+	uint16_t new_tail = buf->tail;
 	uint64_t event = eth_rx_queue_info->event;
 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
 	struct rte_mbuf *m = mbufs[0];
@@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 		for (i = 0; i < num; i++) {
+			struct rte_event *ev;
+
 			m = mbufs[i];
+			ev = &buf->events[new_tail];
 
 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
 				     : m->hash.rss;
@@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 			ev->flow_id = (rss & ~flow_id_mask) |
 				      (ev->flow_id & flow_id_mask);
 			ev->mbuf = m;
-			ev++;
+			new_tail++;
 		}
 	} else {
 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
@@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count,
-					&buf->events[buf->count], num,
-					dev_info->cb_arg, &dropped);
+				       buf->last |
+				       (RTE_DIM(buf->events) & ~buf->last_mask),
+				       buf->count >= BATCH_SIZE ?
+						buf->count - BATCH_SIZE : 0,
+				       &buf->events[buf->tail],
+				       num,
+				       dev_info->cb_arg,
+				       &dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 
 	buf->count += num;
+	buf->tail += num;
+}
+
+static inline bool
+rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
+{
+	uint32_t nb_req = buf->tail + BATCH_SIZE;
+
+	if (!buf->last) {
+		if (nb_req <= RTE_DIM(buf->events))
+			return true;
+
+		if (buf->head >= BATCH_SIZE) {
+			buf->last_mask = ~0;
+			buf->last = buf->tail;
+			buf->tail = 0;
+			return true;
+		}
+	}
+
+	return nb_req <= buf->head;
 }
 
 /* Enqueue packets from  <port, q>  to event buffer */
@@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
 
@@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
 	if (buf->count >= BATCH_SIZE)
 		rxa_flush_event_buffer(rx_adapter);
 
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		struct eth_device_info *dev_info;
 		uint16_t port;
 		uint16_t queue;
@@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 		 */
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
-		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+		if (!rxa_pkt_buf_available(buf)) {
 			rx_adapter->wrr_pos = wrr_pos;
 			return nb_rx;
 		}
-- 
2.6.4



* Re: [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer
  2021-08-30 13:06 [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer Ganapati Kundapura
@ 2021-09-06  8:11 ` Jerin Jacob
  0 siblings, 0 replies; 2+ messages in thread
From: Jerin Jacob @ 2021-09-06  8:11 UTC (permalink / raw)
  To: Ganapati Kundapura; +Cc: Jayatheerthan, Jay, dpdk-dev

On Mon, Aug 30, 2021 at 6:41 PM Ganapati Kundapura
<ganapati.kundapura@intel.com> wrote:
>
> v2:
> * Fixed typo in commit message
> * changed subject line
>
> v1:

Changelog should be moved under "---". See below.

> The Rx adapter uses memmove() to move unprocessed events to the beginning of
> the packet enqueue buffer. The use of memmove() was found to consume a
> significant amount of CPU cycles (about 20%).
>
> This patch removes the use of memmove() by implementing a circular
> buffer, which avoids copying the data. With this change the Rx adapter is
> able to fill a buffer of 16384 events.
>

Acked-by: Jerin Jacob <jerinj@marvell.com>

Fixed the changelog and changed the subject to
"eventdev: make Rx-adapter enqueue buffer as circular buffer".
Applied to dpdk-next-net-eventdev/for-main. Thanks


> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> ---

See above
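
For reference, the conventional layout is roughly (a generic sketch, not this
exact patch):

    <commit message body>

    Signed-off-by: Author <author@example.com>
    ---
    v2:
    * changelog notes go here, below the "---", so git-am discards them

     <diffstat>

    diff --git a/... b/...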



