From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from mga03.intel.com (mga03.intel.com [134.134.136.65])
 by dpdk.org (Postfix) with ESMTP id 66D911B675
 for ; Wed, 17 Apr 2019 15:55:41 +0200 (CEST)
X-Amp-Result: SKIPPED(no attachment in message)
X-Amp-File-Uploaded: False
Received: from orsmga002.jf.intel.com ([10.7.209.21])
 by orsmga103.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;
 17 Apr 2019 06:55:41 -0700
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos;i="5.60,362,1549958400"; d="scan'208";a="151639772"
Received: from yexl-server.sh.intel.com ([10.67.110.206])
 by orsmga002.jf.intel.com with ESMTP; 17 Apr 2019 06:55:39 -0700
From: Xiaolong Ye
To: dev@dpdk.org, Ferruh Yigit, David Marchand
Cc: Qi Zhang, Karlsson Magnus, Topel Bjorn, Xiaolong Ye
Date: Wed, 17 Apr 2019 21:49:45 +0800
Message-Id: <20190417134946.1250-4-xiaolong.ye@intel.com>
X-Mailer: git-send-email 2.17.1
In-Reply-To: <20190417134946.1250-1-xiaolong.ye@intel.com>
References: <20190417134946.1250-1-xiaolong.ye@intel.com>
Subject: [dpdk-dev] [PATCH v4 3/4] net/af_xdp: make reserve/submit peek/release consistent
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
X-List-Received-Date: Wed, 17 Apr 2019 13:55:41 -0000

As David pointed out, if we reserve N slots for Tx but only submit n
slots, we end up with an incorrect view of the number of available
slots later, and we also get a wrong idx the next time we call
xsk_ring_prod__reserve(). The same applies to
xsk_ring_cons__peek()/xsk_ring_cons__release().

This patch ensures that both reserve/submit and peek/release are
consistent (a minimal sketch of both usage patterns follows the patch
below).

Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD")

Suggested-by: David Marchand
Reviewed-by: David Marchand
Signed-off-by: Xiaolong Ye
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 79 +++++++++++++++--------------
 1 file changed, 40 insertions(+), 39 deletions(-)

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index c46916bbe..6a0096523 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -134,30 +134,34 @@ static const struct rte_eth_link pmd_link = {
 };
 
 static inline int
-reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
+reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size)
 {
 	struct xsk_ring_prod *fq = &umem->fq;
+	void *addrs[reserve_size];
 	uint32_t idx;
-	int i, ret;
+	uint16_t i;
+
+	if (rte_ring_dequeue_bulk(umem->buf_ring, addrs, reserve_size, NULL)
+	    != reserve_size) {
+		AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
+		return -1;
+	}
 
-	ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
-	if (unlikely(!ret)) {
-		AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n");
-		return ret;
+	if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) {
+		AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
+		rte_ring_enqueue_bulk(umem->buf_ring, addrs,
+				      reserve_size, NULL);
+		return -1;
 	}
 
 	for (i = 0; i < reserve_size; i++) {
 		__u64 *fq_addr;
-		void *addr = NULL;
-		if (rte_ring_dequeue(umem->buf_ring, &addr)) {
-			i--;
-			break;
-		}
+
 		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
-		*fq_addr = (uint64_t)addr;
+		*fq_addr = (uint64_t)addrs[i];
 	}
 
-	xsk_ring_prod__submit(fq, i);
+	xsk_ring_prod__submit(fq, reserve_size);
 
 	return 0;
 }
@@ -174,21 +178,20 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct rte_mbuf *mbufs[ETH_AF_XDP_TX_BATCH_SIZE];
 	unsigned long dropped = 0;
 	unsigned long rx_bytes = 0;
-	uint16_t count = 0;
 	int rcvd, i;
 
 	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
 
+	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
+		return 0;
+
 	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 	if (rcvd == 0)
-		return 0;
+		goto out;
 
 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
 		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);
 
-	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != 0))
-		return 0;
-
 	for (i = 0; i < rcvd; i++) {
 		const struct xdp_desc *desc;
 		uint64_t addr;
@@ -204,7 +207,7 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		rte_pktmbuf_pkt_len(mbufs[i]) = len;
 		rte_pktmbuf_data_len(mbufs[i]) = len;
 		rx_bytes += len;
-		bufs[count++] = mbufs[i];
+		bufs[i] = mbufs[i];
 
 		rte_ring_enqueue(umem->buf_ring, (void *)addr);
 	}
@@ -215,7 +218,12 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	rxq->stats.rx_pkts += (rcvd - dropped);
 	rxq->stats.rx_bytes += rx_bytes;
 
-	return count;
+out:
+	if (rcvd != nb_pkts)
+		rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd],
+				     nb_pkts - rcvd);
+
+	return rcvd;
 }
 
 static void
@@ -262,7 +270,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct rte_mbuf *mbuf;
 	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
 	unsigned long tx_bytes = 0;
-	int i, valid = 0;
+	int i;
 	uint32_t idx_tx;
 
 	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
@@ -283,20 +291,18 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	for (i = 0; i < nb_pkts; i++) {
 		struct xdp_desc *desc;
 		void *pkt;
-		uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE
-					- ETH_AF_XDP_DATA_HEADROOM;
+
 		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
 		mbuf = bufs[i];
-		if (mbuf->pkt_len <= buf_len) {
-			desc->addr = (uint64_t)addrs[valid];
-			desc->len = mbuf->pkt_len;
-			pkt = xsk_umem__get_data(umem->mz->addr,
-						 desc->addr);
-			rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-				   desc->len);
-			valid++;
-			tx_bytes += mbuf->pkt_len;
-		}
+
+		desc->addr = (uint64_t)addrs[i];
+		desc->len = mbuf->pkt_len;
+		pkt = xsk_umem__get_data(umem->mz->addr,
+					 desc->addr);
+		rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
+			   desc->len);
+		tx_bytes += mbuf->pkt_len;
+
 		rte_pktmbuf_free(mbuf);
 	}
 
@@ -304,12 +310,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	kick_tx(txq);
 
-	if (valid < nb_pkts)
-		rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid],
-				      nb_pkts - valid, NULL);
-
-	txq->stats.err_pkts += nb_pkts - valid;
-	txq->stats.tx_pkts += valid;
+	txq->stats.tx_pkts += nb_pkts;
 	txq->stats.tx_bytes += tx_bytes;
 
 	return nb_pkts;
-- 
2.17.1
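
For reference, a minimal sketch of the producer-side pattern the patch
enforces: decide the slot count up front, then pass that same count to
both xsk_ring_prod__reserve() and xsk_ring_prod__submit(). This is an
illustrative standalone function, not driver code; the function name
fill_queue_consistent() is made up here, while the xsk_* calls are the
libbpf helpers the PMD uses.

#include <stdint.h>
#include <bpf/xsk.h>	/* libbpf AF_XDP ring helpers */

/* Sketch: publish n umem frame addresses (already fetched from the
 * driver's buf_ring) to the fill queue. The invariant from the patch:
 * reserve(n) is either matched by submit(n), or nothing is submitted
 * and the caller rolls back. */
static int
fill_queue_consistent(struct xsk_ring_prod *fq, const uint64_t *addrs,
		      uint32_t n)
{
	uint32_t idx, i;

	/* Reserve exactly n slots; on failure no slot is consumed and
	 * the caller still owns addrs[]. */
	if (xsk_ring_prod__reserve(fq, n, &idx) != n)
		return -1;

	for (i = 0; i < n; i++)
		*xsk_ring_prod__fill_addr(fq, idx + i) = addrs[i];

	/* Submit the same n we reserved, never fewer. */
	xsk_ring_prod__submit(fq, n);
	return 0;
}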
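
And the matching consumer-side sketch, under the same assumptions:
whatever count xsk_ring_cons__peek() returns is exactly the count
handed to xsk_ring_cons__release(), even if the caller cares about
fewer descriptors. rx_bytes_consistent() is an illustrative name; the
byte counting stands in for the PMD's per-packet mbuf work.

#include <stdint.h>
#include <bpf/xsk.h>		/* libbpf AF_XDP ring helpers */
#include <linux/if_xdp.h>	/* struct xdp_desc */

/* Sketch: drain up to 'budget' Rx descriptors and count their bytes.
 * The invariant from the patch: release() gets exactly the count that
 * peek() returned, keeping the cached ring state and idx correct. */
static uint64_t
rx_bytes_consistent(struct xsk_ring_cons *rx, uint32_t budget)
{
	uint32_t idx, i, rcvd;
	uint64_t bytes = 0;

	rcvd = xsk_ring_cons__peek(rx, budget, &idx);
	if (rcvd == 0)
		return 0;

	/* Stand-in for the PMD's mbuf copy; only the length is used. */
	for (i = 0; i < rcvd; i++)
		bytes += xsk_ring_cons__rx_desc(rx, idx + i)->len;

	/* Release the same rcvd we peeked, never a different count. */
	xsk_ring_cons__release(rx, rcvd);
	return bytes;
}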