From: Qi Zhang <qi.z.zhang@intel.com>
To: dev@dpdk.org
Cc: magnus.karlsson@intel.com, bjorn.topel@intel.com,
jingjing.wu@intel.com, xiaoyun.li@intel.com,
ferruh.yigit@intel.com, Qi Zhang <qi.z.zhang@intel.com>
Subject: [dpdk-dev] [RFC v3 4/6] net/af_xdp: use mbuf mempool for buffer management
Date: Thu, 16 Aug 2018 22:43:19 +0800
Message-ID: <20180816144321.17719-5-qi.z.zhang@intel.com>
In-Reply-To: <20180816144321.17719-1-qi.z.zhang@intel.com>
The memory buffer registered by af_xdp is now managed by rte_mempool.
An mbuf allocated from the rte_mempool can be converted to an xdp_desc
address and vice versa.
Signed-off-by: Qi Zhang <qi.z.zhang@intel.com>
---
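Note for reviewers (not part of the commit): below is a minimal sketch of the
mbuf <-> umem address conversion this patch relies on. It assumes
RTE_PKTMBUF_HEADROOM is 128 bytes and a zero-size mbuf private area, which is
how this driver creates its mempool; the example_ helpers and the umem_base
parameter are illustrative only, not part of the patch.

/* Sketch of the umem address <-> mbuf conversion (illustrative only). */
#include <rte_mbuf.h>

/* data offset from the mbuf struct: 128 (struct rte_mbuf) + 128 (headroom) */
#define EXAMPLE_MBUF_TO_ADDR_OFFSET \
	(sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)

/* umem-relative address of the packet data carried by this mbuf */
static inline uint64_t
example_mbuf_to_addr(void *umem_base, struct rte_mbuf *mbuf)
{
	return (uint64_t)mbuf->buf_addr + mbuf->data_off -
	       (uint64_t)umem_base;
}

/* recover the owning mbuf from a umem-relative packet data address */
static inline struct rte_mbuf *
example_addr_to_mbuf(void *umem_base, uint64_t addr)
{
	return (struct rte_mbuf *)((uint64_t)umem_base + addr -
				   EXAMPLE_MBUF_TO_ADDR_OFFSET);
}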
drivers/net/af_xdp/rte_eth_af_xdp.c | 184 +++++++++++++++++++++---------------
1 file changed, 108 insertions(+), 76 deletions(-)
diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 12252014d..69bc38536 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -42,7 +42,11 @@
#define ETH_AF_XDP_FRAME_SIZE 2048
#define ETH_AF_XDP_NUM_BUFFERS 4096
-#define ETH_AF_XDP_DATA_HEADROOM 0
+/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
+#define ETH_AF_XDP_MBUF_OVERHEAD 192
+/* data starts at offset 320 (192 + 128) bytes */
+#define ETH_AF_XDP_DATA_HEADROOM \
+ (ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
#define ETH_AF_XDP_DFLT_NUM_DESCS 1024
#define ETH_AF_XDP_FQ_NUM_DESCS 1024
#define ETH_AF_XDP_CQ_NUM_DESCS 1024
@@ -68,7 +72,7 @@ struct xdp_umem {
char *frames;
struct xdp_umem_uqueue fq;
struct xdp_umem_uqueue cq;
- struct rte_ring *buf_ring; /* be used to manage the buffer */
+ struct rte_mempool *mb_pool; /* used to manage the buffers */
int fd;
};
@@ -304,11 +308,25 @@ static char *get_pkt_data(struct xdp_umem *umem, uint64_t addr)
return &umem->frames[addr];
}
+static inline struct rte_mbuf *
+addr_to_mbuf(struct xdp_umem *umem, uint64_t addr)
+{
+ return (struct rte_mbuf *)((uint64_t)umem->frames + addr - 0x100);
+}
+
+static inline uint64_t
+mbuf_to_addr(struct xdp_umem *umem, struct rte_mbuf *mbuf)
+{
+ return (uint64_t)mbuf->buf_addr + mbuf->data_off -
+ (uint64_t)umem->frames;
+}
+
static uint16_t
eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
struct xdp_desc descs[ETH_AF_XDP_RX_BATCH_SIZE];
- void *addrs[ETH_AF_XDP_RX_BATCH_SIZE];
+ struct rte_mbuf *bufs_to_fill[ETH_AF_XDP_RX_BATCH_SIZE];
+ uint64_t addrs[ETH_AF_XDP_RX_BATCH_SIZE];
struct pkt_rx_queue *rxq = queue;
struct xdp_uqueue *uq = &rxq->rx;
struct xdp_umem_uqueue *fq = &rxq->umem->fq;
@@ -317,25 +335,25 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
unsigned long dropped = 0;
unsigned long rx_bytes = 0;
uint16_t count = 0;
- int rcvd, i;
+ int rcvd, i, ret;
nb_pkts = nb_pkts < ETH_AF_XDP_RX_BATCH_SIZE ?
nb_pkts : ETH_AF_XDP_RX_BATCH_SIZE;
if (umem_nb_free(fq, free_thresh) >= free_thresh) {
- int n = rte_ring_dequeue_bulk(rxq->umem->buf_ring,
- addrs,
- ETH_AF_XDP_RX_BATCH_SIZE,
- NULL);
- if (n == 0)
+ ret = rte_pktmbuf_alloc_bulk(rxq->umem->mb_pool,
+ bufs_to_fill,
+ ETH_AF_XDP_RX_BATCH_SIZE);
+ if (ret)
return -ENOMEM;
- if (umem_fill_to_kernel(fq, (uint64_t *)&addrs[0],
- ETH_AF_XDP_RX_BATCH_SIZE)) {
- rte_ring_enqueue_bulk(rxq->umem->buf_ring,
- addrs,
- ETH_AF_XDP_RX_BATCH_SIZE,
- NULL);
+ for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++)
+ addrs[i] = mbuf_to_addr(rxq->umem, bufs_to_fill[i]);
+
+ if (umem_fill_to_kernel(fq, addrs,
+ ETH_AF_XDP_RX_BATCH_SIZE)) {
+ for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++)
+ rte_pktmbuf_free(bufs_to_fill[i]);
}
}
@@ -361,11 +379,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
} else {
dropped++;
}
- addrs[i] = (void *)addr;
+ rte_pktmbuf_free(addr_to_mbuf(rxq->umem, addr));
}
- rte_ring_enqueue_bulk(rxq->umem->buf_ring, addrs, rcvd, NULL);
-
rxq->rx_pkts += (rcvd - dropped);
rxq->rx_bytes += rx_bytes;
rxq->rx_dropped += dropped;
@@ -375,11 +391,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
static void kick_tx(struct pkt_tx_queue *txq)
{
- void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
- struct rte_ring *buf_ring = txq->pair->umem->buf_ring;
struct xdp_umem_uqueue *cq = &txq->pair->umem->cq;
+ uint64_t addrs[ETH_AF_XDP_TX_BATCH_SIZE];
int fd = txq->pair->xsk_fd;
- int ret, n;
+ int ret, n, i;
while (1) {
@@ -398,9 +413,10 @@ static void kick_tx(struct pkt_tx_queue *txq)
n = umem_complete_from_kernel(cq,
(uint64_t *)&addrs[0],
ETH_AF_XDP_TX_BATCH_SIZE);
- if (n > 0)
- rte_ring_enqueue_bulk(buf_ring,
- addrs, n, NULL);
+ for (i = 0; i < n; i++)
+ rte_pktmbuf_free(
+ addr_to_mbuf(txq->pair->umem,
+ addrs[i]));
}
}
}
@@ -413,23 +429,21 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
struct xdp_umem_uqueue *cq = &txq->pair->umem->cq;
struct rte_mbuf *mbuf;
struct xdp_desc descs[ETH_AF_XDP_TX_BATCH_SIZE];
- void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
- uint16_t i, valid;
+ uint64_t addrs[ETH_AF_XDP_TX_BATCH_SIZE];
+ struct rte_mbuf *bufs_to_fill[ETH_AF_XDP_TX_BATCH_SIZE];
unsigned long tx_bytes = 0;
+ int i, valid, n;
nb_pkts = nb_pkts < ETH_AF_XDP_TX_BATCH_SIZE ?
nb_pkts : ETH_AF_XDP_TX_BATCH_SIZE;
- int n = umem_complete_from_kernel(cq, (uint64_t *)&addrs[0],
- ETH_AF_XDP_TX_BATCH_SIZE);
- if (n > 0)
- rte_ring_enqueue_bulk(txq->pair->umem->buf_ring,
- addrs, n, NULL);
-
- nb_pkts = rte_ring_dequeue_bulk(txq->pair->umem->buf_ring, addrs,
- nb_pkts, NULL);
- if (!nb_pkts)
- return 0;
+ n = umem_complete_from_kernel(cq, addrs,
+ ETH_AF_XDP_TX_BATCH_SIZE);
+ if (n > 0) {
+ for (i = 0; i < n; i++)
+ rte_pktmbuf_free(addr_to_mbuf(txq->pair->umem,
+ addrs[i]));
+ }
valid = 0;
for (i = 0; i < nb_pkts; i++) {
@@ -438,7 +452,13 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
mbuf = bufs[i];
if (mbuf->pkt_len <= buf_len) {
- descs[valid].addr = (uint64_t)addrs[valid];
+ bufs_to_fill[valid] =
+ rte_pktmbuf_alloc(txq->pair->umem->mb_pool);
+ if (!bufs_to_fill[valid])
+ break;
+ descs[valid].addr =
+ mbuf_to_addr(txq->pair->umem,
+ bufs_to_fill[valid]);
descs[valid].len = mbuf->pkt_len;
descs[valid].options = 0;
pkt = get_pkt_data(txq->pair->umem, descs[valid].addr);
@@ -447,20 +467,20 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
valid++;
tx_bytes += mbuf->pkt_len;
}
- rte_pktmbuf_free(mbuf);
}
if (xq_enq(uq, descs, valid)) {
+ for (i = 0; i < valid; i++)
+ rte_pktmbuf_free(bufs_to_fill[i]);
+ nb_pkts = 0;
valid = 0;
tx_bytes = 0;
} else {
kick_tx(txq);
+ for (i = 0; i < nb_pkts; i++)
+ rte_pktmbuf_free(bufs[i]);
}
- if (valid < nb_pkts)
- rte_ring_enqueue_bulk(txq->pair->umem->buf_ring, &addrs[valid],
- nb_pkts - valid, NULL);
-
txq->err_pkts += (nb_pkts - valid);
txq->tx_pkts += valid;
txq->tx_bytes += tx_bytes;
@@ -472,13 +492,15 @@ static void
fill_rx_desc(struct xdp_umem *umem)
{
struct xdp_umem_uqueue *fq = &umem->fq;
- void *p = NULL;
+ struct rte_mbuf *mbuf;
+ uint64_t addr;
uint32_t i;
for (i = 0; i < fq->size / 2; i++) {
- rte_ring_dequeue(umem->buf_ring, &p);
- if (umem_fill_to_kernel(fq, (uint64_t *)&p, 1)) {
- rte_ring_enqueue(umem->buf_ring, p);
+ mbuf = rte_pktmbuf_alloc(umem->mb_pool);
+ addr = mbuf_to_addr(umem, mbuf);
+ if (umem_fill_to_kernel(fq, &addr, 1)) {
+ rte_pktmbuf_free(mbuf);
break;
}
}
@@ -597,14 +619,28 @@ eth_link_update(struct rte_eth_dev *dev __rte_unused,
static void xdp_umem_destroy(struct xdp_umem *umem)
{
- if (umem->frames)
- free(umem->frames);
- if (umem->buf_ring)
- rte_ring_free(umem->buf_ring);
+ if (umem->mb_pool)
+ rte_mempool_free(umem->mb_pool);
free(umem);
}
+static inline uint64_t get_base_addr(struct rte_mempool *mp)
+{
+ struct rte_mempool_memhdr *memhdr;
+
+ memhdr = STAILQ_FIRST(&mp->mem_list);
+ return (uint64_t)(memhdr->addr);
+}
+
+static inline uint64_t get_len(struct rte_mempool *mp)
+{
+ struct rte_mempool_memhdr *memhdr;
+
+ memhdr = STAILQ_FIRST(&mp->mem_list);
+ return (uint64_t)(memhdr->len);
+}
+
static struct xdp_umem *xdp_umem_configure(int sfd)
{
int fq_size = ETH_AF_XDP_FQ_NUM_DESCS;
@@ -612,40 +648,29 @@ static struct xdp_umem *xdp_umem_configure(int sfd)
struct xdp_mmap_offsets off;
struct xdp_umem_reg mr;
struct xdp_umem *umem;
- char ring_name[0x100];
+ char pool_name[0x100];
socklen_t optlen;
- void *bufs = NULL;
- uint64_t i;
umem = calloc(1, sizeof(*umem));
if (!umem)
return NULL;
- snprintf(ring_name, 0x100, "%s_%d", "af_xdp_ring", sfd);
- umem->buf_ring = rte_ring_create(ring_name,
- ETH_AF_XDP_NUM_BUFFERS,
- SOCKET_ID_ANY,
- 0x0);
- if (!umem->buf_ring) {
- RTE_LOG(ERR, PMD,
- "Failed to create rte_ring\n");
- goto err;
- }
+ snprintf(pool_name, 0x100, "%s_%d", "af_xdp_ring", sfd);
+ umem->mb_pool = rte_pktmbuf_pool_create_with_flags(
+ pool_name, ETH_AF_XDP_NUM_BUFFERS,
+ 250, 0,
+ ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_MBUF_OVERHEAD,
+ MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
+ SOCKET_ID_ANY);
- for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
- rte_ring_enqueue(umem->buf_ring,
- (void *)(i * ETH_AF_XDP_FRAME_SIZE +
- ETH_AF_XDP_DATA_HEADROOM));
-
- if (posix_memalign(&bufs, getpagesize(), /* PAGE_SIZE aligned */
- ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE)) {
+ if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
RTE_LOG(ERR, PMD,
- "Failed to allocate memory pool.\n");
+ "Failed to create rte_mempool\n");
goto err;
}
- mr.addr = (uint64_t)bufs;
- mr.len = ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE;
+ mr.addr = get_base_addr(umem->mb_pool);
+ mr.len = get_len(umem->mb_pool);
mr.chunk_size = ETH_AF_XDP_FRAME_SIZE;
mr.headroom = ETH_AF_XDP_DATA_HEADROOM;
@@ -717,7 +742,7 @@ static struct xdp_umem *xdp_umem_configure(int sfd)
(uint32_t *)((uint64_t)umem->cq.map + off.cr.consumer);
umem->cq.ring = (uint64_t *)((uint64_t)umem->cq.map + off.cr.desc);
- umem->frames = bufs;
+ umem->frames = (void *)get_base_addr(umem->mb_pool);
umem->fd = sfd;
return umem;
@@ -729,7 +754,8 @@ static struct xdp_umem *xdp_umem_configure(int sfd)
}
static int
-xsk_configure(struct pkt_rx_queue *rxq, int ring_size, struct xdp_umem *umem)
+xsk_configure(struct pkt_rx_queue *rxq, int ring_size,
+ struct xdp_umem *umem)
{
struct pkt_tx_queue *txq = rxq->pair;
struct xdp_mmap_offsets off;
@@ -863,6 +889,12 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
int xsk_key;
int map_fd;
+ if (mb_pool == NULL) {
+ RTE_LOG(ERR, PMD,
+ "Invalid mb_pool\n");
+ return -EINVAL;
+ }
+
if (dev->data->nb_rx_queues <= rx_queue_id) {
RTE_LOG(ERR, PMD,
"Invalid rx queue id: %d\n", rx_queue_id);
@@ -1222,7 +1254,7 @@ rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
for (i = 0; i < internals->xsk_map_key_count; i++)
queue_reset(internals, i);
- rte_ring_free(internals->umem_share->buf_ring);
+ rte_mempool_free(internals->umem_share->mb_pool);
rte_free(internals->umem_share->frames);
rte_free(internals->umem_share);
rte_free(internals);
--
2.13.6