From: Qi Zhang <qi.z.zhang@intel.com>
To: dev@dpdk.org
Cc: magnus.karlsson@intel.com, bjorn.topel@intel.com,
Qi Zhang <qi.z.zhang@intel.com>
Subject: [dpdk-dev] [RFC v2 4/7] net/af_xdp: use mbuf mempool for buffer management
Date: Thu, 8 Mar 2018 21:52:46 +0800 [thread overview]
Message-ID: <20180308135249.28187-5-qi.z.zhang@intel.com> (raw)
In-Reply-To: <20180308135249.28187-1-qi.z.zhang@intel.com>
The memory buffer registered with AF_XDP is now managed by an rte_mempool.
An mbuf allocated from that mempool can be converted to a descriptor
index and vice versa.
Signed-off-by: Qi Zhang <qi.z.zhang@intel.com>
---
drivers/net/af_xdp/rte_eth_af_xdp.c | 166 +++++++++++++++++++++---------------
1 file changed, 98 insertions(+), 68 deletions(-)
diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 5c7c53aeb..65c4c37bf 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -39,7 +39,11 @@
#define ETH_AF_XDP_FRAME_SIZE 2048
#define ETH_AF_XDP_NUM_BUFFERS 131072
-#define ETH_AF_XDP_DATA_HEADROOM 0
+/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
+#define ETH_AF_XDP_MBUF_OVERHEAD 192
+/* data starts at offset 320 (192 + 128) bytes */
+#define ETH_AF_XDP_DATA_HEADROOM \
+ (ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
#define ETH_AF_XDP_DFLT_RING_SIZE 1024
#define ETH_AF_XDP_DFLT_QUEUE_IDX 0
@@ -53,6 +57,7 @@ struct xdp_umem {
unsigned int frame_size_log2;
unsigned int nframes;
int mr_fd;
+ struct rte_mempool *mb_pool;
};
struct pmd_internals {
@@ -63,7 +68,7 @@ struct pmd_internals {
struct xdp_queue rx;
struct xdp_queue tx;
struct xdp_umem *umem;
- struct rte_mempool *mb_pool;
+ struct rte_mempool *ext_mb_pool;
unsigned long rx_pkts;
unsigned long rx_bytes;
@@ -76,7 +81,6 @@ struct pmd_internals {
uint16_t port_id;
uint16_t queue_idx;
int ring_size;
- struct rte_ring *buf_ring;
};
static const char * const valid_arguments[] = {
@@ -101,6 +105,22 @@ static void *get_pkt_data(struct pmd_internals *internals,
(index << internals->umem->frame_size_log2) + offset);
}
+static uint32_t
+mbuf_to_idx(struct pmd_internals *internals, struct rte_mbuf *mbuf)
+{
+ return (uint32_t)(((uint64_t)mbuf->buf_addr -
+ (uint64_t)internals->umem->buffer) >>
+ internals->umem->frame_size_log2);
+}
+
+static struct rte_mbuf *
+idx_to_mbuf(struct pmd_internals *internals, uint32_t idx)
+{
+ return (struct rte_mbuf *)(internals->umem->buffer +
+ (idx << internals->umem->frame_size_log2) +
+ 0x40);
+}
+
static uint16_t
eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
@@ -115,18 +135,19 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
nb_pkts : ETH_AF_XDP_RX_BATCH_SIZE;
struct xdp_desc descs[ETH_AF_XDP_RX_BATCH_SIZE];
- void *indexes[ETH_AF_XDP_RX_BATCH_SIZE];
+ struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
int rcvd, i;
/* fill rx ring */
if (rxq->num_free >= ETH_AF_XDP_RX_BATCH_SIZE) {
- int n = rte_ring_dequeue_bulk(internals->buf_ring,
- indexes,
- ETH_AF_XDP_RX_BATCH_SIZE,
- NULL);
- for (i = 0; i < n; i++)
- descs[i].idx = (uint32_t)((long int)indexes[i]);
- xq_enq(rxq, descs, n);
+ int ret = rte_mempool_get_bulk(internals->umem->mb_pool,
+ (void *)mbufs,
+ ETH_AF_XDP_RX_BATCH_SIZE);
+ if (!ret) {
+ for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++)
+ descs[i].idx = mbuf_to_idx(internals, mbufs[i]);
+ xq_enq(rxq, descs, ETH_AF_XDP_RX_BATCH_SIZE);
+ }
}
/* read data */
@@ -138,7 +159,7 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
char *pkt;
uint32_t idx = descs[i].idx;
- mbuf = rte_pktmbuf_alloc(internals->mb_pool);
+ mbuf = rte_pktmbuf_alloc(internals->ext_mb_pool);
rte_pktmbuf_pkt_len(mbuf) =
rte_pktmbuf_data_len(mbuf) =
descs[i].len;
@@ -151,11 +172,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
} else {
dropped++;
}
- indexes[i] = (void *)((long int)idx);
+ rte_pktmbuf_free(idx_to_mbuf(internals, idx));
}
- rte_ring_enqueue_bulk(internals->buf_ring, indexes, rcvd, NULL);
-
internals->rx_pkts += (rcvd - dropped);
internals->rx_bytes += rx_bytes;
internals->rx_dropped += dropped;
@@ -183,9 +202,10 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
struct xdp_queue *txq = &internals->tx;
struct rte_mbuf *mbuf;
struct xdp_desc descs[ETH_AF_XDP_TX_BATCH_SIZE];
- void *indexes[ETH_AF_XDP_TX_BATCH_SIZE];
+ struct rte_mbuf *mbufs[ETH_AF_XDP_TX_BATCH_SIZE];
uint16_t i, valid;
unsigned long tx_bytes = 0;
+ int ret;
nb_pkts = nb_pkts < ETH_AF_XDP_TX_BATCH_SIZE ?
nb_pkts : ETH_AF_XDP_TX_BATCH_SIZE;
@@ -194,13 +214,15 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
int n = xq_deq(txq, descs, ETH_AF_XDP_TX_BATCH_SIZE);
for (i = 0; i < n; i++)
- indexes[i] = (void *)((long int)descs[i].idx);
- rte_ring_enqueue_bulk(internals->buf_ring, indexes, n, NULL);
+ rte_pktmbuf_free(idx_to_mbuf(internals, descs[i].idx));
}
nb_pkts = nb_pkts > txq->num_free ? txq->num_free : nb_pkts;
- nb_pkts = rte_ring_dequeue_bulk(internals->buf_ring, indexes,
- nb_pkts, NULL);
+ ret = rte_mempool_get_bulk(internals->umem->mb_pool,
+ (void *)mbufs,
+ nb_pkts);
+ if (ret)
+ return 0;
valid = 0;
for (i = 0; i < nb_pkts; i++) {
@@ -209,14 +231,14 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
internals->umem->frame_size - ETH_AF_XDP_DATA_HEADROOM;
mbuf = bufs[i];
if (mbuf->pkt_len <= buf_len) {
- descs[valid].idx = (uint32_t)((long int)indexes[valid]);
+ descs[valid].idx = mbuf_to_idx(internals, mbufs[i]);
descs[valid].offset = ETH_AF_XDP_DATA_HEADROOM;
descs[valid].flags = 0;
descs[valid].len = mbuf->pkt_len;
pkt = get_pkt_data(internals, descs[i].idx,
descs[i].offset);
memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
- descs[i].len);
+ descs[i].len);
valid++;
tx_bytes += mbuf->pkt_len;
}
@@ -227,9 +249,10 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
xq_enq(txq, descs, valid);
kick_tx(internals->sfd);
- if (valid < nb_pkts)
- rte_ring_enqueue_bulk(internals->buf_ring, &indexes[valid],
- nb_pkts - valid, NULL);
+ if (valid < nb_pkts) {
+ for (i = valid; i < nb_pkts; i++)
+ rte_pktmbuf_free(mbufs[i]);
+ }
internals->err_pkts += (nb_pkts - valid);
internals->tx_pkts += valid;
@@ -242,14 +265,13 @@ static void
fill_rx_desc(struct pmd_internals *internals)
{
int num_free = internals->rx.num_free;
- void *p = NULL;
int i;
-
for (i = 0; i < num_free; i++) {
struct xdp_desc desc = {};
+ struct rte_mbuf *mbuf =
+ rte_pktmbuf_alloc(internals->umem->mb_pool);
- rte_ring_dequeue(internals->buf_ring, &p);
- desc.idx = (uint32_t)((long int)p);
+ desc.idx = mbuf_to_idx(internals, mbuf);
xq_enq(&internals->rx, &desc, 1);
}
}
@@ -344,33 +366,53 @@ eth_link_update(struct rte_eth_dev *dev __rte_unused,
return 0;
}
-static struct xdp_umem *xsk_alloc_and_mem_reg_buffers(int sfd, size_t nbuffers)
+static void *get_base_addr(struct rte_mempool *mb_pool)
+{
+ struct rte_mempool_memhdr *memhdr;
+
+ STAILQ_FOREACH(memhdr, &mb_pool->mem_list, next) {
+ return memhdr->addr;
+ }
+ return NULL;
+}
+
+static struct xdp_umem *xsk_alloc_and_mem_reg_buffers(int sfd,
+ size_t nbuffers,
+ const char *pool_name)
{
struct xdp_mr_req req = { .frame_size = ETH_AF_XDP_FRAME_SIZE,
.data_headroom = ETH_AF_XDP_DATA_HEADROOM };
- struct xdp_umem *umem;
- void *bufs;
- int ret;
+ struct xdp_umem *umem = calloc(1, sizeof(*umem));
- ret = posix_memalign((void **)&bufs, getpagesize(),
- nbuffers * req.frame_size);
- if (ret)
+ if (!umem)
+ return NULL;
+
+ umem->mb_pool =
+ rte_pktmbuf_pool_create_with_flags(
+ pool_name, nbuffers,
+ 250, 0,
+ (ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_MBUF_OVERHEAD),
+ MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
+ SOCKET_ID_ANY);
+
+ if (!umem->mb_pool) {
+ free(umem);
return NULL;
+ }
- umem = calloc(1, sizeof(*umem));
- if (!umem) {
- free(bufs);
+ if (umem->mb_pool->nb_mem_chunks > 1) {
+ rte_mempool_free(umem->mb_pool);
+ free(umem);
return NULL;
}
- req.addr = (unsigned long)bufs;
+ req.addr = (uint64_t)get_base_addr(umem->mb_pool);
req.len = nbuffers * req.frame_size;
- ret = setsockopt(sfd, SOL_XDP, XDP_MEM_REG, &req, sizeof(req));
- RTE_ASSERT(ret == 0);
+ setsockopt(sfd, SOL_XDP, XDP_MEM_REG, &req, sizeof(req));
umem->frame_size = ETH_AF_XDP_FRAME_SIZE;
umem->frame_size_log2 = 11;
- umem->buffer = bufs;
+ umem->buffer = (char *)req.addr;
umem->size = nbuffers * req.frame_size;
umem->nframes = nbuffers;
umem->mr_fd = sfd;
@@ -383,38 +425,27 @@ xdp_configure(struct pmd_internals *internals)
{
struct sockaddr_xdp sxdp;
struct xdp_ring_req req;
- char ring_name[0x100];
+ char pool_name[0x100];
+
int ret = 0;
- long int i;
- snprintf(ring_name, 0x100, "%s_%s_%d", "af_xdp_ring",
+ snprintf(pool_name, 0x100, "%s_%s_%d", "af_xdp_pool",
internals->if_name, internals->queue_idx);
- internals->buf_ring = rte_ring_create(ring_name,
- ETH_AF_XDP_NUM_BUFFERS,
- SOCKET_ID_ANY,
- 0x0);
- if (!internals->buf_ring)
- return -1;
-
- for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
- rte_ring_enqueue(internals->buf_ring, (void *)i);
-
internals->umem = xsk_alloc_and_mem_reg_buffers(internals->sfd,
- ETH_AF_XDP_NUM_BUFFERS);
+ ETH_AF_XDP_NUM_BUFFERS,
+ pool_name);
if (!internals->umem)
- goto error;
+ return -1;
req.mr_fd = internals->umem->mr_fd;
req.desc_nr = internals->ring_size;
ret = setsockopt(internals->sfd, SOL_XDP, XDP_RX_RING,
&req, sizeof(req));
-
RTE_ASSERT(ret == 0);
ret = setsockopt(internals->sfd, SOL_XDP, XDP_TX_RING,
&req, sizeof(req));
-
RTE_ASSERT(ret == 0);
internals->rx.ring = mmap(0, req.desc_nr * sizeof(struct xdp_desc),
@@ -445,10 +476,6 @@ xdp_configure(struct pmd_internals *internals)
RTE_ASSERT(ret == 0);
return ret;
-error:
- rte_ring_free(internals->buf_ring);
- internals->buf_ring = NULL;
- return -1;
}
static int
@@ -463,11 +490,11 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
unsigned int buf_size, data_size;
RTE_ASSERT(rx_queue_id == 0);
- internals->mb_pool = mb_pool;
+ internals->ext_mb_pool = mb_pool;
xdp_configure(internals);
/* Now get the space available for data in the mbuf */
- buf_size = rte_pktmbuf_data_room_size(internals->mb_pool) -
+ buf_size = rte_pktmbuf_data_room_size(internals->ext_mb_pool) -
RTE_PKTMBUF_HEADROOM;
data_size = internals->umem->frame_size;
@@ -736,8 +763,11 @@ rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
return -1;
internals = eth_dev->data->dev_private;
- rte_ring_free(internals->buf_ring);
- rte_free(internals->umem);
+ if (internals->umem) {
+ if (internals->umem->mb_pool)
+ rte_mempool_free(internals->umem->mb_pool);
+ rte_free(internals->umem);
+ }
rte_free(eth_dev->data->dev_private);
rte_free(eth_dev->data);
close(internals->sfd);
--
2.13.6
next prev parent reply other threads:[~2018-03-08 13:52 UTC|newest]
Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-03-08 13:52 [dpdk-dev] [RFC v2 0/7] PMD driver for AF_XDP Qi Zhang
2018-03-08 13:52 ` [dpdk-dev] [RFC v2 1/7] net/af_xdp: new PMD driver Qi Zhang
2018-03-08 13:52 ` [dpdk-dev] [RFC v2 2/7] lib/mbuf: enable parse flags when create mempool Qi Zhang
2018-03-08 13:52 ` [dpdk-dev] [RFC v2 3/7] lib/mempool: allow page size aligned mempool Qi Zhang
2018-03-08 13:52 ` Qi Zhang [this message]
2018-03-08 13:52 ` [dpdk-dev] [RFC v2 5/7] net/af_xdp: enable share mempool Qi Zhang
2018-03-08 13:52 ` [dpdk-dev] [RFC v2 6/7] net/af_xdp: load BPF file Qi Zhang
2018-03-08 14:20 ` Zhang, Qi Z
2018-03-08 23:15 ` Stephen Hemminger
2018-05-09 7:02 ` Björn Töpel
2018-03-08 13:52 ` [dpdk-dev] [RFC v2 7/7] app/testpmd: enable parameter for mempool flags Qi Zhang
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20180308135249.28187-5-qi.z.zhang@intel.com \
--to=qi.z.zhang@intel.com \
--cc=bjorn.topel@intel.com \
--cc=dev@dpdk.org \
--cc=magnus.karlsson@intel.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).