From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga07.intel.com (mga07.intel.com [134.134.136.100]) by dpdk.org (Postfix) with ESMTP id C62AE4C8D for ; Thu, 16 Aug 2018 16:42:51 +0200 (CEST) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga008.jf.intel.com ([10.7.209.65]) by orsmga105.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 16 Aug 2018 07:42:49 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.53,247,1531810800"; d="scan'208";a="65704101" Received: from dpdk51.sh.intel.com ([10.67.110.190]) by orsmga008.jf.intel.com with ESMTP; 16 Aug 2018 07:42:42 -0700 From: Qi Zhang To: dev@dpdk.org Cc: magnus.karlsson@intel.com, bjorn.topel@intel.com, jingjing.wu@intel.com, xiaoyun.li@intel.com, ferruh.yigit@intel.com, Qi Zhang Date: Thu, 16 Aug 2018 22:43:19 +0800 Message-Id: <20180816144321.17719-5-qi.z.zhang@intel.com> X-Mailer: git-send-email 2.13.6 In-Reply-To: <20180816144321.17719-1-qi.z.zhang@intel.com> References: <20180816144321.17719-1-qi.z.zhang@intel.com> Subject: [dpdk-dev] [RFC v3 4/6] net/af_xdp: use mbuf mempool for buffer management X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 16 Aug 2018 14:42:52 -0000 The memory buffer registered with af_xdp is now managed by an rte_mempool. An mbuf allocated from that mempool can be converted to an xdp_desc address and vice versa. 
Signed-off-by: Qi Zhang --- drivers/net/af_xdp/rte_eth_af_xdp.c | 184 +++++++++++++++++++++--------------- 1 file changed, 108 insertions(+), 76 deletions(-) diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c index 12252014d..69bc38536 100644 --- a/drivers/net/af_xdp/rte_eth_af_xdp.c +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c @@ -42,7 +42,11 @@ #define ETH_AF_XDP_FRAME_SIZE 2048 #define ETH_AF_XDP_NUM_BUFFERS 4096 -#define ETH_AF_XDP_DATA_HEADROOM 0 +/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */ +#define ETH_AF_XDP_MBUF_OVERHEAD 192 +/* data start from offset 320 (192 + 128) bytes */ +#define ETH_AF_XDP_DATA_HEADROOM \ + (ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM) #define ETH_AF_XDP_DFLT_NUM_DESCS 1024 #define ETH_AF_XDP_FQ_NUM_DESCS 1024 #define ETH_AF_XDP_CQ_NUM_DESCS 1024 @@ -68,7 +72,7 @@ struct xdp_umem { char *frames; struct xdp_umem_uqueue fq; struct xdp_umem_uqueue cq; - struct rte_ring *buf_ring; /* be used to manage the buffer */ + struct rte_mempool *mb_pool; /* be used to manage the buffer */ int fd; }; @@ -304,11 +308,25 @@ static char *get_pkt_data(struct xdp_umem *umem, uint64_t addr) return &umem->frames[addr]; } +static inline struct rte_mbuf * +addr_to_mbuf(struct xdp_umem *umem, uint64_t addr) +{ + return (struct rte_mbuf *)((uint64_t)umem->frames + addr - 0x100); +} + +static inline uint64_t +mbuf_to_addr(struct xdp_umem *umem, struct rte_mbuf *mbuf) +{ + return (uint64_t)mbuf->buf_addr + mbuf->data_off - + (uint64_t)umem->frames; +} + static uint16_t eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) { struct xdp_desc descs[ETH_AF_XDP_RX_BATCH_SIZE]; - void *addrs[ETH_AF_XDP_RX_BATCH_SIZE]; + struct rte_mbuf *bufs_to_fill[ETH_AF_XDP_RX_BATCH_SIZE]; + uint64_t addrs[ETH_AF_XDP_RX_BATCH_SIZE]; struct pkt_rx_queue *rxq = queue; struct xdp_uqueue *uq = &rxq->rx; struct xdp_umem_uqueue *fq = &rxq->umem->fq; @@ -317,25 +335,25 @@ eth_af_xdp_rx(void *queue, struct 
rte_mbuf **bufs, uint16_t nb_pkts) unsigned long dropped = 0; unsigned long rx_bytes = 0; uint16_t count = 0; - int rcvd, i; + int rcvd, i, ret; nb_pkts = nb_pkts < ETH_AF_XDP_RX_BATCH_SIZE ? nb_pkts : ETH_AF_XDP_RX_BATCH_SIZE; if (umem_nb_free(fq, free_thresh) >= free_thresh) { - int n = rte_ring_dequeue_bulk(rxq->umem->buf_ring, - addrs, - ETH_AF_XDP_RX_BATCH_SIZE, - NULL); - if (n == 0) + ret = rte_pktmbuf_alloc_bulk(rxq->umem->mb_pool, + bufs_to_fill, + ETH_AF_XDP_RX_BATCH_SIZE); + if (ret) return -ENOMEM; - if (umem_fill_to_kernel(fq, (uint64_t *)&addrs[0], - ETH_AF_XDP_RX_BATCH_SIZE)) { - rte_ring_enqueue_bulk(rxq->umem->buf_ring, - addrs, - ETH_AF_XDP_RX_BATCH_SIZE, - NULL); + for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++) + addrs[i] = mbuf_to_addr(rxq->umem, bufs_to_fill[i]); + + if (umem_fill_to_kernel(fq, addrs, + ETH_AF_XDP_RX_BATCH_SIZE)) { + for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++) + rte_pktmbuf_free(bufs_to_fill[i]); } } @@ -361,11 +379,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) } else { dropped++; } - addrs[i] = (void *)addr; + rte_pktmbuf_free(addr_to_mbuf(rxq->umem, addr)); } - rte_ring_enqueue_bulk(rxq->umem->buf_ring, addrs, rcvd, NULL); - rxq->rx_pkts += (rcvd - dropped); rxq->rx_bytes += rx_bytes; rxq->rx_dropped += dropped; @@ -375,11 +391,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) static void kick_tx(struct pkt_tx_queue *txq) { - void *addrs[ETH_AF_XDP_TX_BATCH_SIZE]; - struct rte_ring *buf_ring = txq->pair->umem->buf_ring; struct xdp_umem_uqueue *cq = &txq->pair->umem->cq; + uint64_t addrs[ETH_AF_XDP_TX_BATCH_SIZE]; int fd = txq->pair->xsk_fd; - int ret, n; + int ret, n, i; while (1) { @@ -398,9 +413,10 @@ static void kick_tx(struct pkt_tx_queue *txq) n = umem_complete_from_kernel(cq, (uint64_t *)&addrs[0], ETH_AF_XDP_TX_BATCH_SIZE); - if (n > 0) - rte_ring_enqueue_bulk(buf_ring, - addrs, n, NULL); + for (i = 0; i < n; i++) + rte_pktmbuf_free( + 
addr_to_mbuf(txq->pair->umem, + addrs[i])); } } } @@ -413,23 +429,21 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) struct xdp_umem_uqueue *cq = &txq->pair->umem->cq; struct rte_mbuf *mbuf; struct xdp_desc descs[ETH_AF_XDP_TX_BATCH_SIZE]; - void *addrs[ETH_AF_XDP_TX_BATCH_SIZE]; - uint16_t i, valid; + uint64_t addrs[ETH_AF_XDP_TX_BATCH_SIZE]; + struct rte_mbuf *bufs_to_fill[ETH_AF_XDP_TX_BATCH_SIZE]; unsigned long tx_bytes = 0; + int i, valid, n; nb_pkts = nb_pkts < ETH_AF_XDP_TX_BATCH_SIZE ? nb_pkts : ETH_AF_XDP_TX_BATCH_SIZE; - int n = umem_complete_from_kernel(cq, (uint64_t *)&addrs[0], - ETH_AF_XDP_TX_BATCH_SIZE); - if (n > 0) - rte_ring_enqueue_bulk(txq->pair->umem->buf_ring, - addrs, n, NULL); - - nb_pkts = rte_ring_dequeue_bulk(txq->pair->umem->buf_ring, addrs, - nb_pkts, NULL); - if (!nb_pkts) - return 0; + n = umem_complete_from_kernel(cq, addrs, + ETH_AF_XDP_TX_BATCH_SIZE); + if (n > 0) { + for (i = 0; i < n; i++) + rte_pktmbuf_free(addr_to_mbuf(txq->pair->umem, + addrs[i])); + } valid = 0; for (i = 0; i < nb_pkts; i++) { @@ -438,7 +452,13 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM; mbuf = bufs[i]; if (mbuf->pkt_len <= buf_len) { - descs[valid].addr = (uint64_t)addrs[valid]; + bufs_to_fill[valid] = + rte_pktmbuf_alloc(txq->pair->umem->mb_pool); + if (!bufs_to_fill[valid]) + break; + descs[valid].addr = + mbuf_to_addr(txq->pair->umem, + bufs_to_fill[valid]); descs[valid].len = mbuf->pkt_len; descs[valid].options = 0; pkt = get_pkt_data(txq->pair->umem, descs[valid].addr); @@ -447,20 +467,20 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) valid++; tx_bytes += mbuf->pkt_len; } - rte_pktmbuf_free(mbuf); } if (xq_enq(uq, descs, valid)) { + for (i = 0; i < valid; i++) + rte_pktmbuf_free(bufs_to_fill[i]); + nb_pkts = 0; valid = 0; tx_bytes = 0; } else { kick_tx(txq); + for (i = 0; i < nb_pkts; i++) + rte_pktmbuf_free(bufs[i]); } - if 
(valid < nb_pkts) - rte_ring_enqueue_bulk(txq->pair->umem->buf_ring, &addrs[valid], - nb_pkts - valid, NULL); - txq->err_pkts += (nb_pkts - valid); txq->tx_pkts += valid; txq->tx_bytes += tx_bytes; @@ -472,13 +492,15 @@ static void fill_rx_desc(struct xdp_umem *umem) { struct xdp_umem_uqueue *fq = &umem->fq; - void *p = NULL; + struct rte_mbuf *mbuf; + uint64_t addr; uint32_t i; for (i = 0; i < fq->size / 2; i++) { - rte_ring_dequeue(umem->buf_ring, &p); - if (umem_fill_to_kernel(fq, (uint64_t *)&p, 1)) { - rte_ring_enqueue(umem->buf_ring, p); + mbuf = rte_pktmbuf_alloc(umem->mb_pool); + addr = mbuf_to_addr(umem, mbuf); + if (umem_fill_to_kernel(fq, &addr, 1)) { + rte_pktmbuf_free(mbuf); break; } } @@ -597,14 +619,28 @@ eth_link_update(struct rte_eth_dev *dev __rte_unused, static void xdp_umem_destroy(struct xdp_umem *umem) { - if (umem->frames) - free(umem->frames); - if (umem->buf_ring) - rte_ring_free(umem->buf_ring); + if (umem->mb_pool) + rte_mempool_free(umem->mb_pool); free(umem); } +static inline uint64_t get_base_addr(struct rte_mempool *mp) +{ + struct rte_mempool_memhdr *memhdr; + + memhdr = STAILQ_FIRST(&mp->mem_list); + return (uint64_t)(memhdr->addr); +} + +static inline uint64_t get_len(struct rte_mempool *mp) +{ + struct rte_mempool_memhdr *memhdr; + + memhdr = STAILQ_FIRST(&mp->mem_list); + return (uint64_t)(memhdr->len); +} + static struct xdp_umem *xdp_umem_configure(int sfd) { int fq_size = ETH_AF_XDP_FQ_NUM_DESCS; @@ -612,40 +648,29 @@ static struct xdp_umem *xdp_umem_configure(int sfd) struct xdp_mmap_offsets off; struct xdp_umem_reg mr; struct xdp_umem *umem; - char ring_name[0x100]; + char pool_name[0x100]; socklen_t optlen; - void *bufs = NULL; - uint64_t i; umem = calloc(1, sizeof(*umem)); if (!umem) return NULL; - snprintf(ring_name, 0x100, "%s_%d", "af_xdp_ring", sfd); - umem->buf_ring = rte_ring_create(ring_name, - ETH_AF_XDP_NUM_BUFFERS, - SOCKET_ID_ANY, - 0x0); - if (!umem->buf_ring) { - RTE_LOG(ERR, PMD, - "Failed to create 
rte_ring\n"); - goto err; - } + snprintf(pool_name, 0x100, "%s_%d", "af_xdp_ring", sfd); + umem->mb_pool = rte_pktmbuf_pool_create_with_flags( + pool_name, ETH_AF_XDP_NUM_BUFFERS, + 250, 0, + ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_MBUF_OVERHEAD, + MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN, + SOCKET_ID_ANY); - for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++) - rte_ring_enqueue(umem->buf_ring, - (void *)(i * ETH_AF_XDP_FRAME_SIZE + - ETH_AF_XDP_DATA_HEADROOM)); - - if (posix_memalign(&bufs, getpagesize(), /* PAGE_SIZE aligned */ - ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE)) { + if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) { RTE_LOG(ERR, PMD, - "Failed to allocate memory pool.\n"); + "Failed to create rte_mempool\n"); goto err; } - mr.addr = (uint64_t)bufs; - mr.len = ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE; + mr.addr = get_base_addr(umem->mb_pool); + mr.len = get_len(umem->mb_pool); mr.chunk_size = ETH_AF_XDP_FRAME_SIZE; mr.headroom = ETH_AF_XDP_DATA_HEADROOM; @@ -717,7 +742,7 @@ static struct xdp_umem *xdp_umem_configure(int sfd) (uint32_t *)((uint64_t)umem->cq.map + off.cr.consumer); umem->cq.ring = (uint64_t *)((uint64_t)umem->cq.map + off.cr.desc); - umem->frames = bufs; + umem->frames = (void *)get_base_addr(umem->mb_pool); umem->fd = sfd; return umem; @@ -729,7 +754,8 @@ static struct xdp_umem *xdp_umem_configure(int sfd) } static int -xsk_configure(struct pkt_rx_queue *rxq, int ring_size, struct xdp_umem *umem) +xsk_configure(struct pkt_rx_queue *rxq, int ring_size, + struct xdp_umem *umem) { struct pkt_tx_queue *txq = rxq->pair; struct xdp_mmap_offsets off; @@ -863,6 +889,12 @@ eth_rx_queue_setup(struct rte_eth_dev *dev, int xsk_key; int map_fd; + if (mb_pool == NULL) { + RTE_LOG(ERR, PMD, + "Invalid mb_pool\n"); + return -EINVAL; + } + if (dev->data->nb_rx_queues <= rx_queue_id) { RTE_LOG(ERR, PMD, "Invalid rx queue id: %d\n", rx_queue_id); @@ -1222,7 +1254,7 @@ rte_pmd_af_xdp_remove(struct rte_vdev_device *dev) for (i = 0; i < 
internals->xsk_map_key_count; i++) queue_reset(internals, i); - rte_ring_free(internals->umem_share->buf_ring); + rte_mempool_free(internals->umem_share->mb_pool); rte_free(internals->umem_share->frames); rte_free(internals->umem_share); rte_free(internals); -- 2.13.6