From mboxrd@z Thu Jan  1 00:00:00 1970
From: Ganapati Kundapura <ganapati.kundapura@intel.com>
To: jay.jayatheerthan@intel.com, jerinjacobk@gmail.com
Cc: dev@dpdk.org
Date: Fri, 27 Aug 2021 04:42:04 -0500
Message-Id: <20210827094204.474846-1-ganapati.kundapura@intel.com>
X-Mailer: git-send-email 2.23.0
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [dpdk-dev] [PATCH v1] eventdev: change packet enqueue buffer in RX adapter to circular buffer

The Rx adapter uses memmove() to move unprocessed events to the
beginning of the packet enqueue buffer. The use of memmove() was
found to consume a significant amount of CPU cycles (about 20%).

This patch removes the use of memmove() by implementing a circular
buffer, which avoids copying the data. With this change, the Rx
adapter is able to fill the buffer of 16384 events.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
---
 lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++------
 1 file changed, 68 insertions(+), 16 deletions(-)

diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
index 13dfb28..7c94c73 100644
--- a/lib/eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/eventdev/rte_event_eth_rx_adapter.c
@@ -25,7 +25,7 @@
 
 #define BATCH_SIZE 32
 #define BLOCK_CNT_THRESHOLD 10
-#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
+#define ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE)
 #define MAX_VECTOR_SIZE 1024
 #define MIN_VECTOR_SIZE 4
 #define MAX_VECTOR_NS 1E9
@@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
 	uint16_t count;
 	/* Array of events in this buffer */
 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+	/* Event enqueue happens from head */
+	uint16_t head;
+	/* New packets from rte_eth_rx_burst are enqueued from tail */
+	uint16_t tail;
+	/* Last element in the buffer before rollover */
+	uint16_t last;
+	uint16_t last_mask;
 };
 
 struct rte_event_eth_rx_adapter {
@@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
 
-	if (!buf->count)
+	if (!count)
 		return 0;
 
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
-					buf->events,
-					buf->count);
-	if (n != buf->count) {
-		memmove(buf->events,
-			&buf->events[n],
-			(buf->count - n) * sizeof(struct rte_event));
+					&buf->events[buf->head],
+					count);
+	if (n != count)
 		stats->rx_enq_retry++;
-	}
+
+	buf->head += n;
+
+	if (buf->last && n == count) {
+		uint16_t n1;
+
+		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
+					rx_adapter->event_port_id,
+					&buf->events[0],
+					buf->tail);
+
+		if (n1 != buf->tail)
+			stats->rx_enq_retry++;
+
+		buf->last = 0;
+		buf->head = n1;
+		buf->last_mask = 0;
+		n += n1;
 	}
 
 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
@@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		&dev_info->rx_queue[rx_queue_id];
 	struct rte_eth_event_enqueue_buffer *buf =
 		&rx_adapter->event_enqueue_buffer;
-	struct rte_event *ev = &buf->events[buf->count];
+	uint16_t new_tail = buf->tail;
 	uint64_t event = eth_rx_queue_info->event;
 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
 	struct rte_mbuf *m = mbufs[0];
@@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 	for (i = 0; i < num; i++) {
+		struct rte_event *ev;
+
 		m = mbufs[i];
+		ev = &buf->events[new_tail];
 		rss = do_rss ?
 			rxa_do_softrss(m, rx_adapter->rss_key_be) :
 			m->hash.rss;
@@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		ev->flow_id = (rss & ~flow_id_mask) |
 				(ev->flow_id & flow_id_mask);
 		ev->mbuf = m;
-		ev++;
+		new_tail++;
 		}
 	} else {
 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
@@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count,
-					&buf->events[buf->count], num,
-					dev_info->cb_arg, &dropped);
+					buf->last |
+					(RTE_DIM(buf->events) & ~buf->last_mask),
+					buf->count >= BATCH_SIZE ?
+						buf->count - BATCH_SIZE : 0,
+					&buf->events[buf->tail],
+					num,
+					dev_info->cb_arg,
+					&dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 
 	buf->count += num;
+	buf->tail += num;
+}
+
+static inline bool
+rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
+{
+	uint32_t nb_req = buf->tail + BATCH_SIZE;
+
+	if (!buf->last) {
+		if (nb_req <= RTE_DIM(buf->events))
+			return true;
+
+		if (buf->head >= BATCH_SIZE) {
+			buf->last_mask = ~0;
+			buf->last = buf->tail;
+			buf->tail = 0;
+			return true;
+		}
+	}
+
+	return nb_req <= buf->head;
 }
 
 /* Enqueue packets from  <port, q>  to event buffer */
@@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
@@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
 	if (buf->count >= BATCH_SIZE)
 		rxa_flush_event_buffer(rx_adapter);
 
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		struct eth_device_info *dev_info;
 		uint16_t port;
 		uint16_t queue;
@@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 		 */
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
-		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+		if (!rxa_pkt_buf_available(buf)) {
 			rx_adapter->wrr_pos = wrr_pos;
 			return nb_rx;
 		}
-- 
2.6.4
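
For illustration only (not part of the patch): below is a minimal,
self-contained C sketch of the head/tail/last bookkeeping the patch
introduces. All names here (cbuf, cbuf_space, cbuf_flush, enq,
CBUF_SIZE, CBUF_BATCH) are invented for the example, and the
last_mask field is omitted for brevity; the real logic is
rxa_pkt_buf_available() and the reworked rxa_flush_event_buffer()
in the diff above.

#include <stdbool.h>
#include <stdint.h>

#define CBUF_SIZE  192	/* stands in for ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE) */
#define CBUF_BATCH 32	/* stands in for BATCH_SIZE */

struct cbuf {
	int events[CBUF_SIZE];
	uint16_t head;	/* flush (dequeue) position */
	uint16_t tail;	/* fill (enqueue) position */
	uint16_t last;	/* end of the first segment after rollover, 0 otherwise */
};

/* Same shape as rxa_pkt_buf_available(): report whether a full batch of
 * CBUF_BATCH entries can be appended, rolling the tail over to index 0
 * when the space freed at the front by earlier flushes is large enough.
 */
bool
cbuf_space(struct cbuf *b)
{
	uint32_t nb_req = b->tail + CBUF_BATCH;

	if (!b->last) {
		if (nb_req <= CBUF_SIZE)	/* room in the linear region */
			return true;
		if (b->head >= CBUF_BATCH) {	/* roll the tail over */
			b->last = b->tail;
			b->tail = 0;
			return true;
		}
	}
	/* Rolled over: the tail must not catch up with the head. */
	return nb_req <= b->head;
}

/* Same shape as the reworked rxa_flush_event_buffer(): drain the segment
 * [head, last-or-tail) in place and, only if it drained completely, the
 * wrapped segment [0, tail). enq() stands in for
 * rte_event_enqueue_new_burst() and may accept fewer entries than offered.
 */
void
cbuf_flush(struct cbuf *b, uint16_t (*enq)(const int *evs, uint16_t n))
{
	uint16_t count = b->last ? b->last - b->head : b->tail - b->head;
	uint16_t n = enq(&b->events[b->head], count);

	b->head += n;
	if (b->last && n == count) {	/* first segment fully drained */
		b->head = enq(&b->events[0], b->tail);
		b->last = 0;
	}
}

Because both the fill and the flush always operate on a contiguous
region, the buffer is drained in at most two enqueue calls and no
events ever need to be copied with memmove().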