Date: Mon, 15 Apr 2019 22:42:24 +0800
From: Ye Xiaolong
To: David Marchand
Cc: dev, Ferruh Yigit, Qi Zhang, Karlsson Magnus, Topel Bjorn
Message-ID: <20190415144224.GA69356@intel.com>
References: <20190412144844.50712-1-xiaolong.ye@intel.com>
 <20190412144844.50712-2-xiaolong.ye@intel.com>
Subject: Re: [dpdk-dev] [PATCH 2/2] net/af_xdp: make reserve/submit peek/release consistent

Hi, David

Thanks for your detailed review comments.

On 04/15, David Marchand wrote:
>On Fri, Apr 12, 2019 at 4:54 PM Xiaolong Ye wrote:
>
>> As David pointed out, if we reserve N slots, but only submit n slots,
>> we would end up with an incorrect opinion of the number of available slots
>> later, we also would get wrong idx when we call xsk_ring_prod__reserve next
>> time. It also applies to xsk_ring_cons__peek()/xsk_ring_cons__release().
>>
>> This patch ensures that both reserve/submit and peek/release are
>> consistent.
>>
>> Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD")
>>
>> Reported-by: David Marchand
>> Signed-off-by: Xiaolong Ye
>> ---
>>  drivers/net/af_xdp/rte_eth_af_xdp.c | 80 +++++++++++++++--------------
>>  1 file changed, 41 insertions(+), 39 deletions(-)
>>
>> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
>> b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> index 5cc643ce2..76a6a8331 100644
>> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
>> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> @@ -138,22 +138,19 @@ reserve_fill_queue(struct xsk_umem_info *umem, int
>> reserve_size)
>>  {
>>  	struct xsk_ring_prod *fq = &umem->fq;
>>  	uint32_t idx;
>> -	int i, ret;
>> -
>> -	ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
>> -	if (unlikely(!ret)) {
>> -		AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n");
>> -		return ret;
>> -	}
>> +	int i;
>>
>>  	for (i = 0; i < reserve_size; i++) {
>>  		__u64 *fq_addr;
>>  		void *addr = NULL;
>>  		if (rte_ring_dequeue(umem->buf_ring, &addr)) {
>> -			i--;
>>  			break;
>>  		}
>> -		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
>> +		if (unlikely(!xsk_ring_prod__reserve(fq, 1, &idx))) {
>> +			AF_XDP_LOG(WARNING, "Failed to reserve 1 fq
>> desc.\n");
>> +			break;
>> +		}
>> +		fq_addr = xsk_ring_prod__fill_addr(fq, idx);
>>  		*fq_addr = (uint64_t)addr;
>>  	}
>>
>>
>I just spotted that reserve_fill_queue always returns 0.
>I understand that xsk_configure expects an error when not succeeding in
>populating this ring.
>And for this, it expects a non zero value.

You are right, reserve_fill_queue does need to return a non-zero value when
it fails to populate the ring.
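For context, a non-zero return lets the caller in xsk_configure() abort queue
setup when the fill queue cannot be populated, roughly as below (just an
illustration of the idea; the constant and the error label name are made up
here, not the exact ones in the driver):

	/* populate the fill queue up front; bail out if it cannot be filled */
	if (reserve_fill_queue(rxq->umem, ETH_AF_XDP_DFLT_QUEUE_LENGTH)) {
		AF_XDP_LOG(ERR, "Failed to reserve fill queue.\n");
		goto err;
	}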
>
>How about something like (neither tested nor compiled):
>
>static inline int
>reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
>{
>        struct xsk_ring_prod *fq = &umem->fq;
>        void *addrs[reserve_size];
>        uint32_t idx;
>        int i, ret;
>
>        if (rte_ring_dequeue_bulk(umem->buf_ring, &addrs, reserve_size, NULL)
>            != reserve_size) {
>                AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
>                return -1;
>        }
>
>        ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
>        if (unlikely(!ret)) {
>                AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
>                rte_ring_enqueue_bulk(umem->buf_ring, &addrs, reserve_size,
>                                      NULL);
>                return -1;
>        }
>
>        for (i = 0; i < reserve_size; i++) {
>                __u64 *fq_addr;
>
>                fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
>                *fq_addr = (uint64_t)addrs[i];
>        }
>
>        xsk_ring_prod__submit(fq, reserve_size);
>
>        return 0;
>}

Sounds better, I'll adopt it in my new version.

>
>
>
>@@ -179,6 +176,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs,
>> uint16_t nb_pkts)
>>
>>  	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
>>
>> +	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
>> != 0))
>> +		return 0;
>> +
>>  	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
>>  	if (rcvd == 0)
>>  		return 0;
>>
>
>When xsk_ring_cons__peek() returns 0, we will leak nb_pkts freshly
>allocated mbufs.
>See below for a suggestion.
>
>
>@@ -186,9 +186,6 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs,
>> uint16_t nb_pkts)
>>  	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
>>  		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);
>>
>> -	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) !=
>> 0))
>> -		return 0;
>> -
>>  	for (i = 0; i < rcvd; i++) {
>>  		const struct xdp_desc *desc;
>>  		uint64_t addr;
>> @@ -211,6 +208,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs,
>> uint16_t nb_pkts)
>>
>>  	xsk_ring_cons__release(rx, rcvd);
>>
>> +	/* free the extra mbufs */
>> +	for (; rcvd < nb_pkts; rcvd++)
>> +		rte_pktmbuf_free(mbufs[rcvd]);
>> +
>>
>
>You can move this block after the statistic update...
>
>
>  	/* statistics */
>>  	rxq->stats.rx_pkts += (rcvd - dropped);
>>  	rxq->stats.rx_bytes += rx_bytes;
>>
>
>... then define an out: label.
>And those mbufs are still clean and coming from a single mempool, we can
>put them back as a single bulk.
>Something like (again, untested):
>
>out:
>        if (count != nb_pkts) {
>                rte_mempool_put_bulk(rxq->mb_pool, &mbufs[count],
>                                     nb_pkts - count);
>        }
>
>        return count;
>}
>
>And you would jump to this label when xsk_ring_cons__peek() == 0.
>What do you think ?

I think these are all sensible suggestions, will do.
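To make sure I read your rx suggestions right, the reworked receive path
would look roughly like this (untested sketch; the drop accounting is trimmed
and a few struct/field names are written from memory, so they may differ
slightly in the actual patch):

static uint16_t
eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
	struct pkt_rx_queue *rxq = queue;
	struct xsk_ring_cons *rx = &rxq->rx;
	struct xsk_umem_info *umem = rxq->umem;
	struct xsk_ring_prod *fq = &umem->fq;
	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
	uint32_t idx_rx = 0;
	uint32_t free_thresh = fq->size >> 1;
	unsigned long rx_bytes = 0;
	uint16_t count = 0;
	int rcvd, i;

	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);

	/* allocate all mbufs up front, before touching the rx ring */
	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
		return 0;

	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
	if (rcvd == 0)
		goto out;	/* nothing consumed, put every mbuf back below */

	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);

	for (i = 0; i < rcvd; i++) {
		const struct xdp_desc *desc;
		uint64_t addr;
		uint32_t len;
		void *pkt;

		desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
		addr = desc->addr;
		len = desc->len;
		pkt = xsk_umem__get_data(umem->mz->addr, addr);

		rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *), pkt, len);
		rte_pktmbuf_pkt_len(mbufs[i]) = len;
		rte_pktmbuf_data_len(mbufs[i]) = len;
		rx_bytes += len;
		bufs[count++] = mbufs[i];

		/* hand the umem buffer back for future fill queue refills */
		rte_ring_enqueue(umem->buf_ring, (void *)addr);
	}

	xsk_ring_cons__release(rx, rcvd);

	/* statistics */
	rxq->stats.rx_pkts += count;
	rxq->stats.rx_bytes += rx_bytes;

out:
	/* unused mbufs are still clean, return them to the pool in one bulk */
	if (count != nb_pkts)
		rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[count],
				     nb_pkts - count);

	return count;
}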
>
>
>
>@@ -261,55 +262,56 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs,
>> uint16_t nb_pkts)
>>  	struct xsk_umem_info *umem = txq->pair->umem;
>>  	struct rte_mbuf *mbuf;
>>  	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
>> +	struct rte_mbuf *valid_bufs[ETH_AF_XDP_TX_BATCH_SIZE];
>>  	unsigned long tx_bytes = 0;
>> -	int i, valid = 0;
>> +	int i;
>> +	uint16_t nb_valid = 0;
>>  	uint32_t idx_tx;
>> +	uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE -
>> ETH_AF_XDP_DATA_HEADROOM;
>>
>>  	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
>>
>>  	pull_umem_cq(umem, nb_pkts);
>>
>> -	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
>> -					nb_pkts, NULL);
>> -	if (nb_pkts == 0)
>> +	for (i = 0; i < nb_pkts; i++) {
>> +		if (bufs[i]->pkt_len <= buf_len)
>> +			valid_bufs[nb_valid++] = bufs[i];
>> +		else
>> +			rte_pktmbuf_free(bufs[i]);
>> +	}
>> +
>> +	nb_valid = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
>> +					 nb_valid, NULL);
>> +	if (nb_valid == 0)
>>  		return 0;
>>
>>
>You can't return 0.
>You have stolen buffers from the caller with the previous check on pktlen.
>When the application resends this bulk or frees the whole bulk, we will
>have mbuf reuse bugs.
>
>Thinking about this, why would this happen ?
>This limitation should be handled by properly reporting the mtu.
>The application would then know it can't send those too big mbufs.
>

I think we can rely on the MTU to handle this limitation, and assume that in
this tx function all pkt_len values are valid. Will change it in the next
version.

Thanks,
Xiaolong

>
>If I missed something else and/or if you still don't trust the application,
>I think the tx burst function should go as described below.
>
>The first thing to do is check the mbufs length.
>At the first invalid length, you break from the loop at index i (i is the
>invalid buffer index).
>Then dequeue i - 1 buffers from buf_ring.
>Reserve i - 1 slots in tx.
>
>And return i buffers have been sent (plus a tx error stat += 1).
>
>You need to carefully take into account each step and free the buffer to
>buf_ring when relevant and free the mbufs properly.
>
>
>-	if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
>> +	if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) !=
>> nb_valid) {
>>  		kick_tx(txq);
>> -		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts,
>> NULL);
>> +		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid,
>> NULL);
>>  		return 0;
>>  	}
>>
>> -	for (i = 0; i < nb_pkts; i++) {
>> +	for (i = 0; i < nb_valid; i++) {
>>  		struct xdp_desc *desc;
>>  		void *pkt;
>> -		uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE
>> -			- ETH_AF_XDP_DATA_HEADROOM;
>>  		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
>> -		mbuf = bufs[i];
>> -		if (mbuf->pkt_len <= buf_len) {
>> -			desc->addr = (uint64_t)addrs[valid];
>> -			desc->len = mbuf->pkt_len;
>> -			pkt = xsk_umem__get_data(umem->mz->addr,
>> -						 desc->addr);
>> -			rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
>> -				   desc->len);
>> -			valid++;
>> -			tx_bytes += mbuf->pkt_len;
>> -		}
>> +		mbuf = valid_bufs[i];
>> +		desc->addr = (uint64_t)addrs[i];
>> +		desc->len = mbuf->pkt_len;
>> +		pkt = xsk_umem__get_data(umem->mz->addr,
>> +					 desc->addr);
>> +		rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
>> +			   desc->len);
>> +		tx_bytes += mbuf->pkt_len;
>>  		rte_pktmbuf_free(mbuf);
>>  	}
>>
>> -	xsk_ring_prod__submit(&txq->tx, nb_pkts);
>> +	xsk_ring_prod__submit(&txq->tx, nb_valid);
>>
>>  	kick_tx(txq);
>>
>> -	if (valid < nb_pkts)
>> -		rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid],
>> -				      nb_pkts - valid, NULL);
>> -
>> -	txq->stats.err_pkts += nb_pkts - valid;
>> -	txq->stats.tx_pkts += valid;
>> +	txq->stats.err_pkts += nb_pkts - nb_valid;
>> +	txq->stats.tx_pkts += nb_valid;
>>  	txq->stats.tx_bytes += tx_bytes;
>>
>>  	return nb_pkts;
>> --
>> 2.17.1
>>
>>
>
>--
>David Marchand
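Also, to be concrete about the direction I intend to take: if we rely on a
properly reported MTU so that every pkt_len is guaranteed to fit in an umem
frame, the tx burst could be simplified roughly as below. This is only an
untested sketch; the struct/field names are written from memory and the error
accounting is trimmed:

static uint16_t
eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
	struct pkt_tx_queue *txq = queue;
	struct xsk_umem_info *umem = txq->pair->umem;
	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
	struct rte_mbuf *mbuf;
	unsigned long tx_bytes = 0;
	uint32_t idx_tx;
	int i;

	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);

	pull_umem_cq(umem, nb_pkts);

	/* one umem buffer per packet; rte_ring_dequeue_bulk() is all or nothing */
	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
	if (nb_pkts == 0)
		return 0;

	if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
		kick_tx(txq);
		/* nothing was sent, give the umem buffers back */
		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
		return 0;
	}

	for (i = 0; i < nb_pkts; i++) {
		struct xdp_desc *desc;
		void *pkt;

		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
		mbuf = bufs[i];
		/* pkt_len is trusted here, it cannot exceed the reported MTU */
		desc->addr = (uint64_t)addrs[i];
		desc->len = mbuf->pkt_len;
		pkt = xsk_umem__get_data(umem->mz->addr, desc->addr);
		rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), desc->len);
		tx_bytes += mbuf->pkt_len;
		rte_pktmbuf_free(mbuf);
	}

	xsk_ring_prod__submit(&txq->tx, nb_pkts);

	kick_tx(txq);

	txq->stats.tx_pkts += nb_pkts;
	txq->stats.tx_bytes += tx_bytes;

	return nb_pkts;
}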