* [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket
2023-09-22 8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
@ 2023-09-22 8:19 ` Bruce Richardson
2023-11-23 14:50 ` Jerin Jacob
2023-09-22 8:19 ` [RFC PATCH 2/5] mempool: driver for mempools of mbufs on shared memory Bruce Richardson
` (4 subsequent siblings)
5 siblings, 1 reply; 8+ messages in thread
From: Bruce Richardson @ 2023-09-22 8:19 UTC (permalink / raw)
To: dev; +Cc: Bruce Richardson
Add a new bus driver to DPDK which accepts memory (e.g. hugepage memory)
over a unix socket connection and maps it into the DPDK process,
replacing the current socket memory as the default memory used for
future memory requests.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
drivers/bus/meson.build | 1 +
drivers/bus/shared_mem/meson.build | 11 +
drivers/bus/shared_mem/shared_mem_bus.c | 323 ++++++++++++++++++++++++
drivers/bus/shared_mem/shared_mem_bus.h | 75 ++++++
drivers/bus/shared_mem/version.map | 11 +
5 files changed, 421 insertions(+)
create mode 100644 drivers/bus/shared_mem/meson.build
create mode 100644 drivers/bus/shared_mem/shared_mem_bus.c
create mode 100644 drivers/bus/shared_mem/shared_mem_bus.h
create mode 100644 drivers/bus/shared_mem/version.map
diff --git a/drivers/bus/meson.build b/drivers/bus/meson.build
index a78b4283bf..0e64959d1a 100644
--- a/drivers/bus/meson.build
+++ b/drivers/bus/meson.build
@@ -9,6 +9,7 @@ drivers = [
+ 'shared_mem',
diff --git a/drivers/bus/shared_mem/meson.build b/drivers/bus/shared_mem/meson.build
new file mode 100644
index 0000000000..1fa21f3a09
--- /dev/null
+++ b/drivers/bus/shared_mem/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+if is_windows
+ build = false
+ reason = 'not supported on Windows'
+sources = files('shared_mem_bus.c')
+require_iova_in_mbuf = false
+deps += ['mbuf', 'net']
diff --git a/drivers/bus/shared_mem/shared_mem_bus.c b/drivers/bus/shared_mem/shared_mem_bus.c
new file mode 100644
index 0000000000..e0369ed416
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.c
@@ -0,0 +1,323 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <malloc.h>
+#include <unistd.h>
+#include <inttypes.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <rte_log.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_devargs.h>
+#include <rte_mbuf_pool_ops.h>
+#include <bus_driver.h>
+#include <dev_driver.h>
+#include "shared_mem_bus.h"
+RTE_LOG_REGISTER_DEFAULT(shared_mem_bus_logtype, DEBUG); /* log type used by the BUS_* macros below */
+#define BUS_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+ shared_mem_bus_logtype, "## SHARED MEM BUS: %s(): " fmt "\n", __func__, ##args) /* prefixes the calling function's name and appends "\n" itself */
+#define BUS_ERR(fmt, args...) BUS_LOG(ERR, fmt, ## args) /* BUS_LOG at ERR level */
+#define BUS_INFO(fmt, args...) BUS_LOG(INFO, fmt, ## args) /* BUS_LOG at INFO level */
+#define BUS_DEBUG(fmt, args...) BUS_LOG(DEBUG, fmt, ## args) /* BUS_LOG at DEBUG level */
+static int dev_scan(void);
+static int dev_probe(void);
+static struct rte_device *find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+ const void *data);
+static enum rte_iova_mode get_iommu_class(void);
+static int addr_parse(const char *, void *);
+struct socket_device { /* one unix-socket connection handled by this bus */
+ struct rte_device rte_device; /* generic device; name, numa_node and bus are filled in when the socket is connected */
+ TAILQ_ENTRY(socket_device) next; /* linkage in shared_mem_bus.socket_list */
+ int fd; /* connected socket file descriptor */
+ uintptr_t membase; /* address at which the memory received on fd was mmap'ed */
+ uintptr_t memlen; /* length in bytes of that mapping */
+/** List of devices */
+TAILQ_HEAD(socket_list, socket_device);
+TAILQ_HEAD(device_list, rte_device);
+struct shared_mem_bus { /* bus state: the generic rte_bus plus the sockets and driver attached to it */
+ struct rte_bus bus;
+ struct socket_list socket_list; /* socket_device entries, appended as sockets are connected */
+ struct shared_mem_drv *ethdrv; /* the single driver stored by shared_mem_register_driver() */
+ struct device_list device_list;
+static struct shared_mem_bus shared_mem_bus = { /* the one instance of this bus, wired to the callbacks in this file */
+ .bus = {
+ .scan = dev_scan,
+ .probe = dev_probe,
+ .find_device = find_device,
+ .get_iommu_class = get_iommu_class,
+ .parse = addr_parse,
+ },
+ .socket_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.socket_list),
+ .device_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.device_list),
+RTE_REGISTER_BUS(shared_mem, shared_mem_bus.bus); /* registers the bus under the name "shared_mem" */
+rte_shm_bus_send_message(void *msg, size_t msglen)
+{
+	/*
+	 * Send msglen bytes from msg on the first socket known to the bus.
+	 * Returns the result of send(), or -1 with errno set to ENOTCONN when
+	 * no socket has been connected yet (previously a NULL dereference).
+	 */
+	struct socket_device *first = TAILQ_FIRST(&shared_mem_bus.socket_list);
+
+	if (first == NULL) {
+		errno = ENOTCONN;
+		return -1;
+	}
+	return send(first->fd, msg, msglen, 0);
+}
+rte_shm_bus_recv_message(void *msg, size_t msglen)
+{
+	/*
+	 * Receive up to msglen bytes into msg from the first socket known to the bus.
+	 * Returns the result of recv(), or -1 with errno set to ENOTCONN when
+	 * no socket has been connected yet (previously a NULL dereference).
+	 */
+	struct socket_device *first = TAILQ_FIRST(&shared_mem_bus.socket_list);
+
+	if (first == NULL) {
+		errno = ENOTCONN;
+		return -1;
+	}
+	return recv(first->fd, msg, msglen, 0);
+}
+rte_shm_bus_get_mem_offset(void *ptr)
+{
+	/*
+	 * Translate a local pointer into an offset from the start of the
+	 * shared mapping that contains it.
+	 * Returns the offset, or (uintptr_t)-1 when ptr lies in no mapping.
+	 */
+	const uintptr_t pval = (uintptr_t)ptr;
+	struct socket_device *dev;
+
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		/*
+		 * The range is [membase, membase + memlen): the base address itself is
+		 * offset 0, matching rte_shm_bus_get_mem_ptr(0). Comparing the
+		 * difference against memlen also avoids overflow in membase + memlen.
+		 */
+		if (pval >= dev->membase && pval - dev->membase < dev->memlen)
+			return pval - dev->membase;
+	}
+	return (uintptr_t)-1;
+}
+void * /* Turn an offset into a local pointer.
+ * Walks the socket list and uses the first device whose mapping length is greater
+ * than offset, returning that device's membase + offset.
+ * Returns (void *)-1 when no device matches.
+ * NOTE(review): with more than one socket the offset alone does not say which
+ * mapping is meant; the first sufficiently large one always wins - confirm intent.
+ */
+rte_shm_bus_get_mem_ptr(uintptr_t offset)
+ struct socket_device *dev;
+ TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+ if (offset < dev->memlen)
+ return RTE_PTR_ADD(dev->membase, offset);
+ }
+ return (void *)-1;
+/**
+ * Bus scan callback: connect to every "sock:<path>" device listed in devargs.
+ *
+ * Does nothing unless the bus is in allow-list scan mode. For each devargs
+ * entry of this bus a SOCK_SEQPACKET unix socket is connected to the given
+ * path, and a socket_device recording the fd is appended to the socket list.
+ *
+ * @return
+ *   0 on success, a negative errno-style value on failure. The socket opened
+ *   for the failing entry is closed before returning.
+ */
+static int
+dev_scan(void)
+{
+	struct rte_devargs *devargs;
+
+	if (shared_mem_bus.bus.conf.scan_mode != RTE_BUS_SCAN_ALLOWLIST)
+		return 0;
+
+	RTE_EAL_DEVARGS_FOREACH(shared_mem_bus.bus.name, devargs) {
+		struct sockaddr_un sun = {.sun_family = AF_UNIX};
+		struct socket_device *sdev;
+		int err;
+		int fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+
+		if (fd < 0) {
+			err = errno; /* save errno before logging can overwrite it */
+			BUS_ERR("Error creating socket");
+			return -err;
+		}
+
+		/* reject names shorter than the 5-character "sock:" prefix explicitly
+		 * instead of relying on strlen() - 5 wrapping around
+		 */
+		if (strlen(devargs->name) < 5 ||
+				strlen(devargs->name) - 5 >= sizeof(sun.sun_path) ||
+				addr_parse(devargs->name, sun.sun_path) != 0) {
+			BUS_ERR("Error parsing device address");
+			close(fd);
+			return -EINVAL;
+		}
+
+		if (connect(fd, (void *)&sun, sizeof(sun)) != 0) {
+			err = errno;
+			BUS_ERR("Error connecting to socket");
+			close(fd);
+			return -err;
+		}
+
+		/* zero-initialised so membase/memlen and the unused rte_device
+		 * fields hold defined values until probe fills them in
+		 */
+		sdev = calloc(1, sizeof(*sdev));
+		if (sdev == NULL) {
+			BUS_ERR("Error with malloc");
+			close(fd);
+			return -ENOMEM;
+		}
+
+		BUS_INFO("Allocating dev for %s", devargs->name);
+		sdev->rte_device.name = devargs->name;
+		sdev->rte_device.numa_node = rte_socket_id();
+		sdev->rte_device.bus = &shared_mem_bus.bus;
+		sdev->fd = fd;
+		TAILQ_INSERT_TAIL(&shared_mem_bus.socket_list, sdev, next);
+	}
+	return 0;
+}
+/**
+ * Receive one message carrying a file descriptor together with its size,
+ * IOVA and page size.
+ *
+ * @param from
+ *   Socket to read from.
+ * @param memsize
+ *   Set to the fd_size field of the message on success.
+ * @param iova
+ *   Set to the iova field of the message on success.
+ * @param pg_size
+ *   Set to the pg_size field of the message on success.
+ * @return
+ *   The received file descriptor, or -1 on any error (outputs untouched).
+ */
+static int
+recv_fd(int from, uint64_t *memsize, rte_iova_t *iova, uint64_t *pg_size)
+{
+	int fd = -1;
+	struct {
+		uint64_t fd_size;
+		rte_iova_t iova;
+		uint64_t pg_size;
+	} data_message;
+	size_t cmsglen = CMSG_LEN(sizeof(fd));
+	struct cmsghdr *cmhdr = malloc(cmsglen);
+
+	if (cmhdr == NULL) {
+		BUS_ERR("Malloc error");
+		return -1;
+	}
+
+	struct iovec iov = {
+		.iov_base = (void *)&data_message,
+		.iov_len = sizeof(data_message)
+	};
+	struct msghdr msg = {
+		.msg_iov = &iov,
+		.msg_iovlen = 1,
+		.msg_control = cmhdr,
+		.msg_controllen = cmsglen,
+	};
+
+	if (recvmsg(from, &msg, 0) != (int)iov.iov_len) {
+		BUS_ERR("recvmsg error %s", strerror(errno));
+		goto out;
+	}
+	/* only accept exactly one control message that really carries an fd */
+	if (msg.msg_controllen != cmsglen ||
+			cmhdr->cmsg_level != SOL_SOCKET ||
+			cmhdr->cmsg_type != SCM_RIGHTS) {
+		BUS_ERR("Error with fd on message received");
+		goto out;
+	}
+
+	fd = *(int *)CMSG_DATA(cmhdr);
+	*memsize = data_message.fd_size;
+	*iova = data_message.iova;
+	*pg_size = data_message.pg_size;
+out:
+	/* single exit so the control buffer is released on every path */
+	free(cmhdr);
+	return fd;
+}
+/**
+ * Bus probe callback: map the memory offered on each connected socket and
+ * swap it in as the memory of the current NUMA socket.
+ *
+ * For every socket_device this receives a memory fd, mmaps it, reports the
+ * mapping base to the peer, creates a malloc heap named "socket_<id>_ext",
+ * adds the mapping to it and swaps that heap with the current socket's heap.
+ * If a driver has been registered, its probe function is then called with a
+ * newly allocated rte_device.
+ *
+ * @return
+ *   0 on success (or when no sockets were scanned), -1 on failure.
+ */
+static int
+dev_probe(void)
+{
+	if (TAILQ_EMPTY(&shared_mem_bus.socket_list))
+		return 0;
+
+	if (rte_mbuf_set_platform_mempool_ops("shared_mem") != 0) {
+		BUS_ERR("Error setting default mempool ops\n");
+		return -1;
+	}
+	BUS_INFO("Set default mempool ops to 'shared_mem'");
+
+	struct socket_device *dev;
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		uint64_t memsize = 0;
+		uint64_t pgsize = 0;
+		rte_iova_t iova = 0;
+		int memfd = recv_fd(dev->fd, &memsize, &iova, &pgsize);
+		/* check memfd is valid, the size is non-zero and multiple of 2MB */
+		if (memfd < 0 || memsize <= 0 || memsize % (1 << 21) != 0) {
+			BUS_ERR("Error getting memfd and size");
+			return -1;
+		}
+		/* pgsize comes from the peer: it is used as a divisor below, so it
+		 * must be non-zero and must divide the memory size exactly
+		 */
+		if (pgsize == 0 || memsize % pgsize != 0) {
+			BUS_ERR("Invalid page size received");
+			return -1;
+		}
+		BUS_DEBUG("Received fd %d with memsize %"PRIu64" and pgsize %"PRIu64,
+				memfd, memsize, pgsize);
+
+		void *mem = mmap(NULL, memsize, PROT_READ|PROT_WRITE, MAP_SHARED, memfd, 0);
+		if (mem == MAP_FAILED) {
+			BUS_ERR("Error mmapping the received fd");
+			return -1;
+		}
+		BUS_DEBUG("%u MB of memory mapped at %p\n", (unsigned int)(memsize >> 20), mem);
+		dev->membase = (uintptr_t)mem;
+		dev->memlen = memsize;
+
+		/* NOTE(review): rte_shm_bus_send_message() always uses the first
+		 * socket in the list, not dev->fd - confirm for multi-socket use
+		 */
+		struct eth_shared_mem_msg msg = {
+			.offset = dev->membase,
+		};
+		rte_shm_bus_send_message(&msg, sizeof(msg));
+
+		char malloc_heap_name[32];
+		snprintf(malloc_heap_name, sizeof(malloc_heap_name),
+				"socket_%d_ext", rte_socket_id());
+		if (rte_malloc_heap_create(malloc_heap_name) != 0) {
+			BUS_ERR("Error creating heap %s\n", malloc_heap_name);
+			return -1;
+		}
+
+		/* build the per-page IOVA table: pages are contiguous from iova */
+		int nb_pages = (memsize / pgsize);
+		rte_iova_t *iovas = malloc(sizeof(iovas[0]) * nb_pages);
+		if (iovas == NULL) {
+			BUS_ERR("Error with malloc");
+			return -1;
+		}
+		iovas[0] = iova;
+		for (int i = 1; i < nb_pages; i++)
+			iovas[i] = iovas[i - 1] + pgsize;
+		BUS_DEBUG("Attempting to add memory to heap: %s", malloc_heap_name);
+		if (rte_malloc_heap_memory_add(malloc_heap_name, mem, memsize,
+				iovas, nb_pages, pgsize) < 0) {
+			BUS_ERR("Error adding to malloc heap: %s", strerror(rte_errno));
+			free(iovas);
+			return -1;
+		}
+		free(iovas);
+		BUS_DEBUG("Added memory to heap");
+		rte_malloc_heap_swap_socket(rte_socket_id(),
+				rte_malloc_heap_get_socket(malloc_heap_name));
+		BUS_DEBUG("Swapped in memory as socket %d memory\n", rte_socket_id());
+
+		if (shared_mem_bus.ethdrv != NULL) {
+			/* named differently from the loop variable to avoid shadowing it */
+			struct rte_device *ethdev = malloc(sizeof(*ethdev));
+			if (ethdev == NULL)
+				return -1;
+			*ethdev = (struct rte_device){
+				.name = "shared_mem_ethdev",
+				.driver = &shared_mem_bus.ethdrv->driver,
+				.bus = &shared_mem_bus.bus,
+				.numa_node = SOCKET_ID_ANY,
+			};
+			shared_mem_bus.ethdrv->probe(shared_mem_bus.ethdrv, ethdev);
+		}
+	}
+	return 0;
+}
+static struct rte_device * /* Bus find_device callback: not implemented; ignores all arguments and returns NULL. */
+find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+ const void *data)
+ RTE_SET_USED(start);
+ RTE_SET_USED(cmp);
+ RTE_SET_USED(data);
+ return NULL;
+static enum rte_iova_mode /* Bus get_iommu_class callback: RTE_IOVA_DC while the socket list is empty, RTE_IOVA_VA otherwise. */
+ /* if there are no devices, report don't care, otherwise VA mode */
+ return TAILQ_EMPTY(&shared_mem_bus.socket_list) ? RTE_IOVA_DC : RTE_IOVA_VA;
+static int /* Bus parse callback: accept device names of the form "sock:<path>".
+ * The name must start with "sock:" and <path> must stat() as a unix socket.
+ * When addr is non-NULL, <path> (without the prefix) is copied into it.
+ * Returns 0 on a match, -1 otherwise.
+ * NOTE(review): the copy uses strcpy() with no bound; the caller visible in this
+ * file checks the length against sun_path first, other callers presumably must
+ * supply a large enough buffer - confirm.
+ */
+addr_parse(const char *name, void *addr)
+ if (strncmp(name, "sock:", 5) != 0) {
+ BUS_DEBUG("no sock: prefix on %s", name);
+ return -1;
+ }
+ const char *filename = &name[5]; /* skip the 5-character prefix */
+ struct stat st;
+ if (stat(filename, &st) < 0 || (st.st_mode & S_IFMT) != S_IFSOCK) {
+ BUS_ERR("stat failed, or not a socket, %s", filename);
+ return -1;
+ }
+ if (addr != NULL)
+ strcpy(addr, filename);
+ BUS_DEBUG("Matched filename: %s", filename);
+ return 0;
+shared_mem_register_driver(struct shared_mem_drv *drv) /* Store drv as the bus's single driver.
+ * Returns -1 if drv has no probe function, otherwise 0. A later registration
+ * replaces the earlier one, since only one pointer is kept.
+ */
+ if (drv->probe == NULL)
+ return -1;
+ shared_mem_bus.ethdrv = drv;
+ return 0;
diff --git a/drivers/bus/shared_mem/shared_mem_bus.h b/drivers/bus/shared_mem/shared_mem_bus.h
new file mode 100644
index 0000000000..01a9a2a99a
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.h
@@ -0,0 +1,75 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdint.h>
+#include <rte_common.h>
+#include <rte_ether.h>
+#include <dev_driver.h>
+enum shared_mem_msg_type {
+struct eth_shared_mem_msg { /* message exchanged over the bus socket via rte_shm_bus_send_message()/recv_message() */
+ enum shared_mem_msg_type type; /* type implicitly defines which union member is used */
+ union {
+ uintptr_t offset; /* for many messages, just pass an offset */
+ struct rte_ether_addr ethaddr; /* allow passing mac address */
+ uintptr_t datalen; /* for other messages, pass a data length after the data */
+ };
+ char data[]; /* flexible array member for trailing data */
+struct shared_mem_drv;
+ * Initialisation function for the driver
+ */
+typedef int (c_eth_probe_t)(struct shared_mem_drv *drv, struct rte_device *dev); /* probe function type: receives the driver and the device to initialise */
+struct shared_mem_drv { /* a driver that can be attached to the shared_mem bus */
+ struct rte_driver driver;
+ c_eth_probe_t *probe; /**< Device probe function. */
+/** Helper for shared memory driver registration from a driver (e.g. eth) instance */
+#define RTE_PMD_REGISTER_SHMEM_DRV(nm, c_drv) \
+RTE_INIT(shared_mem_initfn_ ##nm) \
+ (c_drv).driver.name = RTE_STR(nm);\
+ shared_mem_register_driver(&c_drv); \
+} \
+shared_mem_register_driver(struct shared_mem_drv *drv);
+rte_shm_bus_send_message(void *msg, size_t msglen);
+rte_shm_bus_recv_message(void *msg, size_t msglen);
+rte_shm_bus_get_mem_offset(void *ptr);
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset);
diff --git a/drivers/bus/shared_mem/version.map b/drivers/bus/shared_mem/version.map
new file mode 100644
index 0000000000..2af82689b1
--- /dev/null
+++ b/drivers/bus/shared_mem/version.map
@@ -0,0 +1,11 @@
+ global:
+ shared_mem_register_driver;
+ rte_shm_bus_get_mem_offset;
+ rte_shm_bus_get_mem_ptr;
+ rte_shm_bus_recv_message;
+ rte_shm_bus_send_message;
+ local: *;
^ permalink raw reply [flat|nested] 8+ messages in thread
* [RFC PATCH 3/5] net: new ethdev driver to communicate using shared mem
2023-09-22 8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
2023-09-22 8:19 ` [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket Bruce Richardson
2023-09-22 8:19 ` [RFC PATCH 2/5] mempool: driver for mempools of mbufs on shared memory Bruce Richardson
@ 2023-09-22 8:19 ` Bruce Richardson
2023-09-22 8:19 ` [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces Bruce Richardson
` (2 subsequent siblings)
5 siblings, 0 replies; 8+ messages in thread
From: Bruce Richardson @ 2023-09-22 8:19 UTC (permalink / raw)
To: dev; +Cc: Bruce Richardson
This ethdev builds on the previous shared_mem bus driver and shared_mem
mempool driver to provide an ethdev interface which can allow zero-copy
I/O from one process to another.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
drivers/net/meson.build | 1 +
drivers/net/shared_mem/meson.build | 11 +
drivers/net/shared_mem/shared_mem_eth.c | 295 ++++++++++++++++++++++++
3 files changed, 307 insertions(+)
create mode 100644 drivers/net/shared_mem/meson.build
create mode 100644 drivers/net/shared_mem/shared_mem_eth.c
diff --git a/drivers/net/meson.build b/drivers/net/meson.build
index bd38b533c5..505d208497 100644
--- a/drivers/net/meson.build
+++ b/drivers/net/meson.build
@@ -53,6 +53,7 @@ drivers = [
+ 'shared_mem',
diff --git a/drivers/net/shared_mem/meson.build b/drivers/net/shared_mem/meson.build
new file mode 100644
index 0000000000..17d1b84454
--- /dev/null
+++ b/drivers/net/shared_mem/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+if is_windows
+ build = false
+ reason = 'not supported on Windows'
+sources = files('shared_mem_eth.c')
+deps += 'bus_shared_mem'
+require_iova_in_mbuf = false
diff --git a/drivers/net/shared_mem/shared_mem_eth.c b/drivers/net/shared_mem/shared_mem_eth.c
new file mode 100644
index 0000000000..564bfdb907
--- /dev/null
+++ b/drivers/net/shared_mem/shared_mem_eth.c
@@ -0,0 +1,295 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <rte_common.h>
+#include <shared_mem_bus.h>
+#include <ethdev_driver.h>
+RTE_LOG_REGISTER_DEFAULT(shared_mem_eth_logtype, DEBUG); /* log type used by the SHM_ETH_* macros below */
+#define SHM_ETH_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+ shared_mem_eth_logtype, "## SHARED MEM ETH: %s(): " fmt "\n", __func__, ##args) /* prefixes the calling function's name and appends "\n" itself */
+#define SHM_ETH_ERR(fmt, args...) SHM_ETH_LOG(ERR, fmt, ## args) /* SHM_ETH_LOG at ERR level */
+#define SHM_ETH_INFO(fmt, args...) SHM_ETH_LOG(INFO, fmt, ## args) /* SHM_ETH_LOG at INFO level */
+#define SHM_ETH_DEBUG(fmt, args...) SHM_ETH_LOG(DEBUG, fmt, ## args) /* SHM_ETH_LOG at DEBUG level */
+struct shm_eth_stats { /* software counters maintained by the rx/tx burst functions */
+ uint64_t rx_pkts; /* packets returned by rx burst */
+ uint64_t tx_pkts; /* packets successfully enqueued by tx burst */
+ uint64_t rx_bytes; /* sum of pkt_len over received packets */
+ uint64_t tx_bytes; /* sum of pkt_len over enqueued packets */
+struct shm_eth_private { /* per-port private data; also stored as the rx and tx queue handle */
+ struct rte_ether_addr addr; /* MAC address obtained from the peer at init */
+ struct rte_ring *rx; /* ring created in rx queue setup */
+ struct rte_ring *tx; /* ring created in tx queue setup */
+ struct shm_eth_stats stats;
+static struct rte_mempool *rx_mp; /* TODO: use one per queue */
+static int /* dev_configure callback: nothing to configure, always returns 0. */
+shm_eth_configure(struct rte_eth_dev *dev __rte_unused)
+ return 0;
+static int /* dev_start callback: send a MSG_TYPE_START message over the bus socket and
+ * wait for the reply, which must be MSG_TYPE_ACK. On success the software
+ * statistics are zeroed. Returns 0 on success, -1 if the reply is not an ack.
+ * NOTE(review): the return values of the send and recv calls are not checked.
+ */
+shm_eth_start(struct rte_eth_dev *dev)
+ struct shm_eth_private *priv = dev->data->dev_private;
+ struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+ .type = MSG_TYPE_START,
+ };
+ rte_shm_bus_send_message(&msg, sizeof(msg));
+ rte_shm_bus_recv_message(&msg, sizeof(msg)); /* the reply overwrites msg */
+ if (msg.type != MSG_TYPE_ACK) {
+ SHM_ETH_ERR("Didn't get ack from host\n");
+ return -1;
+ }
+ memset(&priv->stats, 0, sizeof(priv->stats));
+ return 0;
+static int /* dev_stop callback: does nothing and returns 0; no message is sent to the peer. */
+shm_eth_stop(struct rte_eth_dev *dev __rte_unused)
+ return 0;
+static int /* dev_infos_get callback: overwrite *info with this driver's fixed capabilities
+ * (one rx queue, one tx queue, one MAC address, 128..8192 descriptors in steps of 64).
+ * Always returns 0.
+ */
+shm_eth_infos_get(struct rte_eth_dev *dev, struct rte_eth_dev_info *info)
+ *info = (struct rte_eth_dev_info){
+ .driver_name = dev->device->driver->name,
+ .max_rx_queues = 1,
+ .max_tx_queues = 1,
+ .max_mac_addrs = 1,
+ .min_mtu = 64,
+ .max_mtu = UINT16_MAX,
+ .max_rx_pktlen = UINT16_MAX,
+ .nb_rx_queues = 1,
+ .nb_tx_queues = 1,
+ .tx_desc_lim = { .nb_max = 8192, .nb_min = 128, .nb_align = 64 },
+ .rx_desc_lim = { .nb_max = 8192, .nb_min = 128, .nb_align = 64 },
+ };
+ return 0;
+static int /* mtu_set callback: record the requested MTU in the device data; no validation, always returns 0. */
+shm_eth_mtu_set(struct rte_eth_dev *dev, uint16_t mtu)
+ dev->data->mtu = mtu;
+ return 0;
+static int /* link_update callback: report a fixed link state (100G speed, status 1) regardless of the peer; always returns 0. */
+shm_eth_link_update(struct rte_eth_dev *dev, int wait __rte_unused)
+ dev->data->dev_link = (struct rte_eth_link){
+ .link_speed = RTE_ETH_SPEED_NUM_100G,
+ .link_duplex = 1,
+ .link_autoneg = 1,
+ .link_status = 1,
+ };
+ return 0;
+static int /* rx_queue_setup callback: create the rx ring and tell the peer where the
+ * mempool and the ring live.
+ * Both mb_pool and the new ring must lie inside memory known to the shared_mem
+ * bus (rte_shm_bus_get_mem_offset() must not return (uintptr_t)-1). Their
+ * offsets are sent to the peer as two messages and mb_pool is remembered in
+ * the file-scope rx_mp. Returns 0 on success, -1 on failure.
+ * NOTE(review): when the ring turns out not to be in shared memory it is not
+ * freed before returning; the send results are not checked.
+ */
+shm_eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
+ uint16_t nb_rx_desc,
+ unsigned int socket_id,
+ const struct rte_eth_rxconf *rx_conf,
+ struct rte_mempool *mb_pool)
+ RTE_SET_USED(rx_conf);
+ struct shm_eth_private *priv = dev->data->dev_private;
+ char ring_name[32];
+ if (rte_shm_bus_get_mem_offset(mb_pool) == (uintptr_t)-1) {
+ SHM_ETH_ERR("Mempool not in shared memory");
+ return -1;
+ }
+ snprintf(ring_name, sizeof(ring_name), "shm_eth_rxr%u", rx_queue_id);
+ priv->rx = rte_ring_create(ring_name, nb_rx_desc, socket_id, RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (priv->rx == NULL)
+ return -1;
+ SHM_ETH_INFO("RX ring @ %p\n", priv->rx);
+ if (rte_shm_bus_get_mem_offset(priv->rx) == (uintptr_t)-1) {
+ SHM_ETH_ERR("Ring not created on shared memory.");
+ return -1;
+ }
+ dev->data->rx_queues[rx_queue_id] = priv; /* the queue handle is the port's private data */
+ SHM_ETH_INFO("Mempool offset is: %p", (void *)rte_shm_bus_get_mem_offset(mb_pool));
+ SHM_ETH_INFO("Rx queue offset is: %p", (void *)rte_shm_bus_get_mem_offset(priv->rx));
+ struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+ .offset = rte_shm_bus_get_mem_offset(mb_pool),
+ };
+ rte_shm_bus_send_message(&msg, sizeof(msg));
+ msg = (struct eth_shared_mem_msg){
+ .offset = rte_shm_bus_get_mem_offset(priv->rx),
+ };
+ rte_shm_bus_send_message(&msg, sizeof(msg));
+ rx_mp = mb_pool;
+ return 0;
+static int /* tx_queue_setup callback: create the tx ring, require it to lie inside memory
+ * known to the shared_mem bus, and send its offset to the peer.
+ * Returns 0 on success, -1 on failure.
+ * NOTE(review): as in the rx setup, a ring that is not in shared memory is not
+ * freed on the error path, and the send result is not checked.
+ */
+shm_eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
+ uint16_t nb_tx_desc,
+ unsigned int socket_id,
+ const struct rte_eth_txconf *tx_conf)
+ RTE_SET_USED(tx_conf);
+ struct shm_eth_private *priv = dev->data->dev_private;
+ char ring_name[32];
+ snprintf(ring_name, sizeof(ring_name), "shm_eth_txr%u", tx_queue_id);
+ priv->tx = rte_ring_create(ring_name, nb_tx_desc, socket_id, RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (priv->tx == NULL)
+ return -1;
+ SHM_ETH_DEBUG("TX ring @ %p\n", priv->tx);
+ if (rte_shm_bus_get_mem_offset(priv->tx) == (uintptr_t)-1) {
+ SHM_ETH_ERR("TX ring not on shared memory");
+ return -1;
+ }
+ dev->data->tx_queues[tx_queue_id] = priv; /* the queue handle is the port's private data */
+ struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+ .offset = rte_shm_bus_get_mem_offset(priv->tx),
+ };
+ rte_shm_bus_send_message(&msg, sizeof(msg));
+ return 0;
+static int /* stats_get callback: copy the four software counters into *stats; other fields of *stats are left untouched. Always returns 0. */
+shm_eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
+ struct shm_eth_private *priv = dev->data->dev_private;
+ stats->ibytes = priv->stats.rx_bytes;
+ stats->ipackets = priv->stats.rx_pkts;
+ stats->obytes = priv->stats.tx_bytes;
+ stats->opackets = priv->stats.tx_pkts;
+ return 0;
+static const struct eth_dev_ops ops = { /* ethdev callback table installed by ethdev_init() */
+ .dev_configure = shm_eth_configure,
+ .dev_start = shm_eth_start,
+ .dev_stop = shm_eth_stop,
+ .dev_infos_get = shm_eth_infos_get,
+ .mtu_set = shm_eth_mtu_set,
+ .rx_queue_setup = shm_eth_rx_queue_setup,
+ .tx_queue_setup = shm_eth_tx_queue_setup,
+ .link_update = shm_eth_link_update,
+ .stats_get = shm_eth_stats_get,
+static uint16_t /* Rx burst: dequeue up to nb_bufs entries from the rx ring and convert them
+ * into local mbuf pointers.
+ * Ring entries are treated as offsets: the address returned by
+ * rte_shm_bus_get_mem_ptr(0) is added both to each entry and to the mbuf's
+ * buf_addr, and the mbuf's pool is set to the file-scope rx_mp.
+ * Returns the number of mbufs written to bufs and updates the rx counters.
+ * NOTE(review): deq_vals is a variable-length array sized by nb_bufs, so
+ * nb_bufs is presumably expected to be non-zero and modest - confirm.
+ */
+shm_eth_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_bufs)
+ void *deq_vals[nb_bufs];
+ struct shm_eth_private *priv = queue;
+ struct rte_ring *rxr = priv->rx;
+ uintptr_t offset = (uintptr_t)rte_shm_bus_get_mem_ptr(0); /* local base address of the first mapping */
+ int nb_rx = rte_ring_dequeue_burst(rxr, deq_vals, nb_bufs, NULL);
+ if (nb_rx == 0)
+ return 0;
+ uint64_t bytes = 0;
+ for (int i = 0; i < nb_rx; i++) {
+ bufs[i] = RTE_PTR_ADD(deq_vals[i], offset);
+ bufs[i]->pool = rx_mp;
+ bufs[i]->buf_addr = RTE_PTR_ADD(bufs[i]->buf_addr, offset);
+ bytes += bufs[i]->pkt_len;
+ }
+ priv->stats.rx_pkts += nb_rx;
+ priv->stats.rx_bytes += bytes;
+ return nb_rx;
+static uint16_t /* Tx burst: convert each mbuf into offset form and enqueue it on the tx ring.
+ * The address returned by rte_shm_bus_get_mem_ptr(0) is subtracted from each
+ * mbuf's buf_addr and from the mbuf pointer itself before enqueueing. Mbufs
+ * that could not be enqueued get their buf_addr restored and their length
+ * removed from the byte count.
+ * Returns the number of mbufs enqueued and updates the tx counters.
+ * NOTE(review): enq_vals is a variable-length array sized by nb_bufs, so
+ * nb_bufs is presumably expected to be non-zero and modest - confirm.
+ */
+shm_eth_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_bufs)
+ void *enq_vals[nb_bufs];
+ struct shm_eth_private *priv = queue;
+ struct rte_ring *txr = priv->tx;
+ uintptr_t offset = (uintptr_t)rte_shm_bus_get_mem_ptr(0); /* local base address of the first mapping */
+ uint64_t bytes = 0;
+ for (int i = 0; i < nb_bufs; i++) {
+ bufs[i]->buf_addr = RTE_PTR_SUB(bufs[i]->buf_addr, offset);
+ bytes += bufs[i]->pkt_len;
+ rte_cldemote(bufs[i]);
+ enq_vals[i] = RTE_PTR_SUB(bufs[i], offset);
+ }
+ uint16_t nb_enq = rte_ring_enqueue_burst(txr, enq_vals, nb_bufs, NULL);
+ if (nb_enq != nb_bufs) {
+ /* restore original buffer settings */
+ for (int i = nb_enq; i < nb_bufs; i++) {
+ bufs[i]->buf_addr = RTE_PTR_ADD(bufs[i]->buf_addr, offset);
+ bytes -= bufs[i]->pkt_len;
+ }
+ }
+ priv->stats.tx_pkts += nb_enq;
+ priv->stats.tx_bytes += bytes;
+ return nb_enq;
+static int /* Init callback passed to rte_eth_dev_create(): install the ops table and the
+ * burst functions, then ask the peer for the MAC address.
+ * A MSG_TYPE_GET_MAC message is sent and the reply must be MSG_TYPE_REPORT_MAC;
+ * its ethaddr is copied into the private data, which also backs mac_addrs.
+ * Returns 0 on success, -1 if the reply has another type.
+ * NOTE(review): the return values of the send and recv calls are not checked.
+ */
+ethdev_init(struct rte_eth_dev *ethdev, void *init_params __rte_unused)
+ struct shm_eth_private *priv = ethdev->data->dev_private;
+ ethdev->dev_ops = &ops;
+ ethdev->data->mac_addrs = &priv->addr;
+ ethdev->rx_pkt_burst = shm_eth_rx_burst;
+ ethdev->tx_pkt_burst = shm_eth_tx_burst;
+ struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+ .type = MSG_TYPE_GET_MAC,
+ };
+ rte_shm_bus_send_message(&msg, sizeof(msg));
+ rte_shm_bus_recv_message(&msg, sizeof(msg)); /* the reply overwrites msg */
+ if (msg.type != MSG_TYPE_REPORT_MAC) {
+ SHM_ETH_ERR("Didn't get mac address from host\n");
+ return -1;
+ }
+ rte_ether_addr_copy(&msg.ethaddr, &priv->addr);
+ return 0;
+static int /* Driver probe function: create an ethdev named "shared_mem_ethdev" on dev, with
+ * shm_eth_private as its private data and ethdev_init() as its init callback.
+ * Returns the result of rte_eth_dev_create().
+ */
+shm_eth_probe(struct shared_mem_drv *drv, struct rte_device *dev)
+ SHM_ETH_INFO("Probing device %p on driver %s", dev, drv->driver.name);
+ int ret = rte_eth_dev_create(dev, "shared_mem_ethdev", sizeof(struct shm_eth_private),
+ ethdev_init, NULL);
+ if (ret != 0)
+ goto out; /* skip the success message on failure */
+ SHM_ETH_DEBUG("Ethdev created ok\n");
+ return ret;
+struct shared_mem_drv shm_drv = { /* driver instance handed to the shared_mem bus */
+ .probe = shm_eth_probe,
+RTE_PMD_REGISTER_SHMEM_DRV(shm_eth, shm_drv); /* names the driver "shm_eth" and registers it with the bus at init time */
^ permalink raw reply [flat|nested] 8+ messages in thread
* [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces
2023-09-22 8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
` (2 preceding siblings ...)
2023-09-22 8:19 ` [RFC PATCH 3/5] net: new ethdev driver to communicate using shared mem Bruce Richardson
@ 2023-09-22 8:19 ` Bruce Richardson
2023-09-22 8:19 ` [RFC PATCH 5/5] app/io-proxy: add startup commands Bruce Richardson
2025-02-07 1:55 ` [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Stephen Hemminger
5 siblings, 0 replies; 8+ messages in thread
From: Bruce Richardson @ 2023-09-22 8:19 UTC (permalink / raw)
To: dev; +Cc: Bruce Richardson
This app uses the shared memory pool and shared ethdev infrastructure
to act as a zero-copy IO proxy to other applications. It has been tested
and verified to work successfully proxying data to testpmd instances on
the system, with those testpmd instances each being passed a unix socket
to work with via the shared memory bus "-a sock:/path/to/sock..."
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
app/io-proxy/command_fns.c | 160 ++++++++++
app/io-proxy/commands.list | 6 +
app/io-proxy/datapath.c | 595 +++++++++++++++++++++++++++++++++++++
app/io-proxy/datapath.h | 37 +++
app/io-proxy/datapath_mp.c | 78 +++++
app/io-proxy/main.c | 71 +++++
app/io-proxy/meson.build | 12 +
app/meson.build | 1 +
8 files changed, 960 insertions(+)
create mode 100644 app/io-proxy/command_fns.c
create mode 100644 app/io-proxy/commands.list
create mode 100644 app/io-proxy/datapath.c
create mode 100644 app/io-proxy/datapath.h
create mode 100644 app/io-proxy/datapath_mp.c
create mode 100644 app/io-proxy/main.c
create mode 100644 app/io-proxy/meson.build
diff --git a/app/io-proxy/command_fns.c b/app/io-proxy/command_fns.c
new file mode 100644
index 0000000000..f48921e005
--- /dev/null
+++ b/app/io-proxy/command_fns.c
@@ -0,0 +1,160 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <rte_ethdev.h>
+#include "datapath.h"
+#include "commands.h"
+extern volatile bool quit;
+extern volatile bool running_startup_script;
+cmd_add_socket_parsed(void *parsed_result, struct cmdline *cl __rte_unused, /* Handler for the "add socket" command.
+ * Parses the memsize string with strtoull() (base auto-detected) and applies
+ * an optional K/M/G suffix as successive multiplications by 1024, checks the
+ * port and queue against the supported maxima, then calls
+ * listen_unix_socket() and prints a confirmation.
+ * The statements after "return;" form the error path reached by the
+ * "goto err" jumps: when a startup script is running, quit is set and this
+ * thread spins until the main thread exits.
+ */
+ void *data __rte_unused)
+ struct cmd_add_socket_result *res = parsed_result;
+ uint64_t maxmem = 0;
+ char *endchar;
+ maxmem = strtoull(res->memsize, &endchar, 0);
+ switch (*endchar) {
+ case 'G': case 'g':
+ maxmem *= 1024;
+ /* fall-through */
+ case 'M': case 'm':
+ maxmem *= 1024;
+ /* fall-through */
+ case 'K': case 'k':
+ maxmem *= 1024;
+ break;
+ }
+ if (res->port >= MAX_PORTS_SUPPORTED) {
+ fprintf(stderr, "Port id out of range. Must be <%u\n", MAX_PORTS_SUPPORTED);
+ goto err;
+ }
+ if (res->queue >= MAX_QUEUES_SUPPORTED) {
+ fprintf(stderr, "Queue id out of range. Must be <%u\n", MAX_QUEUES_SUPPORTED);
+ goto err;
+ }
+ if (listen_unix_socket(res->path, maxmem, res->port, res->queue) != 0) {
+ fprintf(stderr, "error initializing socket: %s\n", res->path);
+ goto err;
+ }
+ printf("Created socket = %s with memsize = %s using port = %u, queue = %u\n",
+ res->path, res->memsize, res->port, res->queue);
+ return;
+ if (running_startup_script) {
+ quit = true;
+ /* wait for main thread to quit. Just spin here for condition which
+ * will never actually come true, as main thread should just exit
+ */
+ while (quit)
+ usleep(100);
+ }
+ /* if running interactively, do nothing on error except report it above */
+cmd_list_sockets_parsed(__rte_unused void *parsed_result, /* Handler for the "list sockets" command.
+ * Iterates over the sockets returned by get_next_socket() and prints one line
+ * per socket: path, connected/idle state, memory size, port and queue. The
+ * memory size is shown with the largest of G/M/K that divides it exactly,
+ * otherwise as a plain byte count.
+ */
+ __rte_unused struct cmdline *cl,
+ __rte_unused void *data)
+ const char *path;
+ int sock;
+ uint64_t maxmem;
+ uint16_t port, queue;
+ bool connected;
+ for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+ i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+ &queue, &connected)) {
+ char memstr[32];
+ if (maxmem % (1UL << 30) == 0)
+ snprintf(memstr, sizeof(memstr), "%" PRIu64 "G", maxmem >> 30);
+ else if (maxmem % (1UL << 20) == 0)
+ snprintf(memstr, sizeof(memstr), "%" PRIu64 "M", maxmem >> 20);
+ else if (maxmem % (1UL << 10) == 0)
+ snprintf(memstr, sizeof(memstr), "%" PRIu64 "K", maxmem >> 10);
+ else
+ snprintf(memstr, sizeof(memstr), "%" PRIu64, maxmem);
+ printf("Socket %s [%s]: mem=%s, port=%u, queue=%u\n",
+ path, connected ? "connected" : "idle", memstr, port, queue);
+ }
+cmd_list_ports_parsed(__rte_unused void *parsed_result, /* Handler for the "list ports" command.
+ * Prints the MAC address of each port id from 0 up to
+ * rte_eth_dev_count_avail() - 1, or "MAC UNKNOWN" when it cannot be read.
+ * NOTE(review): this assumes the available port ids are contiguous from 0.
+ */
+ __rte_unused struct cmdline *cl,
+ __rte_unused void *data)
+ for (int i = 0; i < rte_eth_dev_count_avail(); i++) {
+ struct rte_ether_addr addr;
+ int retval = rte_eth_macaddr_get(i, &addr);
+ if (retval != 0) {
+ printf("Port %d - MAC UNKNOWN\n", i);
+ continue;
+ }
+ printf("Port %d - "RTE_ETHER_ADDR_PRT_FMT"\n", i, RTE_ETHER_ADDR_BYTES(&addr));
+ }
+cmd_show_port_stats_parsed(__rte_unused void *parsed_result, /* Handler for the "show port stats" command.
+ * For each port id from 0 up to rte_eth_dev_count_avail() - 1, prints the
+ * ipackets, imissed, ierrors and opackets counters, or an error line when
+ * the stats cannot be read.
+ * NOTE(review): this assumes the available port ids are contiguous from 0.
+ */
+ __rte_unused struct cmdline *cl,
+ __rte_unused void *data)
+ for (int i = 0; i < rte_eth_dev_count_avail(); i++) {
+ struct rte_eth_stats stats = {0};
+ int retval = rte_eth_stats_get(i, &stats);
+ if (retval != 0) {
+ printf("Port %d - Cannot get stats\n", i);
+ continue;
+ }
+ printf("Port %d - ipkts: %"PRIu64", imissed: %"PRIu64
+ ", ierrors: %"PRIu64", opkts: %"PRIu64"\n",
+ i, stats.ipackets, stats.imissed, stats.ierrors, stats.opackets);
+ }
+cmd_show_socket_stats_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	/*
+	 * Handler for the "show socket stats" command.
+	 * Prints the datapath counters of every socket that is connected or
+	 * has already seen traffic (non-zero rx or deq counter).
+	 */
+	const char *path;
+	int sock;
+	uint64_t maxmem;
+	uint16_t port, queue;
+	bool connected;
+
+	/* get_next_socket() returns MAX_SOCKETS once no further socket exists */
+	for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+			i < MAX_SOCKETS;
+			i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+				&queue, &connected)) {
+		if (connected || dp_stats[i].rx != 0 || dp_stats[i].deq != 0)
+			/* the three %u conversions need the socket index, port and
+			 * queue; they were missing from the argument list
+			 */
+			printf("Socket %u [port %u, q %u]: RX %" PRIu64 ", Enq_drops %" PRIu64
+					", Deq %" PRIu64 ", TX_drops %" PRIu64 "\n",
+					(unsigned int)i, port, queue,
+					dp_stats[i].rx, dp_stats[i].enq_drop,
+					dp_stats[i].deq, dp_stats[i].tx_drop);
+	}
+}
+cmd_quit_parsed(__rte_unused void *parsed_result, struct cmdline *cl, /* Handler for the quit command: ends the command-line session by calling cmdline_quit() on cl. */
+ __rte_unused void *data)
+ cmdline_quit(cl);
diff --git a/app/io-proxy/commands.list b/app/io-proxy/commands.list
new file mode 100644
index 0000000000..9dab9bba28
--- /dev/null
+++ b/app/io-proxy/commands.list
@@ -0,0 +1,6 @@
+add socket <STRING>path <STRING>memsize <UINT16>port <UINT16>queue
+list sockets
+list ports
+show port stats
+show socket stats
diff --git a/app/io-proxy/datapath.c b/app/io-proxy/datapath.c
new file mode 100644
index 0000000000..1f7162de18
--- /dev/null
+++ b/app/io-proxy/datapath.c
@@ -0,0 +1,595 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <linux/memfd.h>
+#include <rte_eal.h>
+#include <rte_dev.h>
+#include <rte_malloc.h>
+#include <rte_ethdev.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_mempool.h>
+#include <shared_mem_bus.h>
+#include "datapath.h"
+static int mempool_ops_index = -1; /* set from check_mempool_ops() in datapath_init(); -1 until then */
+static struct rte_mempool *default_mempool; /* pool the port queues use when no client memory is attached */
+static volatile unsigned long long port_poll_mask; /* one bit per socket slot; set and cleared by handle_connection() */
+static volatile unsigned long long used_poll_mask; /* last port_poll_mask value picked up by the forwarding loop */
+struct listen_socket_params { /* per-slot description of one listening unix socket */
+ const char *path; /* socket path; listen_unix_socket() stores a strdup() copy here */
+ int sock; /* listening socket fd; 0 marks the slot as unused */
+ uint16_t port_id; /* ethdev port served through this socket */
+ uint16_t qid; /* queue on that port */
+ uint64_t maxmem; /* size in bytes of the memory region created for each client */
+#define S_IDX(p, q) (((p) * MAX_QUEUES_SUPPORTED) + (q)) /* flatten (port, queue) into a socket slot index */
+static struct rte_ring *rx_rings[MAX_SOCKETS]; /* per slot: ring that received mbuf offsets are enqueued to */
+static struct rte_ring *tx_rings[MAX_SOCKETS]; /* per slot: ring that mbuf offsets to transmit are dequeued from */
+static uintptr_t base_addrs[MAX_SOCKETS]; /* per slot: local address of the memory shared with the client */
+static uint64_t lengths[MAX_SOCKETS]; /* per slot: size in bytes of that shared memory */
+static struct rte_mempool *mps[MAX_SOCKETS]; /* per slot: local mempool wrapper built in handle_connection() */
+static struct listen_socket_params sock_params[MAX_SOCKETS]; /* per slot: listening socket parameters */
+struct rxtx_stats dp_stats[MAX_SOCKETS] = {0}; /* per slot: forwarding counters, zeroed when a client is started */
+/*
+ * Find the first configured listening socket at slot index >= start and
+ * report its parameters through the output pointers.
+ *
+ * Returns the slot index of that socket, or MAX_SOCKETS when there is no
+ * further socket; in that case the output parameters are left untouched.
+ * *connected is true when the slot's bit is currently set in port_poll_mask.
+ */
+get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+ uint16_t *port, uint16_t *queue, bool *connected)
+ int i;
+ for (i = start; i < MAX_SOCKETS; i++) {
+ if (sock_params[i].sock > 0) {
+ *path = sock_params[i].path;
+ *sock = sock_params[i].sock;
+ *maxmem = sock_params[i].maxmem;
+ *port = sock_params[i].port_id;
+ *queue = sock_params[i].qid;
+ /* the mask is unsigned long long: shift a 64-bit one so that slots >= 32 are well defined */
+ *connected = (port_poll_mask & (1ULL << i)) != 0;
+ break;
+ }
+ }
+ return i;
+static int /* configure, set up and start one ethdev port; returns 0 on success, non-zero on failure */
+init_port(uint16_t port_id, struct rte_mempool *mbuf_pool)
+ struct rte_eth_conf port_conf = {
+ .rxmode = { .mq_mode = RTE_ETH_MQ_RX_RSS, }, /* spread received traffic over the rx queues with RSS */
+ .rx_adv_conf = {
+ .rss_conf = { .rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP, },
+ },
+ };
+ struct rte_eth_dev_info dev_info;
+ int socket = rte_socket_id(); /* queues are set up on the calling thread's socket id */
+ int retval = rte_eth_dev_info_get(port_id, &dev_info);
+ if (retval != 0) {
+ printf("Error during getting device (port %u) info: %s\n",
+ port_id, strerror(-retval));
+ return retval;
+ }
+ if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) /* only request fast-free when the device advertises it */
+ port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+ port_conf.rx_adv_conf.rss_conf.rss_hf &= dev_info.flow_type_rss_offloads; /* drop RSS hash types the device does not support */
+ if (rte_eth_dev_configure(port_id, MAX_QUEUES_SUPPORTED, MAX_QUEUES_SUPPORTED, /* same number of rx and tx queues */
+ &port_conf) < 0) {
+ printf("Error configuring port\n");
+ return -1;
+ }
+ for (uint16_t q = 0; q < MAX_QUEUES_SUPPORTED; q++) {
+ retval = rte_eth_rx_queue_setup(port_id, q, 128, socket, NULL, mbuf_pool); /* 128 rx descriptors, buffers from mbuf_pool */
+ if (retval < 0) {
+ printf("Error running rx_queue_setup\n");
+ return retval;
+ }
+ retval = rte_eth_tx_queue_setup(port_id, q, 256, socket, NULL); /* 256 tx descriptors */
+ if (retval < 0) {
+ printf("Error running tx_queue_setup\n");
+ return retval;
+ }
+ }
+ retval = rte_eth_dev_start(port_id);
+ if (retval < 0) {
+ printf("Error running dev_start\n");
+ return retval;
+ }
+ printf("Port %u started ok\n", port_id);
+ if (rte_eth_promiscuous_enable(port_id) < 0) /* failing to enable promiscuous mode is only a warning */
+ printf("Warning: could not enable promisc mode on port %u\n", port_id);
+ return 0;
+datapath_init(const char *corelist) /* start EAL on corelist, create the default mbuf pool and start the ethdev ports */
+ /* eal init requires non-const parameters, so copy */
+ char *cl = strdup(corelist); /* todo, free copy */
+ char l_flag[] = "-l";
+ char in_mem[] = "--in-memory";
+ char use_avx512[] = "--force-max-simd-bitwidth=512";
+ char *argv[] = {
+ program_invocation_short_name,
+ l_flag, cl, /* NOTE(review): cl is used without a NULL check on the strdup() result */
+ in_mem,
+ use_avx512,
+ };
+ RTE_BUILD_BUG_ON(sizeof(port_poll_mask) * CHAR_BIT < MAX_SOCKETS); /* the poll mask needs at least one bit per socket slot */
+ int ret = rte_eal_init(RTE_DIM(argv) - 1, argv); /* NOTE(review): argc excludes the last argv entry - presumably a terminating NULL not shown in this copy; confirm */
+ if (ret < 0)
+ return ret; /* hand the negative EAL result back to the caller */
+ mempool_ops_index = check_mempool_ops();
+ if (mempool_ops_index == -1)
+ rte_panic("Cannot get mempool ops");
+ printf("Mempool ops index is %d\n", mempool_ops_index);
+ default_mempool = rte_pktmbuf_pool_create("proxy_def", /* pool used by port queues until a client attaches its own memory */
+ MAX_SOCKETS * 200, 32, 0,
+ RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+ if (default_mempool == NULL)
+ rte_panic("Cannot create default mempool\n");
+ int nb_ethdevs = rte_eth_dev_count_avail();
+ if (nb_ethdevs > MAX_PORTS_SUPPORTED) {
+ fprintf(stderr, "More ports available than supported, some will be unused\n");
+ nb_ethdevs = MAX_PORTS_SUPPORTED; /* ports beyond the supported count are not initialised */
+ }
+ for (int i = 0; i < nb_ethdevs; i++) {
+ if (init_port(i, default_mempool) != 0)
+ rte_panic("Cannot init port %d\n", i);
+ }
+ return 0;
+static int /* send fd to the peer as SCM_RIGHTS data together with its size, iova and page size; returns 0 or -1 */
+send_fd(int to, int fd, uint64_t fd_size, rte_iova_t iova, uint64_t pg_size)
+ struct iovec iov = {0};
+ struct msghdr msg = {0};
+ size_t cmsglen = CMSG_LEN(sizeof(fd)); /* NOTE(review): buffer and msg_controllen use CMSG_LEN; cmsg(3) suggests CMSG_SPACE for the buffer size - confirm */
+ struct cmsghdr *cmhdr = malloc(cmsglen);
+ int ret = 0;
+ struct {
+ uint64_t fd_size;
+ rte_iova_t iova;
+ uint64_t pg_size;
+ } data_message = {fd_size, iova, pg_size}; /* regular payload that travels with the descriptor */
+ if (cmhdr == NULL)
+ return -1;
+ iov.iov_base = (void *)&data_message;
+ iov.iov_len = sizeof(data_message);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+ cmhdr->cmsg_level = SOL_SOCKET;
+ cmhdr->cmsg_type = SCM_RIGHTS; /* ancillary message carrying a file descriptor */
+ cmhdr->cmsg_len = cmsglen;
+ msg.msg_control = cmhdr;
+ msg.msg_controllen = cmsglen;
+ *(int *)CMSG_DATA(cmhdr) = fd;
+ if (sendmsg(to, &msg, 0) != (int)iov.iov_len) { /* anything other than the full payload length counts as failure */
+ printf("Error sending message to client, %s\n", strerror(errno));
+ ret = -1;
+ }
+ free(cmhdr); /* released on both the success and the failure path */
+ return ret;
+static int /* switch one rx queue of a port over to mempool p; returns 0 on success, -1 on failure */
+reconfigure_queue(uint16_t port_id, uint16_t qid, struct rte_mempool *p)
+ if (rte_eth_dev_rx_queue_stop(port_id, qid) != 0) { /* both directions of the queue pair are stopped first */
+ printf("Error with rx_queue_stop\n");
+ return -1;
+ }
+ if (rte_eth_dev_tx_queue_stop(port_id, qid) != 0) {
+ printf("Error with tx_queue_stop\n");
+ return -1;
+ }
+ if (rte_eth_rx_queue_setup(port_id, qid, 1024, /* rx queue is set up again with 1024 descriptors drawing buffers from p */
+ rte_socket_id(), NULL, p) != 0) {
+ printf("Error with rx_queue_setup\n");
+ return -1;
+ }
+ if (rte_eth_dev_tx_queue_start(port_id, qid) != 0) { /* the tx queue is restarted without being set up again */
+ printf("Error with tx_queue_start\n");
+ return -1;
+ }
+ if (rte_eth_dev_rx_queue_start(port_id, qid) != 0) {
+ printf("Error with rx_queue_start\n");
+ return -1;
+ }
+ return 0;
+/*
+ * Serve one connected client until it disconnects or an error occurs.
+ *
+ * A local rte_mempool header is built whose pool_config holds the local
+ * address of the shared region (client_mem). Control messages read from the
+ * client socket then supply the client's own mapping address and the offsets
+ * of its mempool and rx/tx rings inside the region. One message switches the
+ * port queue over to the shared pool, sets the slot's bit in port_poll_mask
+ * and acknowledges once the forwarding loop has picked the new mask up.
+ * On exit the bit is cleared again and the queue is put back on
+ * default_mempool.
+ *
+ * client:       connected socket to read messages from and write replies to
+ * client_mem:   local mapping of the memory shared with the client
+ * memsize:      size of that mapping in bytes
+ * port_id, qid: ethdev port and queue served on behalf of this client
+ */
+static void
+handle_connection(int client, void *const client_mem, uint64_t memsize,
+ uint16_t port_id, uint16_t qid)
+ uintptr_t client_mmap_addr = 0;
+ struct rte_ring *rx_ring, *tx_ring;
+ struct rte_mempool *local_mp;
+ /* header + per-lcore caches + pktmbuf private area in one allocation */
+ size_t mempool_memsize = sizeof(*local_mp)
+ + sizeof(local_mp->local_cache[0]) * RTE_MAX_LCORE
+ + sizeof(struct rte_pktmbuf_pool_private);
+ local_mp = rte_malloc(NULL, mempool_memsize, 0);
+ if (local_mp == NULL) {
+ printf("Error allocating mempool struct\n");
+ return;
+ }
+ memset(local_mp, 0, mempool_memsize);
+ *local_mp = (struct rte_mempool){
+ .name = "proxy_mp",
+ .cache_size = 256,
+ .ops_index = mempool_ops_index,
+ .pool_config = client_mem,
+ .private_data_size = sizeof(struct rte_pktmbuf_pool_private),
+ .local_cache = RTE_PTR_ADD(local_mp, sizeof(*local_mp)),
+ };
+ for (uint i = 0; i < RTE_MAX_LCORE; i++) {
+ local_mp->local_cache[i].size = 256;
+ local_mp->local_cache[i].flushthresh = 300;
+ }
+ struct eth_shared_mem_msg *msg = malloc(sizeof(*msg) + 1024);
+ if (msg == NULL) {
+ printf("Error mallocing message buffer\n");
+ goto out;
+ }
+ /* read into the whole buffer: sizeof(*msg), not the size of the pointer */
+ int bytes_read = read(client, msg, sizeof(*msg) + 1024);
+ /* stop on end-of-stream (0) and on read errors (-1) alike */
+ while (bytes_read > 0) {
+ switch (msg->type) {
+ /* remember the address at which the client mapped the region */
+ client_mmap_addr = msg->offset;
+ printf("Got mmap base addr of %p\n", (void *)client_mmap_addr);
+ break;
+ struct rte_mempool *remote_pool;
+ uintptr_t remote_pd_offset;
+ remote_pool = RTE_PTR_ADD(client_mem, msg->offset);
+ /* pool_data is a client-side pointer: turn it into an offset, then into a local pointer */
+ remote_pd_offset = (uintptr_t)remote_pool->pool_data - client_mmap_addr;
+ local_mp->pool_data = RTE_PTR_ADD(client_mem, remote_pd_offset);
+ memcpy(rte_mempool_get_priv(local_mp), rte_mempool_get_priv(remote_pool),
+ sizeof(struct rte_pktmbuf_pool_private));
+ printf("Got mempool offset of %p, stack name is %s\n",
+ (void *)msg->offset, (char *)local_mp->pool_data);
+ /* sanity check: allocate one buffer through the local wrapper and inspect it */
+ struct rte_mbuf *mb = rte_pktmbuf_alloc(local_mp);
+ if (mb == NULL) {
+ printf("Error allocating buffer\n");
+ /* leave through the common exit so msg and local_mp are released */
+ goto out;
+ }
+ if ((uintptr_t)mb->buf_addr != (uintptr_t)mb + 128)
+ rte_panic("Error, bad buffer\n");
+ rte_pktmbuf_free(mb);
+ break;
+ }
+ printf("Got Rx ring offset of %p\n", (void *)msg->offset);
+ rx_ring = RTE_PTR_ADD(client_mem, msg->offset);
+ rx_rings[S_IDX(port_id, qid)] = rx_ring;
+ break;
+ printf("Got Tx ring offset of %p\n", (void *)msg->offset);
+ tx_ring = RTE_PTR_ADD(client_mem, msg->offset);
+ tx_rings[S_IDX(port_id, qid)] = tx_ring;
+ break;
+ /* publish the per-slot state before the slot becomes visible to the forwarding loop */
+ base_addrs[S_IDX(port_id, qid)] = (uintptr_t)client_mem;
+ lengths[S_IDX(port_id, qid)] = memsize;
+ mps[S_IDX(port_id, qid)] = local_mp;
+ if (reconfigure_queue(port_id, qid, local_mp) < 0)
+ goto out;
+ /* the mask is unsigned long long, so shift a 64-bit one */
+ port_poll_mask |= (1ULL << S_IDX(port_id, qid));
+ /* wait until the forwarding loop has picked up the new mask */
+ while (used_poll_mask != port_poll_mask)
+ usleep(10);
+ *msg = (struct eth_shared_mem_msg){ .type = MSG_TYPE_ACK, };
+ if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+ goto out;
+ dp_stats[S_IDX(port_id, qid)] = (struct rxtx_stats){0};
+ break;
+ *msg = (struct eth_shared_mem_msg){
+ };
+ rte_eth_macaddr_get(port_id, &msg->ethaddr);
+ if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+ goto out;
+ break;
+ default:
+ printf("Unknown message\n");
+ }
+ bytes_read = read(client, msg, sizeof(*msg) + 1024);
+ }
+ /* take the slot out of the poll mask and wait for the forwarding loop to notice */
+ port_poll_mask &= ~(1ULL << S_IDX(port_id, qid));
+ while (used_poll_mask != port_poll_mask)
+ usleep(10);
+ /* hand the queue back to the default pool before the shared pool goes away */
+ reconfigure_queue(port_id, qid, default_mempool);
+ free(msg);
+ rte_free(local_mp);
+ printf("Client disconnect\n");
+/*
+ * Accept one client on the listening socket and serve it to completion.
+ *
+ * A hugepage-backed memfd of maxmem bytes is created and mapped, registered
+ * with DPDK as external memory and DMA-mapped for the port's device. The
+ * descriptor is then passed to the client and handle_connection() runs until
+ * the client goes away, after which everything is torn down again.
+ *
+ * sock:         listening unix socket
+ * maxmem:       size in bytes of the region to create; 1G pages are used
+ *               when it is a multiple of 1G, 2M pages otherwise
+ * port_id, qid: ethdev port and queue the client is attached to
+ *
+ * Returns 0 once a client has been served, otherwise an errno-style code
+ * (or -1 when the ethdev info cannot be read).
+ */
+static int
+accept_client(const int sock, uint64_t maxmem, uint16_t port_id, uint16_t qid)
+ int ret = 0;
+ rte_iova_t *iovas = NULL;
+ const int client = accept(sock, NULL, NULL);
+ if (client < 0) {
+ /* capture errno before printf() has a chance to change it */
+ const int err = errno;
+ printf("Error with accept\n");
+ return err;
+ }
+ printf("Client connected\n");
+ char filename[32];
+ int flags = MFD_HUGETLB;
+ uint32_t pgsize = (1 << 21);
+ if (maxmem % (1 << 30) == 0) {
+ flags |= MFD_HUGE_1GB;
+ pgsize = (1 << 30);
+ }
+ snprintf(filename, sizeof(filename), "client_memory_%d", client);
+ const int memfd = memfd_create(filename, flags);
+ if (memfd < 0) {
+ const int err = errno;
+ printf("Error with memfd_create\n");
+ /* do not leak the accepted connection on this early exit */
+ close(client);
+ return err;
+ }
+ if (ftruncate(memfd, maxmem) < 0) {
+ /* close() below may overwrite errno, so save it first */
+ const int err = errno;
+ printf("Error with ftruncate\n");
+ close(memfd);
+ close(client);
+ return err;
+ }
+ void * const client_mem = mmap(NULL, maxmem, PROT_READ | PROT_WRITE,
+ MAP_SHARED, memfd, 0);
+ if (client_mem == MAP_FAILED) {
+ printf("Error with mmap\n");
+ ret = errno;
+ goto out;
+ }
+ const int nb_pages = maxmem / pgsize;
+ printf("Registering %d pages of memory with DPDK\n", nb_pages);
+ iovas = malloc(sizeof(*iovas) * nb_pages);
+ if (iovas == NULL) {
+ printf("Error with malloc for iovas\n");
+ ret = ENOMEM;
+ goto out;
+ }
+ /* assume vfio, VA = IOVA */
+ iovas[0] = (uintptr_t)client_mem;
+ for (int i = 1; i < nb_pages; i++)
+ iovas[i] = iovas[i - 1] + pgsize;
+ if (rte_extmem_register(client_mem, maxmem, iovas, nb_pages, pgsize) < 0) {
+ printf("Error registering memory with DPDK, %s\n", strerror(rte_errno));
+ ret = rte_errno;
+ goto out;
+ }
+ printf("Registered memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+ struct rte_eth_dev_info info;
+ if (rte_eth_dev_info_get(port_id, &info) < 0) {
+ printf("Error getting ethdev info\n");
+ ret = -1;
+ goto out;
+ }
+ if (rte_dev_dma_map(info.device, client_mem, iovas[0], maxmem) < 0) {
+ printf("Error mapping dma for device, %s\n", strerror(rte_errno));
+ ret = rte_errno;
+ goto out;
+ }
+ if (send_fd(client, memfd, maxmem, iovas[0], pgsize) != 0) {
+ printf("Error sending fd to client\n");
+ ret = errno;
+ goto out;
+ }
+ printf("Sent FD to client for mapping\n");
+ /* blocks until the client disconnects */
+ handle_connection(client, client_mem, maxmem, port_id, qid);
+ if (iovas != NULL)
+ rte_dev_dma_unmap(info.device, client_mem, iovas[0], maxmem);
+ printf("Unregistering memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+ if (rte_extmem_unregister(client_mem, maxmem) < 0)
+ printf("Error unregistering memory, %s\n", strerror(rte_errno));
+ close(memfd);
+ close(client);
+ /* mmap() reports failure with MAP_FAILED, never with NULL */
+ if (client_mem != MAP_FAILED)
+ munmap(client_mem, maxmem);
+ /* the iova table is no longer referenced once the memory is unregistered */
+ free(iovas);
+ return ret;
+/*
+ * Thread body for one listening socket: registers the thread with DPDK and
+ * then serves clients one after another until accept_client() reports an
+ * error.
+ *
+ * param points to the struct listen_socket_params describing the socket.
+ * Returns the failing accept_client() code, cast to a pointer.
+ */
+static void *
+listen_fn(void *param)
+ struct listen_socket_params *p = param;
+ int ret = 0;
+ rte_thread_register();
+ while (1) {
+ /* assign the function-level ret (no shadowing) so the failure code is what gets returned */
+ ret = accept_client(p->sock, p->maxmem, p->port_id, p->qid);
+ if (ret != 0)
+ goto out;
+ }
+ /* p points into the static sock_params[] table, not at heap memory, so it must not be passed to free() */
+ return (void *)(uintptr_t)ret;
+listen_unix_socket(const char *path, const uint64_t maxmem, uint16_t port_id, uint16_t qid)
+ if (sock_params[S_IDX(port_id, qid)].sock != 0) {
+ printf("Error, port already in use\n");
+ return EEXIST;
+ }
+ if (port_id >= rte_eth_dev_count_avail()) {
+ printf("Error, port %u does not exist\n", port_id);
+ return EINVAL;
+ }
+ printf("Opening and listening on socket: %s\n", path);
+ char *pathcp = strdup(path);
+ if (pathcp == NULL) {
+ printf("Error with strdup()\n");
+ free(pathcp);
+ return ENOMEM;
+ }
+ char *dirpath = dirname(pathcp);
+ mkdir(dirpath, 0700);
+ free(pathcp);
+ int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ if (sock < 0) {
+ printf("Error creating socket\n");
+ return errno;
+ }
+ struct sockaddr_un sun = {.sun_family = AF_UNIX};
+ strlcpy(sun.sun_path, path, sizeof(sun.sun_path));
+ printf("Attempting socket bind to path '%s'\n", path);
+ printf("Associated parameters are: maxmem = %"PRIu64", port = %u, qid = %u\n",
+ maxmem, port_id, qid);
+ if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+ printf("Initial bind to socket '%s' failed.\n", path);
+ /* check if current socket is active */
+ if (connect(sock, (void *)&sun, sizeof(sun)) == 0) {
+ close(sock);
+ return EADDRINUSE;
+ }
+ /* socket is not active, delete and attempt rebind */
+ printf("Attempting unlink and retrying bind\n");
+ unlink(sun.sun_path);
+ if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+ printf("Error binding socket: %s\n", strerror(errno));
+ close(sock);
+ return errno; /* if unlink failed, this will be -EADDRINUSE as above */
+ }
+ }
+ if (listen(sock, 1) < 0) {
+ printf("Error calling listen for socket: %s\n", strerror(errno));
+ unlink(sun.sun_path);
+ close(sock);
+ return errno;
+ }
+ printf("Socket %s listening ok\n", path);
+ struct listen_socket_params *p = &sock_params[S_IDX(port_id, qid)];
+ pthread_t listen_thread;
+ *p = (struct listen_socket_params){strdup(path), sock, port_id, qid, maxmem};
+ pthread_create(&listen_thread, NULL, listen_fn, p);
+ pthread_detach(listen_thread);
+ return 0;
+ const typeof(port_poll_mask) to_poll = port_poll_mask; /* presumably the body of handle_forwarding(): one polling pass, working from a snapshot of the mask */
+ if (used_poll_mask != to_poll) {
+ printf("Poll mask is now %#llx\n", to_poll);
+ used_poll_mask = to_poll; /* publish the mask in use; handle_connection() spins until it equals port_poll_mask */
+ }
+ if (to_poll == 0) {
+ usleep(100); /* no active slot: back off briefly instead of spinning */
+ return;
+ }
+ for (uint16_t i = 0; i < sizeof(to_poll) * CHAR_BIT; i++) { /* i is a socket slot index, one mask bit per slot */
+ struct rte_mbuf *mbs[32];
+ void *offsets[32];
+ if (((1UL << i) & to_poll) == 0)
+ continue;
+ uint16_t port_id = i / MAX_QUEUES_SUPPORTED; /* inverse of S_IDX() */
+ uint16_t qid = i % MAX_QUEUES_SUPPORTED;
+ uint16_t nb_rx = rte_eth_rx_burst(port_id, qid, mbs, RTE_DIM(mbs));
+ if (nb_rx != 0) {
+ dp_stats[i].rx += nb_rx;
+ for (uint pkt = 0; pkt < nb_rx; pkt++) {
+ mbs[pkt]->buf_addr = RTE_PTR_SUB(mbs[pkt]->buf_addr, base_addrs[i]); /* rewrite buf_addr as an offset from the shared-region base */
+ offsets[pkt] = RTE_PTR_SUB(mbs[pkt], base_addrs[i]); /* the ring carries mbuf offsets, not local pointers */
+ }
+ uint16_t nb_enq = rte_ring_enqueue_burst(rx_rings[i], offsets, nb_rx, NULL);
+ if (nb_enq != nb_rx) {
+ dp_stats[i].enq_drop += nb_rx - nb_enq;
+ for (uint pkt = nb_enq; pkt < nb_rx; pkt++) { /* undo the offset rewrite for the packets that did not fit */
+ mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr,
+ base_addrs[i]);
+ mbs[pkt]->pool = mps[i];
+ }
+ rte_mempool_put_bulk(mps[i], (void *)&mbs[nb_enq], nb_rx - nb_enq); /* return the unqueued mbufs to the slot's pool */
+ }
+ }
+ uint16_t nb_deq = rte_ring_dequeue_burst(tx_rings[i], offsets,
+ RTE_DIM(offsets), NULL);
+ if (nb_deq != 0) {
+ dp_stats[i].deq += nb_deq;
+ for (uint pkt = 0; pkt < nb_deq; pkt++) {
+ mbs[pkt] = RTE_PTR_ADD(offsets[pkt], base_addrs[i]); /* offset -> local mbuf pointer */
+ rte_prefetch0_write(mbs[pkt]);
+ }
+ for (uint pkt = 0; pkt < nb_deq; pkt++) { /* second pass, after the prefetches: fix up the fields that hold pointers */
+ mbs[pkt]->pool = mps[i];
+ mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr, base_addrs[i]);
+ }
+ uint16_t nb_tx = rte_eth_tx_burst(port_id, qid, mbs, nb_deq);
+ if (nb_tx != nb_deq) {
+ dp_stats[i].tx_drop += (nb_deq - nb_tx);
+ rte_pktmbuf_free_bulk(&mbs[nb_tx], nb_deq - nb_tx); /* packets the port did not accept are freed and counted as drops */
+ }
+ }
+ }
+unsigned int
+ return rte_lcore_id(); /* presumably the body of lcore_id(): forwards the rte_lcore_id() value for the calling thread */
diff --git a/app/io-proxy/datapath.h b/app/io-proxy/datapath.h
new file mode 100644
index 0000000000..ec5b395164
--- /dev/null
+++ b/app/io-proxy/datapath.h
@@ -0,0 +1,37 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdint.h>
+#define MEMPOOL_OPS_NAME "proxy_mp" /* name that check_mempool_ops() looks up in the mempool ops table */
+struct rxtx_stats { /* per-socket forwarding counters */
+ uint64_t rx; /* packets received from the port */
+ uint64_t enq_drop; /* received packets that could not be enqueued to the rx ring */
+ uint64_t deq; /* packets dequeued from the tx ring */
+ uint64_t tx_drop; /* dequeued packets the port did not accept */
+extern struct rxtx_stats dp_stats[MAX_SOCKETS]; /* indexed by socket slot */
+int check_mempool_ops(void); /* index of the MEMPOOL_OPS_NAME ops, or -1 if not found */
+int datapath_init(const char *corelist); /* 0 on success, negative if EAL init fails */
+int listen_unix_socket(const char *path, uint64_t maxmem, uint16_t port, uint16_t qid); /* 0 on success, errno-style code otherwise */
+void handle_forwarding(void); /* called repeatedly from the main loop */
+unsigned int lcore_id(void);
+int get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem, /* returns the slot index found, or MAX_SOCKETS when there is none */
+ uint16_t *port, uint16_t *queue, bool *connected);
diff --git a/app/io-proxy/datapath_mp.c b/app/io-proxy/datapath_mp.c
new file mode 100644
index 0000000000..bba21a5b14
--- /dev/null
+++ b/app/io-proxy/datapath_mp.c
@@ -0,0 +1,78 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <sys/types.h>
+#include <rte_stack.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include "datapath.h"
+/* Mempool value "pool_config" contains pointer to base address for this mapping */
+/* no alloc/free etc. functions for this pool, as we never create/destroy it, only use
+ * enqueue and dequeue from it.
+ */
+static int /* mempool enqueue op: pushes the objects onto the stack in pool_data as offsets from the mapping base */
+proxy_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+ unsigned int n)
+ struct rte_stack *s = mp->pool_data; /* pool_data is used as an rte_stack */
+ void *offset_table[n]; /* variable-length array sized by the burst */
+ uintptr_t mempool_base = (uintptr_t)mp->pool_config; /* pool_config holds the base address of the mapping */
+ for (uint i = 0; i < n; i++)
+ offset_table[i] = RTE_PTR_SUB(obj_table[i], mempool_base); /* local pointer -> offset */
+ return rte_stack_push(s, offset_table, n) == 0 ? -ENOBUFS : 0; /* a push result of 0 is reported as -ENOBUFS */
+static int /* mempool dequeue op: pops offsets from the stack and turns them back into local mbuf pointers */
+proxy_mp_dequeue(struct rte_mempool *mp, void **obj_table,
+ unsigned int n)
+ struct rte_stack *s = mp->pool_data;
+ uintptr_t mempool_base = (uintptr_t)mp->pool_config; /* pool_config holds the base address of the mapping */
+ if (rte_stack_pop(s, obj_table, n) == 0)
+ return -ENOBUFS; /* a pop result of 0 is reported as -ENOBUFS */
+ for (uint i = 0; i < n; i++) {
+ obj_table[i] = RTE_PTR_ADD(obj_table[i], mempool_base); /* offset -> local pointer */
+ struct rte_mbuf *mb = obj_table[i];
+ mb->buf_addr = RTE_PTR_ADD(mb, sizeof(struct rte_mbuf) + rte_pktmbuf_priv_size(mp)); /* data buffer is placed right after the mbuf header and private area */
+ mb->pool = mp; /* make the mbuf refer to this local pool */
+ }
+ return 0;
+static int /* alloc op placeholder: panics if it is ever invoked */
+proxy_mp_alloc(struct rte_mempool *mp __rte_unused)
+ rte_panic("Should not be called\n"); /* this pool type is only used for enqueue and dequeue */
+static unsigned int /* get_count op placeholder: panics if it is ever invoked */
+proxy_mp_get_count(const struct rte_mempool *mp __rte_unused)
+ rte_panic("Should not be called\n"); /* this pool type is only used for enqueue and dequeue */
+static struct rte_mempool_ops ops_proxy_mp = { /* ops table for the proxy mempool type */
+ .alloc = proxy_mp_alloc, /* panics when called */
+ .enqueue = proxy_mp_enqueue,
+ .dequeue = proxy_mp_dequeue,
+ .get_count = proxy_mp_get_count, /* panics when called */
+ for (uint i = 0; i < rte_mempool_ops_table.num_ops; i++) { /* presumably the body of check_mempool_ops(): scan the registered mempool ops by name */
+ if (strcmp(rte_mempool_ops_table.ops[i].name, MEMPOOL_OPS_NAME) == 0)
+ return i; /* index of the matching ops entry */
+ }
+ return -1; /* nothing is registered under MEMPOOL_OPS_NAME */
diff --git a/app/io-proxy/main.c b/app/io-proxy/main.c
new file mode 100644
index 0000000000..82eef81fb0
--- /dev/null
+++ b/app/io-proxy/main.c
@@ -0,0 +1,71 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <rte_eal.h>
+#include <rte_common.h>
+#include <cmdline.h>
+#include <cmdline_socket.h>
+#include "datapath.h"
+#include "commands.h"
+volatile bool quit; /* set by the cmdline thread; main() keeps forwarding until it becomes true */
+volatile bool running_startup_script; /* true while run_cmdline() is replaying the startup file */
+static const char *startup_file = "dpdk-io-proxy.cmds"; /* opened by relative name; skipped when it cannot be opened */
+static void * /* thread body: replays the optional startup script, then runs the interactive prompt */
+run_cmdline(void *arg __rte_unused)
+ struct cmdline *cl;
+ int fd = open(startup_file, O_RDONLY); /* a failed open simply skips the startup script */
+ if (fd >= 0) {
+ running_startup_script = true;
+ cl = cmdline_new(ctx, "\n# ", fd, STDOUT_FILENO); /* input comes from the file, output goes to stdout */
+ if (cl == NULL) {
+ fprintf(stderr, "Error processing %s\n", startup_file);
+ goto end_startup;
+ }
+ cmdline_interact(cl);
+ cmdline_quit(cl); /* NOTE(review): cl is not passed to cmdline_free() here - confirm whether that is intended */
+ running_startup_script = false;
+ close(fd);
+ }
+ cl = cmdline_stdin_new(ctx, "\nProxy>> ");
+ if (cl == NULL)
+ goto out;
+ cmdline_interact(cl); /* returns when the interactive session ends */
+ cmdline_stdin_exit(cl);
+ quit = true; /* stops the forwarding loop in main() */
+ return NULL;
+main(int argc, char *argv[])
+ pthread_t cmdline_th;
+ if (argc != 2 || datapath_init(argv[1]) < 0) {
+ fprintf(stderr, "Usage %s <corelist>\n", program_invocation_short_name);
+ rte_exit(EXIT_FAILURE, "Cannot init\n");
+ }
+ if (pthread_create(&cmdline_th, NULL, run_cmdline, NULL) < 0)
+ rte_exit(EXIT_FAILURE, "Cannot spawn cmdline thread\n");
+ pthread_detach(cmdline_th);
+ while (!quit)
+ handle_forwarding();
+ return 0;
diff --git a/app/io-proxy/meson.build b/app/io-proxy/meson.build
new file mode 100644
index 0000000000..f03783b68f
--- /dev/null
+++ b/app/io-proxy/meson.build
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+cmd_h = custom_target('commands_hdr',
+ output: 'commands.h',
+ input: files('commands.list'),
+ capture: true,
+ command: [cmdline_gen_cmd, '@INPUT@']
+sources += files('datapath.c', 'datapath_mp.c', 'main.c', 'command_fns.c')
+sources += cmd_h
+deps += ['cmdline', 'ethdev', 'stack', 'bus_shared_mem']
diff --git a/app/meson.build b/app/meson.build
index e4bf5c531c..27f69d883e 100644
--- a/app/meson.build
+++ b/app/meson.build
@@ -18,6 +18,7 @@ apps = [
+ 'io-proxy',
^ permalink raw reply [flat|nested] 8+ messages in thread