* [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer
@ 2021-08-30 13:06 Ganapati Kundapura
2021-09-06 8:11 ` Jerin Jacob
0 siblings, 1 reply; 2+ messages in thread
From: Ganapati Kundapura @ 2021-08-30 13:06 UTC (permalink / raw)
To: jay.jayatheerthan, jerinjacobk; +Cc: dev, ganapati.kundapura
v2:
* Fixed typo in commit message
* changed subject line
v1:
Rx adapter uses memmove() to move unprocessed events to the beginning of
the packet enqueue buffer. This memmove() was found to consume a
significant share of CPU cycles (about 20%).

This patch removes the memmove() by making the enqueue buffer a circular
buffer, so no data needs to be copied. With this change the Rx adapter is
able to fill a buffer of 16384 events.
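To illustrate the idea, here is a minimal sketch with hypothetical names (not
the exact code added by this patch): instead of memmove()-ing leftover events
to index 0 after a partial enqueue, the buffer tracks head and tail indices,
plus a "last" marker recording where the producer wrapped, so unconsumed
events stay in place and new bursts are appended at the tail.

	/* Hypothetical illustration only; names differ from the patch. */
	#include <stdint.h>
	#include <stdbool.h>

	#define EVBUF_SIZE 192   /* 6 * BATCH_SIZE, as in the patch */
	#define BATCH 32

	struct evbuf {
		uint16_t head;   /* next event to hand to the event device */
		uint16_t tail;   /* next free slot for rte_eth_rx_burst() output */
		uint16_t last;   /* index where the producer wrapped; 0 = not wrapped */
		uint16_t count;  /* number of events currently buffered */
	};

	/* Room for one more BATCH-sized burst without moving any data?
	 * Note: may flip the buffer into the wrapped state as a side effect. */
	static bool
	evbuf_has_room(struct evbuf *b)
	{
		uint32_t need = (uint32_t)b->tail + BATCH;

		if (!b->last) {
			if (need <= EVBUF_SIZE)
				return true;        /* space left at the end */
			if (b->head >= BATCH) {     /* wrap and fill from index 0 */
				b->last = b->tail;
				b->tail = 0;
				return true;
			}
		}
		return need <= b->head;             /* wrapped: room up to head */
	}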
Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
---
lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++-------
1 file changed, 68 insertions(+), 16 deletions(-)
diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
index 13dfb28..7c94c73 100644
--- a/lib/eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/eventdev/rte_event_eth_rx_adapter.c
@@ -25,7 +25,7 @@
#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
-#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
+#define ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE)
#define MAX_VECTOR_SIZE 1024
#define MIN_VECTOR_SIZE 4
#define MAX_VECTOR_NS 1E9
@@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
uint16_t count;
/* Array of events in this buffer */
struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+ /* Event enqueue happens from head */
+ uint16_t head;
+ /* New packets from rte_eth_rx_burst are enqueued at the tail */
+ uint16_t tail;
+ /* last element in the buffer before rollover */
+ uint16_t last;
+ uint16_t last_mask;
};
struct rte_event_eth_rx_adapter {
@@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
struct rte_eth_event_enqueue_buffer *buf =
&rx_adapter->event_enqueue_buffer;
struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+ uint16_t count = buf->last ? buf->last - buf->head : buf->count;
- if (!buf->count)
+ if (!count)
return 0;
uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
rx_adapter->event_port_id,
- buf->events,
- buf->count);
- if (n != buf->count) {
- memmove(buf->events,
- &buf->events[n],
- (buf->count - n) * sizeof(struct rte_event));
+ &buf->events[buf->head],
+ count);
+ if (n != count)
stats->rx_enq_retry++;
+
+ buf->head += n;
+
+ if (buf->last && n == count) {
+ uint16_t n1;
+
+ n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
+ rx_adapter->event_port_id,
+ &buf->events[0],
+ buf->tail);
+
+ if (n1 != buf->tail)
+ stats->rx_enq_retry++;
+
+ buf->last = 0;
+ buf->head = n1;
+ buf->last_mask = 0;
+ n += n1;
}
n ? rxa_enq_block_end_ts(rx_adapter, stats) :
@@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
&dev_info->rx_queue[rx_queue_id];
struct rte_eth_event_enqueue_buffer *buf =
&rx_adapter->event_enqueue_buffer;
- struct rte_event *ev = &buf->events[buf->count];
+ uint16_t new_tail = buf->tail;
uint64_t event = eth_rx_queue_info->event;
uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
struct rte_mbuf *m = mbufs[0];
@@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
for (i = 0; i < num; i++) {
+ struct rte_event *ev;
+
m = mbufs[i];
+ ev = &buf->events[new_tail];
rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
: m->hash.rss;
@@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
ev->flow_id = (rss & ~flow_id_mask) |
(ev->flow_id & flow_id_mask);
ev->mbuf = m;
- ev++;
+ new_tail++;
}
} else {
num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
@@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
dropped = 0;
nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
- ETH_EVENT_BUFFER_SIZE, buf->count,
- &buf->events[buf->count], num,
- dev_info->cb_arg, &dropped);
+ buf->last |
+ (RTE_DIM(buf->events) & ~buf->last_mask),
+ buf->count >= BATCH_SIZE ?
+ buf->count - BATCH_SIZE : 0,
+ &buf->events[buf->tail],
+ num,
+ dev_info->cb_arg,
+ &dropped);
if (unlikely(nb_cb > num))
RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
nb_cb, num);
@@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
}
buf->count += num;
+ buf->tail += num;
+}
+
+static inline bool
+rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
+{
+ uint32_t nb_req = buf->tail + BATCH_SIZE;
+
+ if (!buf->last) {
+ if (nb_req <= RTE_DIM(buf->events))
+ return true;
+
+ if (buf->head >= BATCH_SIZE) {
+ buf->last_mask = ~0;
+ buf->last = buf->tail;
+ buf->tail = 0;
+ return true;
+ }
+ }
+
+ return nb_req <= buf->head;
}
/* Enqueue packets from <port, q> to event buffer */
@@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
/* Don't do a batch dequeue from the rx queue if there isn't
* enough space in the enqueue buffer.
*/
- while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+ while (rxa_pkt_buf_available(buf)) {
if (buf->count >= BATCH_SIZE)
rxa_flush_event_buffer(rx_adapter);
@@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
if (buf->count >= BATCH_SIZE)
rxa_flush_event_buffer(rx_adapter);
- while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+ while (rxa_pkt_buf_available(buf)) {
struct eth_device_info *dev_info;
uint16_t port;
uint16_t queue;
@@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
*/
if (buf->count >= BATCH_SIZE)
rxa_flush_event_buffer(rx_adapter);
- if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+ if (!rxa_pkt_buf_available(buf)) {
rx_adapter->wrr_pos = wrr_pos;
return nb_rx;
}
--
2.6.4
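With this scheme, the flush path handles a wrapped buffer by issuing at most
two contiguous enqueue bursts (head..last, then 0..tail), so no copy is ever
needed. Roughly, with hypothetical names rather than the adapter's actual
helpers:

	/* Rough sketch only; names and the enqueue callback are hypothetical. */
	#include <stdint.h>
	#include <rte_eventdev.h>

	struct evbuf {                   /* same hypothetical layout as above */
		uint16_t head, tail, last, count;
		struct rte_event events[192];
	};

	static uint16_t
	evbuf_flush(struct evbuf *b,
		    uint16_t (*enq)(struct rte_event *ev, uint16_t n))
	{
		uint16_t count = b->last ? b->last - b->head : b->count;
		uint16_t n;

		if (!count)
			return 0;

		/* chunk 1: contiguous events from head up to last (or tail) */
		n = enq(&b->events[b->head], count);
		b->head += n;

		/* chunk 2: only if chunk 1 drained fully and the buffer wrapped */
		if (b->last && n == count) {
			uint16_t n1 = enq(&b->events[0], b->tail);

			b->head = n1;
			b->last = 0;
			n += n1;
		}
		b->count -= n;
		return n;
	}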
* Re: [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer
2021-08-30 13:06 [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer Ganapati Kundapura
@ 2021-09-06 8:11 ` Jerin Jacob
0 siblings, 0 replies; 2+ messages in thread
From: Jerin Jacob @ 2021-09-06 8:11 UTC (permalink / raw)
To: Ganapati Kundapura; +Cc: Jayatheerthan, Jay, dpdk-dev
On Mon, Aug 30, 2021 at 6:41 PM Ganapati Kundapura
<ganapati.kundapura@intel.com> wrote:
>
> v2:
> * Fixed typo in commit message
> * changed subject line
>
> v1:
Changelog should be moved under "---".. See below.
> Rx adapter uses memmove() to move unprocessed events to the beginning of
> the packet enqueue buffer. This memmove() was found to consume a
> significant share of CPU cycles (about 20%).
>
> This patch removes the memmove() by making the enqueue buffer a circular
> buffer, so no data needs to be copied. With this change the Rx adapter is
> able to fill a buffer of 16384 events.
>
Acked-by: Jerin Jacob <jerinj@marvell.com>
Fixed the changelog and changed the subject
as "eventdev: make Rx-adapter enqueue buffer as circular buffer"
Applied to dpdk-next-net-eventdev/for-main. Thanks
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> ---
See above
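For reference, version notes placed below the "---" separator are dropped by
git am along with the diffstat, so they never end up in the commit message. A
generic layout (not this exact patch) looks like:

	eventdev: make Rx adapter enqueue buffer circular

	<commit message body>

	Signed-off-by: Author <author@example.com>
	---
	v2:
	* fixed typo in commit message
	* changed subject line

	 lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++------
	 1 file changed, 68 insertions(+), 16 deletions(-)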
> [remainder of quoted patch trimmed]