From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by dpdk.space (Postfix) with ESMTP id 208C3A00E6 for ; Mon, 15 Apr 2019 10:19:48 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id E755C1B0FC; Mon, 15 Apr 2019 10:19:46 +0200 (CEST) Received: from mail-vk1-f194.google.com (mail-vk1-f194.google.com [209.85.221.194]) by dpdk.org (Postfix) with ESMTP id 4E2727EC7 for ; Mon, 15 Apr 2019 10:19:45 +0200 (CEST) Received: by mail-vk1-f194.google.com with SMTP id s63so3435025vkg.10 for ; Mon, 15 Apr 2019 01:19:45 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=k72cJWtl/5jQKKugyUA87d16tTvrETvqi393rsi66Gk=; b=rwFD/9g9/1m8qMmT2iDkKHd/4B/LFEPAX3b8ZUc3tEYF9fY3TynG/6W20h6VzMJVpO JcZXddH+6+X3zNbyjBH+N1bnZnwMkYFcpaBFjDiFPGx9QKP6XXKRzlWqbjupBDi/maq6 CV0f1fOT6tMiUHeOzBhgmkuGHFiUq0FCu/sxDPFtJi82n8otZmTTUuDlUWhtN8K4dz3q 88lx+4H2H3/LplVH+aCESL+T3oili0aGYpA3/0/MZ9odcELelnHeUjSW6hZD98lrUkFi 6Oksl6AbW0c+15J3e+CG+mgt00ZXqanvDrdHXvjQ6vlSohIZV34WrjtfjTOEjEMOP8FJ kI1w== X-Gm-Message-State: APjAAAXrWdmn1tfdcij7p7wJq0F262FPXf5WBhEQzY7yqdsnNuhTY4MI aDSrOan3KPb7k7CcCyj7dbA+G5cCdevYcDb+dKZBMg== X-Google-Smtp-Source: APXvYqw2rg4nvZkONbGFTKulLGItb1n/6OmDcp3g5eN01RZ4TOmF5a1igL+nHx/cYWAmX4aELimcHTmAxEZCvbttoeQ= X-Received: by 2002:a1f:b297:: with SMTP id b145mr38381135vkf.74.1555316384602; Mon, 15 Apr 2019 01:19:44 -0700 (PDT) MIME-Version: 1.0 References: <20190412144844.50712-1-xiaolong.ye@intel.com> <20190412144844.50712-2-xiaolong.ye@intel.com> In-Reply-To: <20190412144844.50712-2-xiaolong.ye@intel.com> From: David Marchand Date: Mon, 15 Apr 2019 10:19:33 +0200 Message-ID: To: Xiaolong Ye Cc: dev , Ferruh Yigit , Qi Zhang , Karlsson Magnus , Topel Bjorn Content-Type: text/plain; charset="UTF-8" X-Content-Filtered-By: Mailman/MimeDel 2.1.15 Subject: Re: [dpdk-dev] [PATCH 2/2] net/af_xdp: make reserve/submit peek/release consistent X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Message-ID: <20190415081933.UXW0i1V3ZUJMcCbbUn-Tr32kJQga-3fznA-gZRDNuFw@z> On Fri, Apr 12, 2019 at 4:54 PM Xiaolong Ye wrote: > As David pointed out, if we reserve N slots, but only submit n slots, > we would end up with an incorrect opinion of the number of available slots > later, we also would get wrong idx when we call xsk_ring_prod__reserve next > time. It also applies to xsk_ring_cons__peek()/xsk_ring_cons__release(). > > This patch ensures that both reserve/submit and peek/release are > consistent. > > Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD") > > Reported-by: David Marchand > Signed-off-by: Xiaolong Ye > --- > drivers/net/af_xdp/rte_eth_af_xdp.c | 80 +++++++++++++++-------------- > 1 file changed, 41 insertions(+), 39 deletions(-) > > diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c > b/drivers/net/af_xdp/rte_eth_af_xdp.c > index 5cc643ce2..76a6a8331 100644 > --- a/drivers/net/af_xdp/rte_eth_af_xdp.c > +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c > @@ -138,22 +138,19 @@ reserve_fill_queue(struct xsk_umem_info *umem, int > reserve_size) > { > struct xsk_ring_prod *fq = &umem->fq; > uint32_t idx; > - int i, ret; > - > - ret = xsk_ring_prod__reserve(fq, reserve_size, &idx); > - if (unlikely(!ret)) { > - AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n"); > - return ret; > - } > + int i; > > for (i = 0; i < reserve_size; i++) { > __u64 *fq_addr; > void *addr = NULL; > if (rte_ring_dequeue(umem->buf_ring, &addr)) { > - i--; > break; > } > - fq_addr = xsk_ring_prod__fill_addr(fq, idx++); > + if (unlikely(!xsk_ring_prod__reserve(fq, 1, &idx))) { > + AF_XDP_LOG(WARNING, "Failed to reserve 1 fq > desc.\n"); > + break; > + } > + fq_addr = xsk_ring_prod__fill_addr(fq, idx); > *fq_addr = (uint64_t)addr; > } > > I just spotted that reserve_fill_queue always returns 0. I understand that xsk_configure expects an errors when not succeeding in populating this ring. And for this, it expects a non zero value for this. How about something like (neither tested nor compiled): static inline int reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size) { struct xsk_ring_prod *fq = &umem->fq; void *addrs[reserve_size]; uint32_t idx; int i, ret; if (rte_ring_dequeue_bulk(umem->buf_ring, &addrs, reserve_size, NULL) != reserve_size) { AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n"); return -1; } ret = xsk_ring_prod__reserve(fq, reserve_size, &idx); if (unlikely(!ret)) { AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n"); rte_ring_enqueue_bulk(umem->buf_ring, &addrs, reserve_size, NULL); return -1; } for (i = 0; i < reserve_size; i++) { __u64 *fq_addr; fq_addr = xsk_ring_prod__fill_addr(fq, idx++); *fq_addr = (uint64_t)addrs[i]; } xsk_ring_prod__submit(fq, reserve_size); return 0; } @@ -179,6 +176,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > > nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); > > + if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) > != 0)) > + return 0; > + > rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx); > if (rcvd == 0) > return 0; > When xsk_ring_cons__peek() returns 0, we will leak nb_pkts freshly allocated mbufs. See below for a suggestion. @@ -186,9 +186,6 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh) > (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE); > > - if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != > 0)) > - return 0; > - > for (i = 0; i < rcvd; i++) { > const struct xdp_desc *desc; > uint64_t addr; > @@ -211,6 +208,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > > xsk_ring_cons__release(rx, rcvd); > > + /* free the extra mbufs */ > + for (; rcvd < nb_pkts; rcvd++) > + rte_pktmbuf_free(mbufs[rcvd]); > + > You can move this block after the statistic update... /* statistics */ > rxq->stats.rx_pkts += (rcvd - dropped); > rxq->stats.rx_bytes += rx_bytes; > ... then define a out: label. And those mbufs are still clean and coming from a single mempool, we can put them back as a single bulk. Something like (again, untested): out: if (count != nb_pkts) { rte_mempool_put_bulk(rxq->mb_pool, &mbufs[count], nb_pkts - count); } return count; } And you would jump to this label when xsk_ring_cons__peek() == 0. What do you think ? @@ -261,55 +262,56 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > struct xsk_umem_info *umem = txq->pair->umem; > struct rte_mbuf *mbuf; > void *addrs[ETH_AF_XDP_TX_BATCH_SIZE]; > + struct rte_mbuf *valid_bufs[ETH_AF_XDP_TX_BATCH_SIZE]; > unsigned long tx_bytes = 0; > - int i, valid = 0; > + int i; > + uint16_t nb_valid = 0; > uint32_t idx_tx; > + uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - > ETH_AF_XDP_DATA_HEADROOM; > > nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); > > pull_umem_cq(umem, nb_pkts); > > - nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs, > - nb_pkts, NULL); > - if (nb_pkts == 0) > + for (i = 0; i < nb_pkts; i++) { > + if (bufs[i]->pkt_len <= buf_len) > + valid_bufs[nb_valid++] = bufs[i]; > + else > + rte_pktmbuf_free(bufs[i]); > + } > + > + nb_valid = rte_ring_dequeue_bulk(umem->buf_ring, addrs, > + nb_valid, NULL); > + if (nb_valid == 0) > return 0; > > You can't return 0. You have stolen buffers from the caller with the previous check on pktlen. When the application resends this bulk or frees the whole bulk, we will have mbuf reuse bugs. Thinking about this, why would this happen ? This limitation should be handled by properly reporting the mtu. The application would then know it can't send those too big mbufs. If I missed something else and/or if you still don't trust the application, I think the tx burst function should go like as described below. The first thing to do is check the mbufs length. At the first invalid length, you break from the loop at index i (i is the invalid buffer index). Then dequeue i - 1 buffers from buf_ring. Reserve i - 1 slots in tx. And return i buffers have been sent (plus a tx error stat += 1). You need to carefully take into account each step and free the buffer to buf_ring when relevant and free the mbufs properly. - if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) { > + if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) != > nb_valid) { > kick_tx(txq); > - rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, > NULL); > + rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid, > NULL); > return 0; > } > > - for (i = 0; i < nb_pkts; i++) { > + for (i = 0; i < nb_valid; i++) { > struct xdp_desc *desc; > void *pkt; > - uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE > - - ETH_AF_XDP_DATA_HEADROOM; > desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i); > - mbuf = bufs[i]; > - if (mbuf->pkt_len <= buf_len) { > - desc->addr = (uint64_t)addrs[valid]; > - desc->len = mbuf->pkt_len; > - pkt = xsk_umem__get_data(umem->mz->addr, > - desc->addr); > - rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), > - desc->len); > - valid++; > - tx_bytes += mbuf->pkt_len; > - } > + mbuf = valid_bufs[i]; > + desc->addr = (uint64_t)addrs[i]; > + desc->len = mbuf->pkt_len; > + pkt = xsk_umem__get_data(umem->mz->addr, > + desc->addr); > + rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), > + desc->len); > + tx_bytes += mbuf->pkt_len; > rte_pktmbuf_free(mbuf); > } > > - xsk_ring_prod__submit(&txq->tx, nb_pkts); > + xsk_ring_prod__submit(&txq->tx, nb_valid); > > kick_tx(txq); > > - if (valid < nb_pkts) > - rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid], > - nb_pkts - valid, NULL); > - > - txq->stats.err_pkts += nb_pkts - valid; > - txq->stats.tx_pkts += valid; > + txq->stats.err_pkts += nb_pkts - nb_valid; > + txq->stats.tx_pkts += nb_valid; > txq->stats.tx_bytes += tx_bytes; > > return nb_pkts; > -- > 2.17.1 > > -- David Marchand