From: Jerin Jacob
Date: Mon, 6 Sep 2021 13:41:45 +0530
To: Ganapati Kundapura
Cc: "Jayatheerthan, Jay", dpdk-dev
In-Reply-To: <20210830130625.1892399-1-ganapati.kundapura@intel.com>
Subject: Re: [dpdk-dev] [PATCH v2] eventdev: rx-adapter: improve enqueue buffer to circular buffer

On Mon, Aug 30, 2021 at 6:41 PM Ganapati Kundapura wrote:
>
> v2:
> * Fixed typo in commit message
> * changed subject line
>
> v1:

Changelog should be moved under "---". See below.

> RX adapter uses memmove() to move unprocessed events to the beginning
> of the packet enqueue buffer. The use of memmove() was found to
> consume a good amount of CPU cycles (about 20%).
>
> This patch removes the use of memmove() by implementing a circular
> buffer to avoid copying of data. With this change the RX adapter is
> able to fill a buffer of 16384 events.
>

Acked-by: Jerin Jacob

Fixed the changelog and changed the subject to "eventdev: make
Rx-adapter enqueue buffer as circular buffer".

Applied to dpdk-next-net-eventdev/for-main. Thanks
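For readers who want the shape of the change before reading the diff
below, here is a minimal standalone sketch of the circular-buffer
bookkeeping. All names in it (circ_buf, batch_fits, BUF_SIZE, BATCH)
are illustrative stand-ins, not the patch's code: the real structure is
struct rte_eth_event_enqueue_buffer and the real check is
rxa_pkt_buf_available() in rte_event_eth_rx_adapter.c.

    /*
     * Sketch of the circular enqueue buffer introduced by the patch.
     * Hypothetical names; simplified (the real code also keeps a
     * last_mask field for the callback bookkeeping).
     */
    #include <stdbool.h>
    #include <stdint.h>

    #define BATCH    32          /* BATCH_SIZE in the patch */
    #define BUF_SIZE (6 * BATCH) /* ETH_EVENT_BUFFER_SIZE in the patch */

    struct circ_buf {
            uint16_t head;  /* events are drained to the eventdev from here */
            uint16_t tail;  /* newly received packets are written here */
            uint16_t last;  /* where valid data ends after a wrap; 0 = linear */
            uint16_t count; /* number of events currently buffered */
            int events[BUF_SIZE]; /* stand-in for struct rte_event entries */
    };

    /* Is there room for one more batch of BATCH packets, with no memmove()? */
    static bool batch_fits(struct circ_buf *b)
    {
            uint16_t need = b->tail + BATCH;

            if (!b->last) {
                    if (need <= BUF_SIZE)
                            return true; /* contiguous room at the end */
                    if (b->head >= BATCH) {
                            /* Wrap: record where valid data ends, restart at 0. */
                            b->last = b->tail;
                            b->tail = 0;
                            return true;
                    }
            }
            /* Already wrapped: writes must stay below the drain point. */
            return need <= b->head;
    }

Because "last" pins the end of the first segment, neither the producer
nor the flush path ever handles a run of events that straddles the wrap
point, which is what makes the memmove() unnecessary.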
> Signed-off-by: Ganapati Kundapura
> ---

See above

>  lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++-------
>  1 file changed, 68 insertions(+), 16 deletions(-)
>
> diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
> index 13dfb28..7c94c73 100644
> --- a/lib/eventdev/rte_event_eth_rx_adapter.c
> +++ b/lib/eventdev/rte_event_eth_rx_adapter.c
> @@ -25,7 +25,7 @@
>
>  #define BATCH_SIZE 32
>  #define BLOCK_CNT_THRESHOLD 10
> -#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
> +#define ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE)
>  #define MAX_VECTOR_SIZE 1024
>  #define MIN_VECTOR_SIZE 4
>  #define MAX_VECTOR_NS 1E9
> @@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
>  	uint16_t count;
>  	/* Array of events in this buffer */
>  	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
> +	/* Event enqueue happens from head */
> +	uint16_t head;
> +	/* New packets from rte_eth_rx_burst is enqued from tail */
> +	uint16_t tail;
> +	/* last element in the buffer before rollover */
> +	uint16_t last;
> +	uint16_t last_mask;
>  };
>
>  struct rte_event_eth_rx_adapter {
> @@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>  	struct rte_eth_event_enqueue_buffer *buf =
>  		&rx_adapter->event_enqueue_buffer;
>  	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
> +	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
>
> -	if (!buf->count)
> +	if (!count)
>  		return 0;
>
>  	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
>  					rx_adapter->event_port_id,
> -					buf->events,
> -					buf->count);
> -	if (n != buf->count) {
> -		memmove(buf->events,
> -			&buf->events[n],
> -			(buf->count - n) * sizeof(struct rte_event));
> +					&buf->events[buf->head],
> +					count);
> +	if (n != count)
>  		stats->rx_enq_retry++;
> +
> +	buf->head += n;
> +
> +	if (buf->last && n == count) {
> +		uint16_t n1;
> +
> +		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
> +					rx_adapter->event_port_id,
> +					&buf->events[0],
> +					buf->tail);
> +
> +		if (n1 != buf->tail)
> +			stats->rx_enq_retry++;
> +
> +		buf->last = 0;
> +		buf->head = n1;
> +		buf->last_mask = 0;
> +		n += n1;
>  	}
>
>  	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
> @@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>  		&dev_info->rx_queue[rx_queue_id];
>  	struct rte_eth_event_enqueue_buffer *buf =
>  		&rx_adapter->event_enqueue_buffer;
> -	struct rte_event *ev = &buf->events[buf->count];
> +	uint16_t new_tail = buf->tail;
>  	uint64_t event = eth_rx_queue_info->event;
>  	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
>  	struct rte_mbuf *m = mbufs[0];
> @@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>  	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>  	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>  	for (i = 0; i < num; i++) {
> +		struct rte_event *ev;
> +
>  		m = mbufs[i];
> +		ev = &buf->events[new_tail];
>
>  		rss = do_rss ?
>  			rxa_do_softrss(m, rx_adapter->rss_key_be)
>  			: m->hash.rss;
> @@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>  		ev->flow_id = (rss & ~flow_id_mask) |
>  				(ev->flow_id & flow_id_mask);
>  		ev->mbuf = m;
> -		ev++;
> +		new_tail++;
>  	}
>  	} else {
>  		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
> @@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>
>  		dropped = 0;
>  		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> -				ETH_EVENT_BUFFER_SIZE, buf->count,
> -				&buf->events[buf->count], num,
> -				dev_info->cb_arg, &dropped);
> +				buf->last |
> +				(RTE_DIM(buf->events) & ~buf->last_mask),
> +				buf->count >= BATCH_SIZE ?
> +					buf->count - BATCH_SIZE : 0,
> +				&buf->events[buf->tail],
> +				num,
> +				dev_info->cb_arg,
> +				&dropped);
>  		if (unlikely(nb_cb > num))
>  			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
>  				nb_cb, num);
> @@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>  	}
>
>  	buf->count += num;
> +	buf->tail += num;
> +}
> +
> +static inline bool
> +rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
> +{
> +	uint32_t nb_req = buf->tail + BATCH_SIZE;
> +
> +	if (!buf->last) {
> +		if (nb_req <= RTE_DIM(buf->events))
> +			return true;
> +
> +		if (buf->head >= BATCH_SIZE) {
> +			buf->last_mask = ~0;
> +			buf->last = buf->tail;
> +			buf->tail = 0;
> +			return true;
> +		}
> +	}
> +
> +	return nb_req <= buf->head;
>  }
>
>  /* Enqueue packets from <port, q> to event buffer */
> @@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
>  	/* Don't do a batch dequeue from the rx queue if there isn't
>  	 * enough space in the enqueue buffer.
>  	 */
> -	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
> +	while (rxa_pkt_buf_available(buf)) {
>  		if (buf->count >= BATCH_SIZE)
>  			rxa_flush_event_buffer(rx_adapter);
>
> @@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
>  	if (buf->count >= BATCH_SIZE)
>  		rxa_flush_event_buffer(rx_adapter);
>
> -	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
> +	while (rxa_pkt_buf_available(buf)) {
>  		struct eth_device_info *dev_info;
>  		uint16_t port;
>  		uint16_t queue;
> @@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
>  	 */
>  	if (buf->count >= BATCH_SIZE)
>  		rxa_flush_event_buffer(rx_adapter);
> -	if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
> +	if (!rxa_pkt_buf_available(buf)) {
>  		rx_adapter->wrr_pos = wrr_pos;
>  		return nb_rx;
>  	}
> --
> 2.6.4
>
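To round out the picture: a matching sketch of the flush path, reusing
the hypothetical circ_buf type from the sketch near the top of this
message. This is an illustration of the technique, not the adapter's
exact code; enqueue() is a made-up stand-in for
rte_event_enqueue_new_burst(), and the real logic (including stats and
the block-timestamp handling) lives in rxa_flush_event_buffer() in the
patch above.

    /*
     * A wrapped buffer is drained as two linear bursts instead of one
     * memmove(). enqueue() returns how many of 'count' events starting
     * at 'ev' the event device accepted.
     */
    extern uint16_t enqueue(int *ev, uint16_t count);

    static uint16_t flush(struct circ_buf *b)
    {
            /* While a wrap is pending, only [head, last) is contiguous. */
            uint16_t count = b->last ? b->last - b->head : b->count;
            uint16_t n, n2;

            if (!count)
                    return 0;

            n = enqueue(&b->events[b->head], count);
            b->head += n;

            if (b->last && n == count) {
                    /* First segment fully drained: push the wrapped part. */
                    n2 = enqueue(&b->events[0], b->tail);
                    b->last = 0;   /* the buffer is linear again */
                    b->head = n2;  /* the next flush resumes here */
                    n += n2;
            }

            b->count -= n;
            return n;
    }

Note how a partial enqueue simply advances head and leaves the
remaining events in place for the next flush; under the old scheme this
was the case that triggered the costly memmove() to the front of the
buffer.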