DPDK patches and discussions
* [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying
@ 2023-09-22  8:19 Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket Bruce Richardson
                   ` (4 more replies)
  0 siblings, 5 replies; 7+ messages in thread
From: Bruce Richardson @ 2023-09-22  8:19 UTC
  To: dev; +Cc: Bruce Richardson

Following my talk at the recent DPDK Summit [1], here is an RFC patchset
containing the prototypes I created which led to the talk.  This
patchset is simply to demonstrate:

* what is currently possible with DPDK in terms of zero-copy IPC
* where the big gaps and general problem areas are
* what the performance is like doing zero-copy between processes
* how we might enable new deployment models for DPDK apps.

This cover letter is quite long, as it covers how to run the demo app
and use the drivers included in this set. I felt this was more accessible
than putting it in rst files in the patches. This patchset depends
upon patchsets [2] and [3].

[1] https://dpdksummit2023.sched.com/event/1P9wU
[2] http://patches.dpdk.org/project/dpdk/list/?series=29536
[3] http://patches.dpdk.org/project/dpdk/list/?series=29538

Overview
--------

At a high level, the patchset contains two parts: a proxy application,
which performs packet IO and steers traffic on a per-queue basis to
other applications that connect to it via unix sockets; and a set of
drivers for those applications to use, so that they can (hopefully)
receive packets from the proxy app without any changes to their own
code. Together, these demonstrate the feasibility of zero-copy packet
transfer between independent DPDK apps.

The drivers are:
* a bus driver, which makes the connection to the proxy app via
  the unix socket. Thereafter it accepts the shared memory from the
  proxy and maps it into the running process for use as buffers,
  rings, etc. It also handles communication with the proxy app on
  behalf of the other two drivers.
* a mempool driver, which simply manages a set of buffers on the basis
  of offsets within the shared memory area rather than pointers.
  The big downside is that it assumes all the objects stored in the
  mempool are mbufs. (As described in my talk, this is a big issue,
  and I'm not sure we have a good solution available for it right now.)
* an ethernet driver, which creates RX and TX rings in shared memory
  for use in communicating with the proxy app. All buffers sent/received
  are converted to offsets within the shared memory area.
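To illustrate why offsets rather than pointers are exchanged, here is a
minimal standalone sketch of the translation that the bus driver's
rte_shm_bus_get_mem_offset() and rte_shm_bus_get_mem_ptr() perform. It
assumes a single shared region described by a hypothetical
`struct shm_region`; the helper names are also hypothetical, and
UINTPTR_MAX plays the role of the `(uintptr_t)-1` "not found" value used
in the bus driver.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical description of one shared region as mapped in this process. */
struct shm_region {
	uintptr_t base; /* process-local address returned by mmap() */
	size_t len;     /* length of the mapping in bytes */
};

/* Convert a local pointer into an offset that is meaningful in every
 * process mapping the same region. Returns UINTPTR_MAX if ptr lies
 * outside the region. */
static uintptr_t
shm_ptr_to_offset(const struct shm_region *r, const void *ptr)
{
	uintptr_t p = (uintptr_t)ptr;

	assert(r != NULL);
	/* the base itself is a valid object address, so the lower bound is inclusive */
	if (p < r->base || p - r->base >= r->len)
		return UINTPTR_MAX;
	return p - r->base;
}

/* Convert an offset received from a peer back into a local pointer.
 * Returns NULL if the offset is out of range. */
static void *
shm_offset_to_ptr(const struct shm_region *r, uintptr_t offset)
{
	assert(r != NULL);
	if (offset >= r->len)
		return NULL;
	return (void *)(r->base + offset);
}
```

Two processes mapping the same memfd will generally see it at different
base addresses, so a raw pointer from one is meaningless in the other,
while an offset is valid in both.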

The proxy app itself implements all the other logic - mostly inside
datapath.c - to allow the connecting app to run. When an app connects to
the unix socket, the proxy app uses memfd to create a hugepage block to
be passed to the "guest" app, and then exchanges messages with the
drivers until the connection is up and the app is ready to handle
traffic. [Ideally, this IPC over unix socket mechanism should
probably be generalized into a library used by the app, but for now it's
just built-in]. As stated above, the steering of traffic is done
per-queue, that is, each app connects to a specific socket corresponding
to a NIC queue. For demo purposes, the traffic to the queues is just
distributed using RSS, but obviously it would be possible to use e.g.
rte_flow to do more interesting distribution in future.
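The fd handoff described above can be sketched with the standard
SCM_RIGHTS mechanism on a SOCK_SEQPACKET unix socket, the socket type the
bus driver connects with. This is a minimal sketch under stated
assumptions, not the proxy's actual code: the helper names are
hypothetical, `struct shm_fd_msg` mirrors the size/IOVA/page-size message
that the bus driver's recv_fd() reads, and MFD_HUGETLB is deliberately
left out so the example runs without reserved hugepages.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Same field layout as the data_message struct read by recv_fd() in the
 * bus driver: block size, base IOVA and page size. */
struct shm_fd_msg {
	uint64_t fd_size;
	uint64_t iova;
	uint64_t pg_size;
};

/* Control-message buffer sized and aligned for exactly one fd. */
union fd_cmsg_buf {
	char buf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr align;
};

/* Create a memfd-backed block of the given size. A real proxy would likely
 * pass MFD_HUGETLB; it is omitted here (assumption) for portability. */
static int
create_shm_block(const char *name, off_t size)
{
	assert(size > 0);
	int fd = memfd_create(name, MFD_CLOEXEC);
	if (fd < 0)
		return -1;
	if (ftruncate(fd, size) != 0) {
		close(fd);
		return -1;
	}
	return fd;
}

/* Proxy side: send the block metadata with the fd attached via SCM_RIGHTS. */
static int
send_shm_fd(int sock, int memfd, const struct shm_fd_msg *info)
{
	union fd_cmsg_buf ctrl;
	struct iovec iov = { .iov_base = (void *)info, .iov_len = sizeof(*info) };
	struct msghdr msg = {
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = ctrl.buf,
		.msg_controllen = sizeof(ctrl.buf),
	};

	memset(&ctrl, 0, sizeof(ctrl));
	struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
	cm->cmsg_level = SOL_SOCKET;
	cm->cmsg_type = SCM_RIGHTS;
	cm->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cm), &memfd, sizeof(int));

	return sendmsg(sock, &msg, 0) == (ssize_t)sizeof(*info) ? 0 : -1;
}

/* Guest side: receive the metadata and return the new local fd, or -1. */
static int
recv_shm_fd(int sock, struct shm_fd_msg *info)
{
	union fd_cmsg_buf ctrl;
	struct iovec iov = { .iov_base = info, .iov_len = sizeof(*info) };
	struct msghdr msg = {
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = ctrl.buf,
		.msg_controllen = sizeof(ctrl.buf),
	};
	int fd;

	if (recvmsg(sock, &msg, 0) != (ssize_t)sizeof(*info))
		return -1;
	struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
	if (cm == NULL || cm->cmsg_level != SOL_SOCKET ||
			cm->cmsg_type != SCM_RIGHTS ||
			cm->cmsg_len != CMSG_LEN(sizeof(int)))
		return -1;
	memcpy(&fd, CMSG_DATA(cm), sizeof(int));
	return fd;
}
```

With socketpair() both ends can be exercised in a single process; across
processes, the guest would instead connect() to the socket path, as the
bus driver's dev_scan() does.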

Running the Apps
----------------

To get everything working, just do a DPDK build as normal, then run the
io-proxy app. It takes only a single parameter: the number of the core to
use. For example, on my system I run it on lcore 25:

	./build/app/dpdk-io-proxy 25

The sockets to be created, and how they map to ports/queues, are
controlled via the commandline. A startup script can also be provided;
it just needs to be in the current directory and be named
"dpdk-io-proxy.cmds". Patch 5 of this set contains the example setup
that I use, so it's recommended that you run the proxy app from a
directory containing that file. In that case, the proxy app will use two
ports and create two queues on each, mapping them to 4 unix socket files
in /tmp. (Each socket is created in its own directory to simplify use
with docker containers, as described in the next section.)

No traffic is handled by the app until other end-user apps connect to
it. Testpmd works as that second "guest" app without any changes to it.
To run multiple testpmd instances, each taking traffic from a unique RX
queue and forwarding it back, the following sequence of commands can be
used [in this case, doing forwarding on cores 26 through 29, and using
the 4 unix sockets configured using the startup file referenced above].

	./build/app/dpdk-testpmd -l 24,26 --no-huge -m1 --no-shconf \
		-a sock:/tmp/socket_0_0/sock  -- --forward-mode=macswap
	./build/app/dpdk-testpmd -l 24,27 --no-huge -m1 --no-shconf \
		-a sock:/tmp/socket_0_1/sock  -- --forward-mode=macswap
	./build/app/dpdk-testpmd -l 24,28 --no-huge -m1 --no-shconf \
		-a sock:/tmp/socket_1_0/sock  -- --forward-mode=macswap
	./build/app/dpdk-testpmd -l 24,29 --no-huge -m1 --no-shconf \
		-a sock:/tmp/socket_1_1/sock  -- --forward-mode=macswap

NOTE:
* the "--no-huge -m1" is present to guarantee that no regular DPDK
  hugepage memory is being used by the app. It all comes from the
  proxy app's memfd.
* the "--no-shconf" parameter is necessary just to avoid needing to
  specify a unique file-prefix for each instance
* the forwarding mode is optional; macswap is chosen just to
  have some work done inside testpmd to prove it can touch the packet
  payload, not just the mbuf header.

Using with docker containers
----------------------------

The testpmd instances above can also be run within docker
containers. Using a dockerfile like the one below, we can run testpmd in
a container, getting packets in a zero-copy manner from the io-proxy
running on the host.

   # syntax=docker/dockerfile:1-labs
   FROM alpine
   RUN apk add --update alpine-sdk \
           py3-elftools meson ninja \
           bsd-compat-headers \
           linux-headers \
           numactl-dev \
           bash
   ADD . dpdk
   WORKDIR dpdk
   RUN rm -rf build
   RUN meson setup -Denable_drivers=*/shared_mem -Ddisable_libs=* \
        -Denable_apps=test-pmd -Dtests=false build
   RUN ninja -v -C build
   ENTRYPOINT ["/dpdk/build/app/dpdk-testpmd"]

To access the proxy, all the container needs is access to the unix
socket on the filesystem. Since the example startup script places each
socket in its own directory, we can use the "--volume" parameter to
give each instance its own unique unix socket, and therefore its own
proxied NIC RX/TX queue. To run the four testpmd instances above in
containers instead, the following commands can be used, assuming the
dockerfile above is built to an image called "testpmd".

	docker run -it --volume=/tmp/socket_0_0:/run testpmd \
		-l 24,26 --no-huge -a sock:/run/sock -- \
		--no-mlockall --forward-mode=macswap
	docker run -it --volume=/tmp/socket_0_1:/run testpmd \
		-l 24,27 --no-huge -a sock:/run/sock -- \
		--no-mlockall --forward-mode=macswap
	docker run -it --volume=/tmp/socket_1_0:/run testpmd \
		-l 24,28 --no-huge -a sock:/run/sock -- \
		--no-mlockall --forward-mode=macswap
	docker run -it --volume=/tmp/socket_1_1:/run testpmd \
		-l 24,29 --no-huge -a sock:/run/sock -- \
		--no-mlockall --forward-mode=macswap

NOTE: since these docker testpmd instances don't access IO or allocate
hugepages directly, they should be runnable without extra privileges, as
long as they can connect to the unix socket.

Additional info
---------------

* Stats are available via the app commandline
* By default (#define in code), the proxy app only uses 2 queues per
  port, so you can't configure more than that via the cmdline
* Any ports used by the proxy script must support queue reconfiguration
  at runtime without stopping the port.
* When a "guest" process connected to a socket terminates, all shared
  memory used by that process is destroyed, and a new memfd is created
  on reconnect.
* The above setups using testpmd are the only ways in which this app and
  drivers have been tested. I am hopeful that other apps will work too,
  but there are quite a few limitations (see my DPDK summit talk for
  more details on those).

Congratulations on reading this far! :-)
All comments/feedback on this are welcome.

Bruce Richardson (5):
  bus: new driver to accept shared memory over unix socket
  mempool: driver for mempools of mbufs on shared memory
  net: new ethdev driver to communicate using shared mem
  app: add IO proxy app using shared memory interfaces
  app/io-proxy: add startup commands

 app/io-proxy/command_fns.c                 | 160 ++++++
 app/io-proxy/commands.list                 |   6 +
 app/io-proxy/datapath.c                    | 595 +++++++++++++++++++++
 app/io-proxy/datapath.h                    |  37 ++
 app/io-proxy/datapath_mp.c                 |  78 +++
 app/io-proxy/dpdk-io-proxy.cmds            |   6 +
 app/io-proxy/main.c                        |  71 +++
 app/io-proxy/meson.build                   |  12 +
 app/meson.build                            |   1 +
 drivers/bus/meson.build                    |   1 +
 drivers/bus/shared_mem/meson.build         |  11 +
 drivers/bus/shared_mem/shared_mem_bus.c    | 323 +++++++++++
 drivers/bus/shared_mem/shared_mem_bus.h    |  75 +++
 drivers/bus/shared_mem/version.map         |  11 +
 drivers/mempool/meson.build                |   1 +
 drivers/mempool/shared_mem/meson.build     |  10 +
 drivers/mempool/shared_mem/shared_mem_mp.c |  94 ++++
 drivers/net/meson.build                    |   1 +
 drivers/net/shared_mem/meson.build         |  11 +
 drivers/net/shared_mem/shared_mem_eth.c    | 295 ++++++++++
 20 files changed, 1799 insertions(+)
 create mode 100644 app/io-proxy/command_fns.c
 create mode 100644 app/io-proxy/commands.list
 create mode 100644 app/io-proxy/datapath.c
 create mode 100644 app/io-proxy/datapath.h
 create mode 100644 app/io-proxy/datapath_mp.c
 create mode 100644 app/io-proxy/dpdk-io-proxy.cmds
 create mode 100644 app/io-proxy/main.c
 create mode 100644 app/io-proxy/meson.build
 create mode 100644 drivers/bus/shared_mem/meson.build
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.c
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.h
 create mode 100644 drivers/bus/shared_mem/version.map
 create mode 100644 drivers/mempool/shared_mem/meson.build
 create mode 100644 drivers/mempool/shared_mem/shared_mem_mp.c
 create mode 100644 drivers/net/shared_mem/meson.build
 create mode 100644 drivers/net/shared_mem/shared_mem_eth.c

--
2.39.2



* [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket
  2023-09-22  8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
@ 2023-09-22  8:19 ` Bruce Richardson
  2023-11-23 14:50   ` Jerin Jacob
  2023-09-22  8:19 ` [RFC PATCH 2/5] mempool: driver for mempools of mbufs on shared memory Bruce Richardson
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 7+ messages in thread
From: Bruce Richardson @ 2023-09-22  8:19 UTC
  To: dev; +Cc: Bruce Richardson

Add a new bus driver to DPDK which accepts memory (e.g. hugepage memory)
over a unix socket connection and maps it into the DPDK process,
replacing the current socket memory as the default memory used for
future allocation requests.
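The probe path in this patch hands the received block to
rte_malloc_heap_memory_add(), which takes one IOVA per page. Since the
proxy only sends a base IOVA and a page size, the driver assumes the
block is IOVA-contiguous. A standalone sketch of building that table
follows; `build_iova_table()` is a hypothetical helper, `uint64_t` stands
in for `rte_iova_t`, and the zero page-size, page-size divisibility and
allocation-failure checks are additions not present in the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define SHM_MIN_ALIGN (UINT64_C(1) << 21) /* 2 MB, as checked in dev_probe() */

/* Build the per-page IOVA array expected by rte_malloc_heap_memory_add(),
 * assuming the block is IOVA-contiguous starting at base_iova.
 * Returns a malloc'd array (caller frees) and stores the page count in
 * *nb_pages, or returns NULL on invalid sizes or allocation failure. */
static uint64_t *
build_iova_table(uint64_t memsize, uint64_t base_iova, uint64_t pg_size,
		unsigned int *nb_pages)
{
	assert(nb_pages != NULL);
	if (memsize == 0 || memsize % SHM_MIN_ALIGN != 0 ||
			pg_size == 0 || memsize % pg_size != 0)
		return NULL;

	uint64_t n = memsize / pg_size;
	uint64_t *iovas = malloc(sizeof(*iovas) * n);
	if (iovas == NULL)
		return NULL;

	/* page i starts pg_size bytes after page i - 1 */
	for (uint64_t i = 0; i < n; i++)
		iovas[i] = base_iova + i * pg_size;

	*nb_pages = (unsigned int)n;
	return iovas;
}
```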

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 drivers/bus/meson.build                 |   1 +
 drivers/bus/shared_mem/meson.build      |  11 +
 drivers/bus/shared_mem/shared_mem_bus.c | 323 ++++++++++++++++++++++++
 drivers/bus/shared_mem/shared_mem_bus.h |  75 ++++++
 drivers/bus/shared_mem/version.map      |  11 +
 5 files changed, 421 insertions(+)
 create mode 100644 drivers/bus/shared_mem/meson.build
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.c
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.h
 create mode 100644 drivers/bus/shared_mem/version.map

diff --git a/drivers/bus/meson.build b/drivers/bus/meson.build
index a78b4283bf..0e64959d1a 100644
--- a/drivers/bus/meson.build
+++ b/drivers/bus/meson.build
@@ -9,6 +9,7 @@ drivers = [
         'ifpga',
         'pci',
         'platform',
+        'shared_mem',
         'vdev',
         'vmbus',
 ]
diff --git a/drivers/bus/shared_mem/meson.build b/drivers/bus/shared_mem/meson.build
new file mode 100644
index 0000000000..1fa21f3a09
--- /dev/null
+++ b/drivers/bus/shared_mem/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+if is_windows
+    build = false
+    reason = 'not supported on Windows'
+endif
+
+sources = files('shared_mem_bus.c')
+require_iova_in_mbuf = false
+deps += ['mbuf', 'net']
diff --git a/drivers/bus/shared_mem/shared_mem_bus.c b/drivers/bus/shared_mem/shared_mem_bus.c
new file mode 100644
index 0000000000..e0369ed416
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.c
@@ -0,0 +1,323 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <malloc.h>
+#include <inttypes.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+
+#include <rte_log.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_devargs.h>
+#include <rte_mbuf_pool_ops.h>
+
+#include <bus_driver.h>
+#include <dev_driver.h>
+#include "shared_mem_bus.h"
+
+RTE_LOG_REGISTER_DEFAULT(shared_mem_bus_logtype, DEBUG);
+#define BUS_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+		shared_mem_bus_logtype, "## SHARED MEM BUS: %s(): " fmt "\n", __func__, ##args)
+#define BUS_ERR(fmt, args...)  BUS_LOG(ERR, fmt, ## args)
+#define BUS_INFO(fmt, args...)  BUS_LOG(INFO, fmt, ## args)
+#define BUS_DEBUG(fmt, args...)  BUS_LOG(DEBUG, fmt, ## args)
+
+static int dev_scan(void);
+static int dev_probe(void);
+static struct rte_device *find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+		 const void *data);
+static enum rte_iova_mode get_iommu_class(void);
+static int addr_parse(const char *, void *);
+
+struct socket_device {
+	struct rte_device rte_device;
+	TAILQ_ENTRY(socket_device) next;
+	int fd;
+	uintptr_t membase;
+	uintptr_t memlen;
+};
+
+/** List of devices */
+TAILQ_HEAD(socket_list, socket_device);
+TAILQ_HEAD(device_list, rte_device);
+
+struct shared_mem_bus {
+	struct rte_bus bus;
+	struct socket_list socket_list;
+	struct shared_mem_drv *ethdrv;
+	struct device_list device_list;
+};
+
+static struct shared_mem_bus shared_mem_bus = {
+	.bus = {
+		.scan = dev_scan,
+		.probe = dev_probe,
+		.find_device = find_device,
+		.get_iommu_class = get_iommu_class,
+		.parse = addr_parse,
+	},
+
+	.socket_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.socket_list),
+	.device_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.device_list),
+};
+
+RTE_REGISTER_BUS(shared_mem, shared_mem_bus.bus);
+
+int
+rte_shm_bus_send_message(void *msg, size_t msglen)
+{
+	return send(shared_mem_bus.socket_list.tqh_first->fd, msg, msglen, 0);
+}
+
+int
+rte_shm_bus_recv_message(void *msg, size_t msglen)
+{
+	return recv(shared_mem_bus.socket_list.tqh_first->fd, msg, msglen, 0);
+}
+
+uintptr_t
+rte_shm_bus_get_mem_offset(void *ptr)
+{
+	struct socket_device *dev;
+	uintptr_t pval = (uintptr_t)ptr;
+
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		if (dev->membase <= pval && pval < dev->membase + dev->memlen)
+			return pval - dev->membase;
+	}
+	return (uintptr_t)-1;
+}
+
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset)
+{
+	struct socket_device *dev;
+
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		if (offset < dev->memlen)
+			return RTE_PTR_ADD(dev->membase, offset);
+	}
+	return (void *)-1;
+}
+
+static int
+dev_scan(void)
+{
+	if (shared_mem_bus.bus.conf.scan_mode != RTE_BUS_SCAN_ALLOWLIST)
+		return 0;
+
+	struct rte_devargs *devargs;
+	RTE_EAL_DEVARGS_FOREACH(shared_mem_bus.bus.name, devargs) {
+
+		int fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+		if (fd < 0) {
+			BUS_ERR("Error creating socket");
+			return -errno;
+		}
+
+		struct sockaddr_un sun = {.sun_family = AF_UNIX};
+		if (strlen(devargs->name) - 5 >= sizeof(sun.sun_path) ||
+				addr_parse(devargs->name, sun.sun_path) != 0) {
+			BUS_ERR("Error parsing device address");
+			return -EINVAL;
+		}
+
+		if (connect(fd, (void *)&sun, sizeof(sun)) != 0) {
+			BUS_ERR("Error connecting to socket");
+			return -errno;
+		}
+
+		struct socket_device *sdev = malloc(sizeof(*sdev));
+		if (sdev == NULL) {
+			BUS_ERR("Error with malloc");
+			return -ENOMEM;
+		}
+		BUS_INFO("Allocating dev for %s", devargs->name);
+		sdev->rte_device.name = devargs->name;
+		sdev->rte_device.numa_node = rte_socket_id();
+		sdev->rte_device.bus = &shared_mem_bus.bus;
+		sdev->fd = fd;
+		TAILQ_INSERT_TAIL(&shared_mem_bus.socket_list, sdev, next);
+	}
+
+	return 0;
+}
+
+static int
+recv_fd(int from, uint64_t *memsize, rte_iova_t *iova, uint64_t *pg_size)
+{
+	int fd = 0;
+	struct {
+		uint64_t fd_size;
+		rte_iova_t iova;
+		uint64_t pg_size;
+	} data_message;
+
+	size_t cmsglen = CMSG_LEN(sizeof(fd));
+	struct cmsghdr *cmhdr = malloc(cmsglen);
+	if (cmhdr == NULL) {
+		BUS_ERR("Malloc error");
+		return -1;
+	}
+
+	struct iovec iov = {
+			.iov_base = (void *)&data_message,
+			.iov_len = sizeof(data_message)
+	};
+	struct msghdr msg = {
+			.msg_iov = &iov,
+			.msg_iovlen = 1,
+			.msg_control = cmhdr,
+			.msg_controllen = cmsglen,
+	};
+	if (recvmsg(from, &msg, 0) != (int)iov.iov_len) {
+		BUS_ERR("recvmsg error %s", strerror(errno));
+		return -1;
+	}
+	if (msg.msg_controllen != cmsglen) {
+		BUS_ERR("Error with fd on message received");
+		return -1;
+	}
+	fd = *(int *)CMSG_DATA(cmhdr);
+
+	free(cmhdr);
+
+	*memsize = data_message.fd_size;
+	*iova = data_message.iova;
+	*pg_size = data_message.pg_size;
+	return fd;
+}
+
+static int
+dev_probe(void)
+{
+	if (TAILQ_EMPTY(&shared_mem_bus.socket_list))
+		return 0;
+
+	if (rte_mbuf_set_platform_mempool_ops("shared_mem") != 0) {
+		BUS_ERR("Error setting default mempool ops");
+		return -1;
+	}
+	BUS_INFO("Set default mempool ops to 'shared_mem'");
+
+	struct socket_device *dev;
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		uint64_t memsize = 0;
+		uint64_t pgsize = 0;
+		rte_iova_t iova = 0;
+		int memfd = recv_fd(dev->fd, &memsize, &iova, &pgsize);
+		/* check memfd is valid, the size is non-zero and multiple of 2MB */
+		if (memfd < 0 || memsize <= 0 || memsize % (1 << 21) != 0) {
+			BUS_ERR("Error getting memfd and size");
+			return -1;
+		}
+		BUS_DEBUG("Received fd %d with memsize %"PRIu64" and pgsize %"PRIu64,
+				memfd, memsize, pgsize);
+
+		void *mem = mmap(NULL, memsize, PROT_READ|PROT_WRITE, MAP_SHARED, memfd, 0);
+		if (mem == MAP_FAILED) {
+			BUS_ERR("Error mmapping the received fd");
+			return -1;
+		}
+		BUS_DEBUG("%u MB of memory mapped at %p", (unsigned int)(memsize >> 20), mem);
+		dev->membase = (uintptr_t)mem;
+		dev->memlen = memsize;
+
+		struct eth_shared_mem_msg msg = {
+				.type = MSG_TYPE_MMAP_BASE_ADDR,
+				.offset = dev->membase,
+		};
+		rte_shm_bus_send_message(&msg, sizeof(msg));
+
+		char malloc_heap_name[32];
+		snprintf(malloc_heap_name, sizeof(malloc_heap_name),
+				"socket_%d_ext", rte_socket_id());
+		if (rte_malloc_heap_create(malloc_heap_name) != 0) {
+			BUS_ERR("Error creating heap %s", malloc_heap_name);
+			return -1;
+		}
+
+		int nb_pages = (memsize / pgsize);
+		rte_iova_t *iovas = malloc(sizeof(iovas[0]) * nb_pages);
+		iovas[0] = iova;
+		for (int i = 1; i < nb_pages; i++)
+			iovas[i] = iovas[i - 1] + pgsize;
+		BUS_DEBUG("Attempting to add memory to heap: %s", malloc_heap_name);
+		if (rte_malloc_heap_memory_add(malloc_heap_name, mem, memsize,
+				iovas, nb_pages, pgsize) < 0) {
+			BUS_ERR("Error adding to malloc heap: %s", strerror(rte_errno));
+			free(iovas);
+			return -1;
+		}
+		free(iovas);
+		BUS_DEBUG("Added memory to heap");
+		rte_malloc_heap_swap_socket(rte_socket_id(),
+				rte_malloc_heap_get_socket(malloc_heap_name));
+		BUS_DEBUG("Swapped in memory as socket %d memory", rte_socket_id());
+
+		if (shared_mem_bus.ethdrv != NULL) {
+			struct rte_device *dev = malloc(sizeof(*dev));
+			if (dev == NULL)
+				return -1;
+			*dev = (struct rte_device){
+				.name = "shared_mem_ethdev",
+				.driver = &shared_mem_bus.ethdrv->driver,
+				.bus = &shared_mem_bus.bus,
+				.numa_node = SOCKET_ID_ANY,
+			};
+			shared_mem_bus.ethdrv->probe(shared_mem_bus.ethdrv, dev);
+		}
+	}
+	return 0;
+}
+static struct rte_device *
+find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+			 const void *data)
+{
+	RTE_SET_USED(start);
+	RTE_SET_USED(cmp);
+	RTE_SET_USED(data);
+	return NULL;
+}
+
+static enum rte_iova_mode
+get_iommu_class(void)
+{
+	/* if there are no devices, report don't care, otherwise VA mode */
+	return TAILQ_EMPTY(&shared_mem_bus.socket_list) ?  RTE_IOVA_DC : RTE_IOVA_VA;
+}
+
+static int
+addr_parse(const char *name, void *addr)
+{
+	if (strncmp(name, "sock:", 5) != 0) {
+		BUS_DEBUG("no sock: prefix on %s", name);
+		return -1;
+	}
+
+	const char *filename = &name[5];
+	struct stat st;
+	if (stat(filename, &st) < 0 || (st.st_mode & S_IFMT) != S_IFSOCK) {
+		BUS_ERR("stat failed, or not a socket, %s", filename);
+		return -1;
+	}
+	if (addr != NULL)
+		strcpy(addr, filename);
+	BUS_DEBUG("Matched filename: %s", filename);
+	return 0;
+}
+
+int
+shared_mem_register_driver(struct shared_mem_drv *drv)
+{
+	if (drv->probe == NULL)
+		return -1;
+	shared_mem_bus.ethdrv = drv;
+	return 0;
+}
+
diff --git a/drivers/bus/shared_mem/shared_mem_bus.h b/drivers/bus/shared_mem/shared_mem_bus.h
new file mode 100644
index 0000000000..01a9a2a99a
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.h
@@ -0,0 +1,75 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+
+#ifndef DRIVERS_BUS_SHARED_MEM_H_
+#define DRIVERS_BUS_SHARED_MEM_H_
+
+#include <stdint.h>
+#include <rte_common.h>
+#include <rte_ether.h>
+#include <dev_driver.h>
+
+enum shared_mem_msg_type {
+	MSG_TYPE_ACK = 0,
+	MSG_TYPE_MMAP_BASE_ADDR,
+	MSG_TYPE_MEMPOOL_OFFSET,
+	MSG_TYPE_RX_RING_OFFSET,
+	MSG_TYPE_TX_RING_OFFSET,
+	MSG_TYPE_START,
+	MSG_TYPE_GET_MAC,
+	MSG_TYPE_REPORT_MAC,
+};
+
+struct eth_shared_mem_msg {
+	enum shared_mem_msg_type type;  /* type implicitly defines which union member is used */
+	union {
+		uintptr_t offset;    /* for many messages, just pass an offset */
+		struct rte_ether_addr ethaddr; /* allow passing mac address */
+		uintptr_t datalen;   /* for other messages, pass a data length after the data */
+	};
+	char data[];
+};
+
+struct shared_mem_drv;
+
+/**
+ * Initialisation function for the driver
+ */
+typedef int (c_eth_probe_t)(struct shared_mem_drv *drv, struct rte_device *dev);
+
+struct shared_mem_drv {
+	struct rte_driver driver;
+	c_eth_probe_t *probe;            /**< Device probe function. */
+};
+
+/** Helper for shared_mem bus device registration from driver (eth, crypto) instance */
+#define RTE_PMD_REGISTER_SHMEM_DRV(nm, c_drv) \
+RTE_INIT(shared_mem_initfn_ ##nm) \
+{\
+	(c_drv).driver.name = RTE_STR(nm);\
+	shared_mem_register_driver(&c_drv); \
+} \
+RTE_PMD_EXPORT_NAME(nm, __COUNTER__)
+
+__rte_internal
+int
+shared_mem_register_driver(struct shared_mem_drv *drv);
+
+__rte_internal
+int
+rte_shm_bus_send_message(void *msg, size_t msglen);
+
+__rte_internal
+int
+rte_shm_bus_recv_message(void *msg, size_t msglen);
+
+__rte_internal
+uintptr_t
+rte_shm_bus_get_mem_offset(void *ptr);
+
+__rte_internal
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset);
+
+#endif /* DRIVERS_BUS_SHARED_MEM_H_ */
diff --git a/drivers/bus/shared_mem/version.map b/drivers/bus/shared_mem/version.map
new file mode 100644
index 0000000000..2af82689b1
--- /dev/null
+++ b/drivers/bus/shared_mem/version.map
@@ -0,0 +1,11 @@
+INTERNAL {
+	global:
+
+	shared_mem_register_driver;
+	rte_shm_bus_get_mem_offset;
+	rte_shm_bus_get_mem_ptr;
+	rte_shm_bus_recv_message;
+	rte_shm_bus_send_message;
+
+	local: *;
+};
-- 
2.39.2



* [RFC PATCH 2/5] mempool: driver for mempools of mbufs on shared memory
  2023-09-22  8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket Bruce Richardson
@ 2023-09-22  8:19 ` Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 3/5] net: new ethdev driver to communicate using shared mem Bruce Richardson
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 7+ messages in thread
From: Bruce Richardson @ 2023-09-22  8:19 UTC
  To: dev; +Cc: Bruce Richardson

This mempool driver can be used with the shared_mem bus driver to create
a pool of shared mbufs on a shared memory segment.
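Because only offsets live on the shared stack, any pointer stored inside
an mbuf header is valid only in the process that wrote it, which is why
the dequeue path in this patch rewrites `buf_addr` and `pool` after
converting each offset. A standalone sketch of that fixup, using a
hypothetical, heavily simplified `struct toy_mbuf` in place of
`struct rte_mbuf` (the field names mirror the real ones, the layout does
not):

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, heavily simplified stand-in for struct rte_mbuf: only the
 * two pointer fields that shm_mp_dequeue() rewrites are modelled. */
struct toy_mbuf {
	void *buf_addr; /* process-local address of the packet data buffer */
	void *pool;     /* process-local address of the owning mempool */
};

/* Convert a stack entry (an offset into the shared region) into a local mbuf
 * pointer, then re-derive the pointer fields, which are only valid in the
 * process that last wrote them. As in shm_mp_dequeue(), the data buffer is
 * assumed to start right after the header and the private area. */
static struct toy_mbuf *
toy_mbuf_from_offset(uintptr_t base, uintptr_t offset, void *local_pool,
		uint16_t priv_size)
{
	assert(base != 0);
	struct toy_mbuf *mb = (struct toy_mbuf *)(base + offset);

	mb->buf_addr = (char *)mb + sizeof(*mb) + priv_size;
	mb->pool = local_pool;
	return mb;
}
```

This fixup is also why the driver currently has to assume that every
object in the pool is an mbuf: it must know where the pointer fields live.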

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 drivers/mempool/meson.build                |  1 +
 drivers/mempool/shared_mem/meson.build     | 10 +++
 drivers/mempool/shared_mem/shared_mem_mp.c | 94 ++++++++++++++++++++++
 3 files changed, 105 insertions(+)
 create mode 100644 drivers/mempool/shared_mem/meson.build
 create mode 100644 drivers/mempool/shared_mem/shared_mem_mp.c

diff --git a/drivers/mempool/meson.build b/drivers/mempool/meson.build
index dc88812585..4326bd0ea3 100644
--- a/drivers/mempool/meson.build
+++ b/drivers/mempool/meson.build
@@ -8,6 +8,7 @@ drivers = [
         'dpaa2',
         'octeontx',
         'ring',
+        'shared_mem',
         'stack',
 ]
 
diff --git a/drivers/mempool/shared_mem/meson.build b/drivers/mempool/shared_mem/meson.build
new file mode 100644
index 0000000000..ee740ccdc9
--- /dev/null
+++ b/drivers/mempool/shared_mem/meson.build
@@ -0,0 +1,10 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+if is_windows
+    build = false
+    reason = 'not supported on Windows'
+endif
+sources = files('shared_mem_mp.c')
+require_iova_in_mbuf = false
+deps += ['stack', 'bus_shared_mem']
diff --git a/drivers/mempool/shared_mem/shared_mem_mp.c b/drivers/mempool/shared_mem/shared_mem_mp.c
new file mode 100644
index 0000000000..7dae8aba92
--- /dev/null
+++ b/drivers/mempool/shared_mem/shared_mem_mp.c
@@ -0,0 +1,94 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <sys/types.h>
+#include <rte_mbuf.h>
+#include <rte_stack.h>
+#include <rte_mempool.h>
+#include <shared_mem_bus.h>
+
+RTE_LOG_REGISTER_DEFAULT(shared_mem_mp_log, DEBUG);
+#define MP_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+		shared_mem_mp_log, "## SHARED MP: %s(): " fmt "\n", __func__, ##args)
+#define MP_ERR(fmt, args...)  MP_LOG(ERR, fmt, ## args)
+#define MP_INFO(fmt, args...)  MP_LOG(INFO, fmt, ## args)
+#define MP_DEBUG(fmt, args...)  MP_LOG(DEBUG, fmt, ## args)
+
+static int
+shm_mp_alloc(struct rte_mempool *mp)
+{
+	char name[RTE_STACK_NAMESIZE];
+	struct rte_stack *s;
+	int ret;
+
+	ret = snprintf(name, sizeof(name), RTE_MEMPOOL_MZ_FORMAT, mp->name);
+	if (ret < 0 || ret >= (int)sizeof(name)) {
+		rte_errno = ENAMETOOLONG;
+		return -rte_errno;
+	}
+
+	s = rte_stack_create(name, mp->size, mp->socket_id, 0);
+	if (s == NULL)
+		return -rte_errno;
+	MP_DEBUG("Stack created at address: %p", s);
+
+	mp->pool_data = s;
+
+	return 0;
+}
+
+static int
+shm_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+	      unsigned int n)
+{
+	struct rte_stack *s = mp->pool_data;
+
+	void *offset_table[n];
+	uintptr_t mempool_base = (uintptr_t)rte_shm_bus_get_mem_ptr(0); /* offset 0 == base addr */
+	for (uint i = 0; i < n; i++)
+		offset_table[i] = RTE_PTR_SUB(obj_table[i], mempool_base);
+
+	return rte_stack_push(s, offset_table, n) == 0 ? -ENOBUFS : 0;
+}
+
+static int
+shm_mp_dequeue(struct rte_mempool *mp, void **obj_table,
+	      unsigned int n)
+{
+	struct rte_stack *s = mp->pool_data;
+	uintptr_t mempool_base = (uintptr_t)rte_shm_bus_get_mem_ptr(0); /* offset 0 == base addr */
+	uint16_t priv_size = rte_pktmbuf_priv_size(mp);
+
+	if (rte_stack_pop(s, obj_table, n) == 0)
+		return -ENOBUFS;
+	for (uint i = 0; i < n; i++) {
+		obj_table[i] = RTE_PTR_ADD(obj_table[i], mempool_base);
+		struct rte_mbuf *mb = obj_table[i];
+		mb->buf_addr = RTE_PTR_ADD(mb, sizeof(struct rte_mbuf) + priv_size);
+		mb->pool = mp;
+	}
+	return 0;
+}
+
+static unsigned
+shm_mp_get_count(const struct rte_mempool *mp)
+{
+	return rte_stack_count(mp->pool_data);
+}
+
+static void
+shm_mp_free(struct rte_mempool *mp)
+{
+	rte_stack_free(mp->pool_data);
+}
+
+static struct rte_mempool_ops ops_shared_mem_mp = {
+	.name = "shared_mem",
+	.alloc = shm_mp_alloc,
+	.free = shm_mp_free,
+	.enqueue = shm_mp_enqueue,
+	.dequeue = shm_mp_dequeue,
+	.get_count = shm_mp_get_count,
+};
+
+RTE_MEMPOOL_REGISTER_OPS(ops_shared_mem_mp);
-- 
2.39.2



* [RFC PATCH 3/5] net: new ethdev driver to communicate using shared mem
  2023-09-22  8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 2/5] mempool: driver for mempools of mbufs on shared memory Bruce Richardson
@ 2023-09-22  8:19 ` Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 5/5] app/io-proxy: add startup commands Bruce Richardson
  4 siblings, 0 replies; 7+ messages in thread
From: Bruce Richardson @ 2023-09-22  8:19 UTC
  To: dev; +Cc: Bruce Richardson

This ethdev builds on the previous shared_mem bus driver and shared_mem
mempool driver to provide an ethdev interface which can allow zero-copy
I/O from one process to another.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 drivers/net/meson.build                 |   1 +
 drivers/net/shared_mem/meson.build      |  11 +
 drivers/net/shared_mem/shared_mem_eth.c | 295 ++++++++++++++++++++++++
 3 files changed, 307 insertions(+)
 create mode 100644 drivers/net/shared_mem/meson.build
 create mode 100644 drivers/net/shared_mem/shared_mem_eth.c

diff --git a/drivers/net/meson.build b/drivers/net/meson.build
index bd38b533c5..505d208497 100644
--- a/drivers/net/meson.build
+++ b/drivers/net/meson.build
@@ -53,6 +53,7 @@ drivers = [
         'qede',
         'ring',
         'sfc',
+        'shared_mem',
         'softnic',
         'tap',
         'thunderx',
diff --git a/drivers/net/shared_mem/meson.build b/drivers/net/shared_mem/meson.build
new file mode 100644
index 0000000000..17d1b84454
--- /dev/null
+++ b/drivers/net/shared_mem/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+if is_windows
+    build = false
+    reason = 'not supported on Windows'
+endif
+
+sources = files('shared_mem_eth.c')
+deps += 'bus_shared_mem'
+require_iova_in_mbuf = false
diff --git a/drivers/net/shared_mem/shared_mem_eth.c b/drivers/net/shared_mem/shared_mem_eth.c
new file mode 100644
index 0000000000..564bfdb907
--- /dev/null
+++ b/drivers/net/shared_mem/shared_mem_eth.c
@@ -0,0 +1,295 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <rte_common.h>
+#include <shared_mem_bus.h>
+#include <ethdev_driver.h>
+
+RTE_LOG_REGISTER_DEFAULT(shared_mem_eth_logtype, DEBUG);
+#define SHM_ETH_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+		shared_mem_eth_logtype, "## SHARED MEM ETH: %s(): " fmt "\n", __func__, ##args)
+#define SHM_ETH_ERR(fmt, args...)  SHM_ETH_LOG(ERR, fmt, ## args)
+#define SHM_ETH_INFO(fmt, args...)  SHM_ETH_LOG(INFO, fmt, ## args)
+#define SHM_ETH_DEBUG(fmt, args...)  SHM_ETH_LOG(DEBUG, fmt, ## args)
+
+struct shm_eth_stats {
+	uint64_t rx_pkts;
+	uint64_t tx_pkts;
+	uint64_t rx_bytes;
+	uint64_t tx_bytes;
+};
+
+struct shm_eth_private {
+	struct rte_ether_addr addr;
+	struct rte_ring *rx;
+	struct rte_ring *tx;
+	struct shm_eth_stats stats;
+};
+
+static struct rte_mempool *rx_mp; /* TODO: use one per queue */
+
+static int
+shm_eth_configure(struct rte_eth_dev *dev __rte_unused)
+{
+	return 0;
+}
+
+static int
+shm_eth_start(struct rte_eth_dev *dev)
+{
+	struct shm_eth_private *priv = dev->data->dev_private;
+
+	struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+		.type = MSG_TYPE_START,
+	};
+	rte_shm_bus_send_message(&msg, sizeof(msg));
+
+	rte_shm_bus_recv_message(&msg, sizeof(msg));
+	if (msg.type != MSG_TYPE_ACK) {
+		SHM_ETH_ERR("Didn't get ack from host");
+		return -1;
+	}
+
+	memset(&priv->stats, 0, sizeof(priv->stats));
+	return 0;
+}
+
+static int
+shm_eth_stop(struct rte_eth_dev *dev __rte_unused)
+{
+	return 0;
+}
+
+static int
+shm_eth_infos_get(struct rte_eth_dev *dev, struct rte_eth_dev_info *info)
+{
+	*info = (struct rte_eth_dev_info){
+		.driver_name = dev->device->driver->name,
+		.max_rx_queues = 1,
+		.max_tx_queues = 1,
+		.max_mac_addrs = 1,
+		.min_mtu = 64,
+		.max_mtu = UINT16_MAX,
+		.max_rx_pktlen = UINT16_MAX,
+		.nb_rx_queues = 1,
+		.nb_tx_queues = 1,
+		.tx_desc_lim = { .nb_max = 8192, .nb_min = 128, .nb_align = 64 },
+		.rx_desc_lim = { .nb_max = 8192, .nb_min = 128, .nb_align = 64 },
+	};
+	return 0;
+}
+
+static int
+shm_eth_mtu_set(struct rte_eth_dev *dev, uint16_t mtu)
+{
+	dev->data->mtu = mtu;
+	return 0;
+}
+
+static int
+shm_eth_link_update(struct rte_eth_dev *dev, int wait __rte_unused)
+{
+	dev->data->dev_link = (struct rte_eth_link){
+		.link_speed = RTE_ETH_SPEED_NUM_100G,
+		.link_duplex = 1,
+		.link_autoneg = 1,
+		.link_status = 1,
+	};
+	return 0;
+}
+
+static int
+shm_eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
+		    uint16_t nb_rx_desc,
+		    unsigned int socket_id,
+		    const struct rte_eth_rxconf *rx_conf,
+		    struct rte_mempool *mb_pool)
+{
+	RTE_SET_USED(rx_conf);
+
+	struct shm_eth_private *priv = dev->data->dev_private;
+	char ring_name[32];
+
+	if (rte_shm_bus_get_mem_offset(mb_pool) == (uintptr_t)-1) {
+		SHM_ETH_ERR("Mempool not in shared memory");
+		return -1;
+	}
+	snprintf(ring_name, sizeof(ring_name), "shm_eth_rxr%u", rx_queue_id);
+	priv->rx = rte_ring_create(ring_name, nb_rx_desc, socket_id, RING_F_SP_ENQ | RING_F_SC_DEQ);
+	if (priv->rx == NULL)
+		return -1;
+	SHM_ETH_INFO("RX ring @ %p", priv->rx);
+	if (rte_shm_bus_get_mem_offset(priv->rx) == (uintptr_t)-1) {
+		SHM_ETH_ERR("Ring not created on shared memory.");
+		return -1;
+	}
+	dev->data->rx_queues[rx_queue_id] = priv;
+
+	SHM_ETH_INFO("Mempool offset is: %p", (void *)rte_shm_bus_get_mem_offset(mb_pool));
+	SHM_ETH_INFO("Rx queue offset is: %p", (void *)rte_shm_bus_get_mem_offset(priv->rx));
+
+	struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+		.type = MSG_TYPE_MEMPOOL_OFFSET,
+		.offset = rte_shm_bus_get_mem_offset(mb_pool),
+	};
+	rte_shm_bus_send_message(&msg, sizeof(msg));
+	msg = (struct eth_shared_mem_msg){
+		.type = MSG_TYPE_RX_RING_OFFSET,
+		.offset = rte_shm_bus_get_mem_offset(priv->rx),
+	};
+	rte_shm_bus_send_message(&msg, sizeof(msg));
+	rx_mp = mb_pool;
+	return 0;
+}
+
+static int
+shm_eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
+		    uint16_t nb_tx_desc,
+		    unsigned int socket_id,
+		    const struct rte_eth_txconf *tx_conf)
+{
+	RTE_SET_USED(tx_conf);
+
+	struct shm_eth_private *priv = dev->data->dev_private;
+	char ring_name[32];
+
+	snprintf(ring_name, sizeof(ring_name), "shm_eth_txr%u", tx_queue_id);
+	priv->tx = rte_ring_create(ring_name, nb_tx_desc, socket_id, RING_F_SP_ENQ | RING_F_SC_DEQ);
+	if (priv->tx == NULL)
+		return -1;
+	SHM_ETH_DEBUG("TX ring @ %p", priv->tx);
+	if (rte_shm_bus_get_mem_offset(priv->tx) == (uintptr_t)-1) {
+		SHM_ETH_ERR("TX ring not on shared memory");
+		return -1;
+	}
+	dev->data->tx_queues[tx_queue_id] = priv;
+
+	struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+		.type = MSG_TYPE_TX_RING_OFFSET,
+		.offset = rte_shm_bus_get_mem_offset(priv->tx),
+	};
+	rte_shm_bus_send_message(&msg, sizeof(msg));
+
+	return 0;
+}
+
+static int
+shm_eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
+{
+	struct shm_eth_private *priv = dev->data->dev_private;
+	stats->ibytes = priv->stats.rx_bytes;
+	stats->ipackets = priv->stats.rx_pkts;
+	stats->obytes = priv->stats.tx_bytes;
+	stats->opackets = priv->stats.tx_pkts;
+	return 0;
+}
+
+static const struct eth_dev_ops ops = {
+		.dev_configure = shm_eth_configure,
+		.dev_start = shm_eth_start,
+		.dev_stop = shm_eth_stop,
+		.dev_infos_get = shm_eth_infos_get,
+		.mtu_set = shm_eth_mtu_set,
+		.rx_queue_setup = shm_eth_rx_queue_setup,
+		.tx_queue_setup = shm_eth_tx_queue_setup,
+		.link_update = shm_eth_link_update,
+		.stats_get = shm_eth_stats_get,
+};
+
+static uint16_t
+shm_eth_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	void *deq_vals[nb_bufs];
+	struct shm_eth_private *priv = queue;
+	struct rte_ring *rxr = priv->rx;
+	uintptr_t offset = (uintptr_t)rte_shm_bus_get_mem_ptr(0);
+
+	int nb_rx = rte_ring_dequeue_burst(rxr, deq_vals, nb_bufs, NULL);
+	if (nb_rx == 0)
+		return 0;
+
+	uint64_t bytes = 0;
+	for (int i = 0; i < nb_rx; i++) {
+		bufs[i] = RTE_PTR_ADD(deq_vals[i], offset);
+		bufs[i]->pool = rx_mp;
+		bufs[i]->buf_addr = RTE_PTR_ADD(bufs[i]->buf_addr, offset);
+		bytes += bufs[i]->pkt_len;
+	}
+	priv->stats.rx_pkts += nb_rx;
+	priv->stats.rx_bytes += bytes;
+	return nb_rx;
+}
+
+static uint16_t
+shm_eth_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	void *enq_vals[nb_bufs];
+	struct shm_eth_private *priv = queue;
+	struct rte_ring *txr = priv->tx;
+	uintptr_t offset = (uintptr_t)rte_shm_bus_get_mem_ptr(0);
+	uint64_t bytes = 0;
+
+	for (int i = 0; i < nb_bufs; i++) {
+		bufs[i]->buf_addr = RTE_PTR_SUB(bufs[i]->buf_addr, offset);
+		bytes += bufs[i]->pkt_len;
+		rte_cldemote(bufs[i]);
+		enq_vals[i] = RTE_PTR_SUB(bufs[i], offset);
+	}
+	uint16_t nb_enq = rte_ring_enqueue_burst(txr, enq_vals, nb_bufs, NULL);
+	if (nb_enq != nb_bufs) {
+		/* restore original buffer settings */
+		for (int i = nb_enq; i < nb_bufs; i++) {
+			bufs[i]->buf_addr = RTE_PTR_ADD(bufs[i]->buf_addr, offset);
+			bytes -= bufs[i]->pkt_len;
+		}
+	}
+	priv->stats.tx_pkts += nb_enq;
+	priv->stats.tx_bytes += bytes;
+	return nb_enq;
+}
+
+static int
+ethdev_init(struct rte_eth_dev *ethdev, void *init_params __rte_unused)
+{
+	struct shm_eth_private *priv = ethdev->data->dev_private;
+	ethdev->dev_ops = &ops;
+	ethdev->data->mac_addrs = &priv->addr;
+	ethdev->rx_pkt_burst = shm_eth_rx_burst;
+	ethdev->tx_pkt_burst = shm_eth_tx_burst;
+
+	struct eth_shared_mem_msg msg = (struct eth_shared_mem_msg){
+		.type = MSG_TYPE_GET_MAC,
+	};
+	rte_shm_bus_send_message(&msg, sizeof(msg));
+
+	rte_shm_bus_recv_message(&msg, sizeof(msg));
+	if (msg.type != MSG_TYPE_REPORT_MAC) {
+		SHM_ETH_ERR("Didn't get mac address from host");
+		return -1;
+	}
+	rte_ether_addr_copy(&msg.ethaddr, &priv->addr);
+
+	return 0;
+}
+
+static int
+shm_eth_probe(struct shared_mem_drv *drv, struct rte_device *dev)
+{
+	SHM_ETH_INFO("Probing device %p on driver %s", dev, drv->driver.name);
+	int ret = rte_eth_dev_create(dev, "shared_mem_ethdev", sizeof(struct shm_eth_private),
+			NULL, NULL,
+			ethdev_init, NULL);
+	if (ret != 0)
+		goto out;
+
+	SHM_ETH_DEBUG("Ethdev created ok");
+out:
+	return ret;
+}
+
+struct shared_mem_drv shm_drv = {
+		.probe = shm_eth_probe,
+};
+
+
+RTE_PMD_REGISTER_SHMEM_DRV(shm_eth, shm_drv);
-- 
2.39.2



* [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces
  2023-09-22  8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
                   ` (2 preceding siblings ...)
  2023-09-22  8:19 ` [RFC PATCH 3/5] net: new ethdev driver to communicate using shared mem Bruce Richardson
@ 2023-09-22  8:19 ` Bruce Richardson
  2023-09-22  8:19 ` [RFC PATCH 5/5] app/io-proxy: add startup commands Bruce Richardson
  4 siblings, 0 replies; 7+ messages in thread
From: Bruce Richardson @ 2023-09-22  8:19 UTC (permalink / raw)
  To: dev; +Cc: Bruce Richardson

This app uses the shared memory pool and shared ethdev infrastructure
to act as a zero-copy IO proxy to other applications. It has been tested
and verified to work proxying data to testpmd instances on the same
system, with each testpmd instance being passed a unix socket to connect
to via the shared memory bus "-a sock:/path/to/sock..." parameter.
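
The proxy gives each client its memory by passing a memfd over the unix
socket as SCM_RIGHTS ancillary data (see send_fd() in datapath.c). The
client then mmap()s the same file. The sketch below shows both ends of that
exchange and rests on several assumptions:
* send_fd_msg(), recv_fd_msg() and create_shared_region() are hypothetical
  names.
* The payload is only the region size. The patch also sends the IOVA and
  page size.
* A plain memfd is used instead of MFD_HUGETLB, so no hugepages need to be
  reserved.

The control buffer is sized with CMSG_SPACE() and aligned through a union,
as the cmsg(3) man page recommends.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Control-message buffer big enough for one fd, correctly aligned. */
union fd_cmsg_buf {
	char buf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr align;
};

/* Create an anonymous file of the given size that can be shared by fd. */
static int
create_shared_region(size_t size)
{
	int fd = memfd_create("shm_demo", 0);

	if (fd >= 0 && ftruncate(fd, (off_t)size) < 0) {
		close(fd);
		fd = -1;
	}
	return fd;
}

/* Send fd plus its size as a single SOCK_SEQPACKET message. */
static int
send_fd_msg(int sock, int fd, uint64_t size)
{
	union fd_cmsg_buf ctrl;
	struct iovec iov = { .iov_base = &size, .iov_len = sizeof(size) };
	struct msghdr msg = {
		.msg_iov = &iov, .msg_iovlen = 1,
		.msg_control = ctrl.buf, .msg_controllen = sizeof(ctrl.buf),
	};
	struct cmsghdr *cm;

	memset(&ctrl, 0, sizeof(ctrl));
	cm = CMSG_FIRSTHDR(&msg);
	cm->cmsg_level = SOL_SOCKET;
	cm->cmsg_type = SCM_RIGHTS;
	cm->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cm), &fd, sizeof(int));
	return sendmsg(sock, &msg, 0) == (ssize_t)sizeof(size) ? 0 : -1;
}

/* Receive the message; returns the new local fd, or -1 on error. */
static int
recv_fd_msg(int sock, uint64_t *size)
{
	union fd_cmsg_buf ctrl;
	struct iovec iov = { .iov_base = size, .iov_len = sizeof(*size) };
	struct msghdr msg = {
		.msg_iov = &iov, .msg_iovlen = 1,
		.msg_control = ctrl.buf, .msg_controllen = sizeof(ctrl.buf),
	};
	struct cmsghdr *cm;
	int fd;

	if (recvmsg(sock, &msg, 0) != (ssize_t)sizeof(*size))
		return -1;
	cm = CMSG_FIRSTHDR(&msg);
	if (cm == NULL || cm->cmsg_level != SOL_SOCKET || cm->cmsg_type != SCM_RIGHTS)
		return -1;
	memcpy(&fd, CMSG_DATA(cm), sizeof(int));
	return fd;
}
```

The receiver ends up with a second mapping of the same pages at a different
virtual address. This is why the driver and proxy exchange offsets rather
than pointers over the shared rings.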

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/io-proxy/command_fns.c | 160 ++++++++++
 app/io-proxy/commands.list |   6 +
 app/io-proxy/datapath.c    | 595 +++++++++++++++++++++++++++++++++++++
 app/io-proxy/datapath.h    |  37 +++
 app/io-proxy/datapath_mp.c |  78 +++++
 app/io-proxy/main.c        |  71 +++++
 app/io-proxy/meson.build   |  12 +
 app/meson.build            |   1 +
 8 files changed, 960 insertions(+)
 create mode 100644 app/io-proxy/command_fns.c
 create mode 100644 app/io-proxy/commands.list
 create mode 100644 app/io-proxy/datapath.c
 create mode 100644 app/io-proxy/datapath.h
 create mode 100644 app/io-proxy/datapath_mp.c
 create mode 100644 app/io-proxy/main.c
 create mode 100644 app/io-proxy/meson.build

diff --git a/app/io-proxy/command_fns.c b/app/io-proxy/command_fns.c
new file mode 100644
index 0000000000..f48921e005
--- /dev/null
+++ b/app/io-proxy/command_fns.c
@@ -0,0 +1,160 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+
+#include <rte_ethdev.h>
+
+#include "datapath.h"
+#include "commands.h"
+
+extern volatile bool quit;
+extern volatile bool running_startup_script;
+
+void
+cmd_add_socket_parsed(void *parsed_result, struct cmdline *cl __rte_unused,
+		void *data __rte_unused)
+{
+	struct cmd_add_socket_result *res = parsed_result;
+	uint64_t maxmem = 0;
+	char *endchar;
+
+	maxmem = strtoull(res->memsize, &endchar, 0);
+	switch (*endchar) {
+	case 'G': case 'g':
+		maxmem *= 1024;
+		/* fall-through */
+	case 'M': case 'm':
+		maxmem *= 1024;
+		/* fall-through */
+	case 'K': case 'k':
+		maxmem *= 1024;
+		break;
+	}
+	if (res->port >= MAX_PORTS_SUPPORTED) {
+		fprintf(stderr, "Port id out of range. Must be <%u\n", MAX_PORTS_SUPPORTED);
+		goto err;
+	}
+	if (res->queue >= MAX_QUEUES_SUPPORTED) {
+		fprintf(stderr, "Queue id out of range. Must be <%u\n", MAX_QUEUES_SUPPORTED);
+		goto err;
+	}
+	if (listen_unix_socket(res->path, maxmem, res->port, res->queue) != 0) {
+		fprintf(stderr, "error initializing socket: %s\n", res->path);
+		goto err;
+	}
+
+	printf("Created socket = %s with memsize = %s using port = %u, queue = %u\n",
+			res->path, res->memsize, res->port, res->queue);
+	return;
+
+err:
+	if (running_startup_script) {
+		quit = true;
+		/* wait for main thread to quit. Just spin here for condition which
+		 * will never actually come true, as main thread should just exit
+		 */
+		while (quit)
+			usleep(100);
+	}
+	/* if running interactively, do nothing on error except report it above */
+}
+
+void
+cmd_list_sockets_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	const char *path;
+	int sock;
+	uint64_t maxmem;
+	uint16_t port, queue;
+	bool connected;
+
+	for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+			i < MAX_SOCKETS;
+			i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+					&queue, &connected)) {
+		char memstr[32];
+		if (maxmem % (1UL << 30) == 0)
+			snprintf(memstr, sizeof(memstr), "%" PRIu64 "G", maxmem >> 30);
+		else if (maxmem % (1UL << 20) == 0)
+			snprintf(memstr, sizeof(memstr), "%" PRIu64 "M", maxmem >> 20);
+		else if (maxmem % (1UL << 10) == 0)
+			snprintf(memstr, sizeof(memstr), "%" PRIu64 "K", maxmem >> 10);
+		else
+			snprintf(memstr, sizeof(memstr), "%" PRIu64, maxmem);
+
+		printf("Socket %s [%s]: mem=%s, port=%u, queue=%u\n",
+				path, connected ? "connected" : "idle", memstr, port, queue);
+	}
+}
+
+void
+cmd_list_ports_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	for (int i = 0; i < rte_eth_dev_count_avail(); i++) {
+		struct rte_ether_addr addr;
+		int retval = rte_eth_macaddr_get(i, &addr);
+		if (retval != 0) {
+			printf("Port %d - MAC UNKNOWN\n", i);
+			continue;
+		}
+		printf("Port %d - "RTE_ETHER_ADDR_PRT_FMT"\n", i, RTE_ETHER_ADDR_BYTES(&addr));
+	}
+}
+
+void
+cmd_show_port_stats_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	for (int i = 0; i < rte_eth_dev_count_avail(); i++) {
+		struct rte_eth_stats stats = {0};
+		int retval = rte_eth_stats_get(i, &stats);
+		if (retval != 0) {
+			printf("Port %d - Cannot get stats\n", i);
+			continue;
+		}
+		printf("Port %d - ipkts: %"PRIu64", imissed: %"PRIu64
+				", ierrors: %"PRIu64", opkts: %"PRIu64"\n",
+				i, stats.ipackets, stats.imissed, stats.ierrors, stats.opackets);
+	}
+}
+
+void
+cmd_show_socket_stats_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	const char *path;
+	int sock;
+	uint64_t maxmem;
+	uint16_t port, queue;
+	bool connected;
+
+	for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+			i < MAX_SOCKETS;
+			i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+					&queue, &connected)) {
+		if (connected || dp_stats[i].rx != 0 || dp_stats[i].deq != 0)
+			printf("Socket %d [port %d, q %d]: RX %" PRIu64 ", Enq_drops %" PRIu64
+					", Deq %" PRIu64 ", TX_drops %" PRIu64 "\n",
+					i, i / MAX_QUEUES_SUPPORTED, i % MAX_QUEUES_SUPPORTED,
+					dp_stats[i].rx, dp_stats[i].enq_drop,
+					dp_stats[i].deq, dp_stats[i].tx_drop);
+
+	}
+}
+
+void
+cmd_quit_parsed(__rte_unused void *parsed_result, struct cmdline *cl,
+		__rte_unused void *data)
+{
+	cmdline_quit(cl);
+}
diff --git a/app/io-proxy/commands.list b/app/io-proxy/commands.list
new file mode 100644
index 0000000000..9dab9bba28
--- /dev/null
+++ b/app/io-proxy/commands.list
@@ -0,0 +1,6 @@
+add socket <STRING>path <STRING>memsize <UINT16>port <UINT16>queue
+list sockets
+list ports
+show port stats
+show socket stats
+quit
diff --git a/app/io-proxy/datapath.c b/app/io-proxy/datapath.c
new file mode 100644
index 0000000000..1f7162de18
--- /dev/null
+++ b/app/io-proxy/datapath.c
@@ -0,0 +1,595 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <linux/memfd.h>
+
+#include <rte_eal.h>
+#include <rte_dev.h>
+#include <rte_malloc.h>
+#include <rte_ethdev.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_mempool.h>
+#include <shared_mem_bus.h>
+
+#include "datapath.h"
+
+static int mempool_ops_index = -1;
+static struct rte_mempool *default_mempool;
+static volatile unsigned long long port_poll_mask;
+static volatile unsigned long long used_poll_mask;
+
+struct listen_socket_params {
+	const char *path;
+	int sock;
+	uint16_t port_id;
+	uint16_t qid;
+	uint64_t maxmem;
+};
+
+#define S_IDX(p, q) (((p) * MAX_QUEUES_SUPPORTED) + (q))
+static struct rte_ring *rx_rings[MAX_SOCKETS];
+static struct rte_ring *tx_rings[MAX_SOCKETS];
+static uintptr_t base_addrs[MAX_SOCKETS];
+static uint64_t lengths[MAX_SOCKETS];
+static struct rte_mempool *mps[MAX_SOCKETS];
+static struct listen_socket_params sock_params[MAX_SOCKETS];
+struct rxtx_stats dp_stats[MAX_SOCKETS] = {0};
+
+int
+get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+		uint16_t *port, uint16_t *queue, bool *connected)
+{
+	int i;
+	for (i = start; i < MAX_SOCKETS; i++) {
+		if (sock_params[i].sock > 0) {
+			*path = sock_params[i].path;
+			*sock = sock_params[i].sock;
+			*maxmem = sock_params[i].maxmem;
+			*port = sock_params[i].port_id;
+			*queue = sock_params[i].qid;
+			*connected = (port_poll_mask & (1ULL << i)) != 0;
+			break;
+		}
+	}
+	return i;
+}
+
+static int
+init_port(uint16_t port_id, struct rte_mempool *mbuf_pool)
+{
+	struct rte_eth_conf port_conf = {
+		.rxmode = { .mq_mode = RTE_ETH_MQ_RX_RSS, },
+		.rx_adv_conf = {
+			.rss_conf = { .rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP, },
+		},
+	};
+	struct rte_eth_dev_info dev_info;
+	int socket = rte_socket_id();
+
+	int retval = rte_eth_dev_info_get(port_id, &dev_info);
+	if (retval != 0) {
+		printf("Error during getting device (port %u) info: %s\n",
+				port_id, strerror(-retval));
+		return retval;
+	}
+
+	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
+		port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+
+	port_conf.rx_adv_conf.rss_conf.rss_hf &= dev_info.flow_type_rss_offloads;
+
+	if (rte_eth_dev_configure(port_id, MAX_QUEUES_SUPPORTED, MAX_QUEUES_SUPPORTED,
+			&port_conf) < 0) {
+		printf("Error configuring port\n");
+		return -1;
+	}
+
+	for (uint16_t q = 0; q < MAX_QUEUES_SUPPORTED; q++) {
+		retval = rte_eth_rx_queue_setup(port_id, q, 128, socket, NULL, mbuf_pool);
+		if (retval < 0) {
+			printf("Error running rx_queue_setup\n");
+			return retval;
+		}
+		retval = rte_eth_tx_queue_setup(port_id, q, 256, socket, NULL);
+		if (retval < 0) {
+			printf("Error running tx_queue_setup\n");
+			return retval;
+		}
+	}
+
+	retval = rte_eth_dev_start(port_id);
+	if (retval < 0) {
+		printf("Error running dev_start\n");
+		return retval;
+	}
+	printf("Port %u started ok\n", port_id);
+
+	if (rte_eth_promiscuous_enable(port_id) < 0)
+		printf("Warning: could not enable promisc mode on port %u\n", port_id);
+
+	return 0;
+}
+
+int
+datapath_init(const char *corelist)
+{
+	/* eal init requires non-const parameters, so copy */
+	char *cl = strdup(corelist); /* todo, free copy */
+	char l_flag[] = "-l";
+	char in_mem[] = "--in-memory";
+	char use_avx512[] = "--force-max-simd-bitwidth=512";
+	char *argv[] = {
+			program_invocation_short_name,
+			l_flag, cl,
+			in_mem,
+			use_avx512,
+			NULL,
+	};
+
+	RTE_BUILD_BUG_ON(sizeof(port_poll_mask) * CHAR_BIT < MAX_SOCKETS);
+
+	int ret = rte_eal_init(RTE_DIM(argv) - 1, argv);
+	if (ret < 0)
+		return ret;
+
+	mempool_ops_index = check_mempool_ops();
+	if (mempool_ops_index == -1)
+		rte_panic("Cannot get mempool ops");
+	printf("Mempool ops index is %d\n", mempool_ops_index);
+
+	default_mempool = rte_pktmbuf_pool_create("proxy_def",
+			MAX_SOCKETS * 200, 32, 0,
+			RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+	if (default_mempool == NULL)
+		rte_panic("Cannot create default mempool\n");
+
+	int nb_ethdevs = rte_eth_dev_count_avail();
+	if (nb_ethdevs > MAX_PORTS_SUPPORTED) {
+		fprintf(stderr, "More ports available than supported, some will be unused\n");
+		nb_ethdevs = MAX_PORTS_SUPPORTED;
+	}
+	for (int i = 0; i < nb_ethdevs; i++) {
+		if (init_port(i, default_mempool) != 0)
+			rte_panic("Cannot init port %d\n", i);
+	}
+	return 0;
+}
+
+static int
+send_fd(int to, int fd, uint64_t fd_size, rte_iova_t iova, uint64_t pg_size)
+{
+	struct iovec iov = {0};
+	struct msghdr msg = {0};
+	size_t cmsglen = CMSG_LEN(sizeof(fd));
+	struct cmsghdr *cmhdr = malloc(cmsglen);
+	int ret = 0;
+
+	struct {
+		uint64_t fd_size;
+		rte_iova_t iova;
+		uint64_t pg_size;
+	} data_message = {fd_size, iova, pg_size};
+
+	if (cmhdr == NULL)
+		return -1;
+	iov.iov_base = (void *)&data_message;
+	iov.iov_len = sizeof(data_message);
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+	cmhdr->cmsg_level = SOL_SOCKET;
+	cmhdr->cmsg_type = SCM_RIGHTS;
+	cmhdr->cmsg_len = cmsglen;
+	msg.msg_control = cmhdr;
+	msg.msg_controllen = cmsglen;
+	*(int *)CMSG_DATA(cmhdr) = fd;
+
+	if (sendmsg(to, &msg, 0) != (int)iov.iov_len) {
+		printf("Error sending message to client, %s\n", strerror(errno));
+		ret = -1;
+	}
+	free(cmhdr);
+	return ret;
+}
+
+static int
+reconfigure_queue(uint16_t port_id, uint16_t qid, struct rte_mempool *p)
+{
+	if (rte_eth_dev_rx_queue_stop(port_id, qid) != 0) {
+		printf("Error with rx_queue_stop\n");
+		return -1;
+	}
+	if (rte_eth_dev_tx_queue_stop(port_id, qid) != 0) {
+		printf("Error with tx_queue_stop\n");
+		return -1;
+	}
+	if (rte_eth_rx_queue_setup(port_id, qid, 1024,
+			rte_socket_id(), NULL, p) != 0) {
+		printf("Error with rx_queue_setup\n");
+		return -1;
+	}
+	if (rte_eth_dev_tx_queue_start(port_id, qid) != 0) {
+		printf("Error with tx_queue_start\n");
+		return -1;
+	}
+	if (rte_eth_dev_rx_queue_start(port_id, qid) != 0) {
+		printf("Error with rx_queue_start\n");
+		return -1;
+	}
+	return 0;
+}
+
+static void
+handle_connection(int client, void *const client_mem, uint64_t memsize,
+		uint16_t port_id, uint16_t qid)
+{
+	uintptr_t client_mmap_addr = 0;
+	struct rte_ring *rx_ring, *tx_ring;
+	struct rte_mempool *local_mp;
+	size_t mempool_memsize = sizeof(*local_mp)
+					+ sizeof(local_mp->local_cache[0]) * RTE_MAX_LCORE
+					+ sizeof(struct rte_pktmbuf_pool_private);
+	local_mp = rte_malloc(NULL, mempool_memsize, 0);
+	if (local_mp == NULL) {
+		printf("Error allocating mempool struct\n");
+		return;
+	}
+	memset(local_mp, 0, mempool_memsize);
+	*local_mp = (struct rte_mempool){
+		.name = "proxy_mp",
+		.cache_size = 256,
+		.ops_index = mempool_ops_index,
+		.pool_config = client_mem,
+		.private_data_size = sizeof(struct rte_pktmbuf_pool_private),
+		.local_cache = RTE_PTR_ADD(local_mp, sizeof(*local_mp)),
+	};
+	for (uint i = 0; i < RTE_MAX_LCORE; i++) {
+		local_mp->local_cache[i].size = 256;
+		local_mp->local_cache[i].flushthresh = 300;
+	}
+
+	struct eth_shared_mem_msg *msg = malloc(sizeof(*msg) + 1024);
+	if (msg == NULL) {
+		printf("Error mallocing message buffer\n");
+		goto out;
+	}
+	int bytes_read = read(client, msg, sizeof(*msg) + 1024);
+	while (bytes_read > 0) {
+		switch (msg->type) {
+		case MSG_TYPE_MMAP_BASE_ADDR:
+			client_mmap_addr = msg->offset;
+			printf("Got mmap base addr of %p\n", (void *)client_mmap_addr);
+			break;
+		case MSG_TYPE_MEMPOOL_OFFSET: {
+			struct rte_mempool *remote_pool;
+			uintptr_t remote_pd_offset;
+
+			remote_pool = RTE_PTR_ADD(client_mem, msg->offset);
+			remote_pd_offset = (uintptr_t)remote_pool->pool_data - client_mmap_addr;
+			local_mp->pool_data = RTE_PTR_ADD(client_mem, remote_pd_offset);
+			memcpy(rte_mempool_get_priv(local_mp), rte_mempool_get_priv(remote_pool),
+					sizeof(struct rte_pktmbuf_pool_private));
+
+			printf("Got mempool offset of %p, stack name is %s\n",
+					(void *)msg->offset, (char *)local_mp->pool_data);
+			struct rte_mbuf *mb = rte_pktmbuf_alloc(local_mp);
+			if (mb == NULL) {
+				printf("Error allocating buffer\n");
+				goto out;
+			}
+			if ((uintptr_t)mb->buf_addr != (uintptr_t)mb + 128)
+				rte_panic("Error, bad buffer\n");
+			rte_pktmbuf_free(mb);
+			break;
+		}
+		case MSG_TYPE_RX_RING_OFFSET:
+			printf("Got Rx ring offset of %p\n", (void *)msg->offset);
+			rx_ring = RTE_PTR_ADD(client_mem, msg->offset);
+			rx_rings[S_IDX(port_id, qid)] = rx_ring;
+			break;
+		case MSG_TYPE_TX_RING_OFFSET:
+			printf("Got Tx ring offset of %p\n", (void *)msg->offset);
+			tx_ring = RTE_PTR_ADD(client_mem, msg->offset);
+			tx_rings[S_IDX(port_id, qid)] = tx_ring;
+			break;
+
+		case MSG_TYPE_START:
+			base_addrs[S_IDX(port_id, qid)] = (uintptr_t)client_mem;
+			lengths[S_IDX(port_id, qid)] = memsize;
+			mps[S_IDX(port_id, qid)] = local_mp;
+			if (reconfigure_queue(port_id, qid, local_mp) < 0)
+				goto out;
+
+			port_poll_mask |= (1ULL << S_IDX(port_id, qid));
+			while (used_poll_mask != port_poll_mask)
+				usleep(10);
+
+			*msg = (struct eth_shared_mem_msg){ .type = MSG_TYPE_ACK, };
+			if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+				goto out;
+
+			dp_stats[S_IDX(port_id, qid)] = (struct rxtx_stats){0};
+			break;
+
+		case MSG_TYPE_GET_MAC:
+			*msg = (struct eth_shared_mem_msg){
+				.type = MSG_TYPE_REPORT_MAC,
+			};
+			rte_eth_macaddr_get(port_id, &msg->ethaddr);
+			if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+				goto out;
+			break;
+		default:
+			printf("Unknown message\n");
+		}
+		bytes_read = read(client, msg, sizeof(*msg) + 1024);
+	}
+out:
+	port_poll_mask &= ~(1ULL << S_IDX(port_id, qid));
+	while (used_poll_mask != port_poll_mask)
+		usleep(10);
+
+	reconfigure_queue(port_id, qid, default_mempool);
+
+	free(msg);
+	rte_free(local_mp);
+
+	printf("Client disconnect\n");
+}
+
+static int
+accept_client(const int sock, uint64_t maxmem, uint16_t port_id, uint16_t qid)
+{
+	int ret = 0;
+	rte_iova_t *iovas = NULL;
+	const int client = accept(sock, NULL, NULL);
+	if (client < 0) {
+		printf("Error with accept\n");
+		return errno;
+	}
+	printf("Client connected\n");
+
+	char filename[32];
+	int flags = MFD_HUGETLB;
+	uint32_t pgsize = (1 << 21);
+	if (maxmem % (1 << 30) == 0) {
+		flags |= MFD_HUGE_1GB;
+		pgsize = (1 << 30);
+	}
+	snprintf(filename, sizeof(filename), "client_memory_%d", client);
+
+	const int memfd = memfd_create(filename, flags);
+	if (memfd < 0) {
+		printf("Error with memfd_create\n");
+		return errno;
+	}
+	if (ftruncate(memfd, maxmem) < 0) {
+		printf("Error with ftruncate\n");
+		close(memfd);
+		return errno;
+	}
+	void * const client_mem = mmap(NULL, maxmem, PROT_READ | PROT_WRITE,
+			MAP_SHARED, memfd, 0);
+	if (client_mem == MAP_FAILED) {
+		printf("Error with mmap\n");
+		ret = errno;
+		goto out;
+	}
+
+	const int nb_pages = maxmem / pgsize;
+	printf("Registering %d pages of memory with DPDK\n", nb_pages);
+	iovas = malloc(sizeof(*iovas) * nb_pages);
+	if (iovas == NULL) {
+		printf("Error with malloc for iovas\n");
+		ret = ENOMEM;
+		goto out;
+	}
+	/* assume vfio, VA = IOVA */
+	iovas[0] = (uintptr_t)client_mem;
+	for (int i = 1; i < nb_pages; i++)
+		iovas[i] = iovas[i - 1] + pgsize;
+
+
+	if (rte_extmem_register(client_mem, maxmem, iovas, nb_pages, pgsize) < 0) {
+		printf("Error registering memory with DPDK, %s\n", strerror(rte_errno));
+		ret = rte_errno;
+		goto out;
+	}
+	printf("Registered memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+
+	struct rte_eth_dev_info info;
+	if (rte_eth_dev_info_get(port_id, &info) < 0) {
+		printf("Error getting ethdev info\n");
+		ret = -1;
+		goto out;
+	}
+	if (rte_dev_dma_map(info.device, client_mem, iovas[0], maxmem) < 0) {
+		printf("Error mapping dma for device, %s\n", strerror(rte_errno));
+		ret = rte_errno;
+		goto out;
+	}
+
+	if (send_fd(client, memfd, maxmem, iovas[0], pgsize) != 0) {
+		printf("Error sending fd to client\n");
+		ret = errno;
+		goto out;
+	}
+	printf("Sent FD to client for mapping\n");
+
+	handle_connection(client, client_mem, maxmem, port_id, qid);
+out:
+	if (iovas != NULL)
+		rte_dev_dma_unmap(info.device, client_mem, iovas[0], maxmem);
+	printf("Unregistering memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+	if (rte_extmem_unregister(client_mem, maxmem) < 0)
+		printf("Error unregistering memory, %s\n", strerror(rte_errno));
+	close(memfd);
+	close(client);
+	if (client_mem != MAP_FAILED)
+		munmap(client_mem, maxmem);
+	return ret;
+}
+
+static void *
+listen_fn(void *param)
+{
+	struct listen_socket_params *p = param;
+	int ret = 0;
+
+	rte_thread_register();
+
+	while (1) {
+		ret = accept_client(p->sock, p->maxmem, p->port_id, p->qid);
+		if (ret != 0)
+			goto out;
+	}
+out:
+	/* p points into the static sock_params[] array, so it must not be freed */
+	return (void *)(uintptr_t)ret;
+}
+
+int
+listen_unix_socket(const char *path, const uint64_t maxmem, uint16_t port_id, uint16_t qid)
+{
+	if (sock_params[S_IDX(port_id, qid)].sock != 0) {
+		printf("Error, port already in use\n");
+		return EEXIST;
+	}
+
+	if (port_id >= rte_eth_dev_count_avail()) {
+		printf("Error, port %u does not exist\n", port_id);
+		return EINVAL;
+	}
+
+	printf("Opening and listening on socket: %s\n", path);
+	char *pathcp = strdup(path);
+	if (pathcp == NULL) {
+		printf("Error with strdup()\n");
+		free(pathcp);
+		return ENOMEM;
+	}
+	char *dirpath = dirname(pathcp);
+	mkdir(dirpath, 0700);
+	free(pathcp);
+
+	int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+	if (sock < 0) {
+		printf("Error creating socket\n");
+		return errno;
+	}
+
+	struct sockaddr_un sun = {.sun_family = AF_UNIX};
+	strlcpy(sun.sun_path, path, sizeof(sun.sun_path));
+	printf("Attempting socket bind to path '%s'\n", path);
+	printf("Associated parameters are: maxmem = %"PRIu64", port = %u, qid = %u\n",
+			maxmem, port_id, qid);
+
+	if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+		printf("Initial bind to socket '%s' failed.\n", path);
+
+		/* check if current socket is active */
+		if (connect(sock, (void *)&sun, sizeof(sun)) == 0) {
+			close(sock);
+			return EADDRINUSE;
+		}
+
+		/* socket is not active, delete and attempt rebind */
+		printf("Attempting unlink and retrying bind\n");
+		unlink(sun.sun_path);
+		if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+			printf("Error binding socket: %s\n", strerror(errno));
+			close(sock);
+			return errno; /* if unlink failed, this will be EADDRINUSE as above */
+		}
+	}
+
+	if (listen(sock, 1) < 0) {
+		printf("Error calling listen for socket: %s\n", strerror(errno));
+		unlink(sun.sun_path);
+		close(sock);
+		return errno;
+	}
+	printf("Socket %s listening ok\n", path);
+
+	struct listen_socket_params *p = &sock_params[S_IDX(port_id, qid)];
+	pthread_t listen_thread;
+	*p = (struct listen_socket_params){strdup(path), sock, port_id, qid, maxmem};
+	pthread_create(&listen_thread, NULL, listen_fn, p);
+	pthread_detach(listen_thread);
+	return 0;
+}
+
+void
+handle_forwarding(void)
+{
+	const typeof(port_poll_mask) to_poll = port_poll_mask;
+	if (used_poll_mask != to_poll) {
+		printf("Poll mask is now %#llx\n", to_poll);
+		used_poll_mask = to_poll;
+	}
+	if (to_poll == 0) {
+		usleep(100);
+		return;
+	}
+	for (uint16_t i = 0; i < sizeof(to_poll) * CHAR_BIT; i++) {
+		struct rte_mbuf *mbs[32];
+		void *offsets[32];
+		if (((1ULL << i) & to_poll) == 0)
+			continue;
+
+		uint16_t port_id = i / MAX_QUEUES_SUPPORTED;
+		uint16_t qid = i % MAX_QUEUES_SUPPORTED;
+		uint16_t nb_rx = rte_eth_rx_burst(port_id, qid, mbs, RTE_DIM(mbs));
+		if (nb_rx != 0) {
+			dp_stats[i].rx += nb_rx;
+			for (uint pkt = 0; pkt < nb_rx; pkt++) {
+				mbs[pkt]->buf_addr = RTE_PTR_SUB(mbs[pkt]->buf_addr, base_addrs[i]);
+				offsets[pkt] = RTE_PTR_SUB(mbs[pkt], base_addrs[i]);
+			}
+			uint16_t nb_enq = rte_ring_enqueue_burst(rx_rings[i], offsets, nb_rx, NULL);
+			if (nb_enq != nb_rx) {
+				dp_stats[i].enq_drop += nb_rx - nb_enq;
+				for (uint pkt = nb_enq; pkt < nb_rx; pkt++) {
+					mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr,
+							base_addrs[i]);
+					mbs[pkt]->pool = mps[i];
+				}
+				rte_mempool_put_bulk(mps[i], (void *)&mbs[nb_enq], nb_rx - nb_enq);
+			}
+		}
+
+		uint16_t nb_deq = rte_ring_dequeue_burst(tx_rings[i], offsets,
+				RTE_DIM(offsets), NULL);
+		if (nb_deq != 0) {
+			dp_stats[i].deq += nb_deq;
+			for (uint pkt = 0; pkt < nb_deq; pkt++) {
+				mbs[pkt] = RTE_PTR_ADD(offsets[pkt], base_addrs[i]);
+				rte_prefetch0_write(mbs[pkt]);
+			}
+			for (uint pkt = 0; pkt < nb_deq; pkt++) {
+				mbs[pkt]->pool = mps[i];
+				mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr, base_addrs[i]);
+			}
+			uint16_t nb_tx = rte_eth_tx_burst(port_id, qid, mbs, nb_deq);
+			if (nb_tx != nb_deq) {
+				dp_stats[i].tx_drop += (nb_deq - nb_tx);
+				rte_pktmbuf_free_bulk(&mbs[nb_tx], nb_deq - nb_tx);
+			}
+		}
+	}
+}
+
+unsigned int
+lcore_id(void)
+{
+	return rte_lcore_id();
+}
diff --git a/app/io-proxy/datapath.h b/app/io-proxy/datapath.h
new file mode 100644
index 0000000000..ec5b395164
--- /dev/null
+++ b/app/io-proxy/datapath.h
@@ -0,0 +1,37 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#ifndef DATAPATH_H_INC
+#define DATAPATH_H_INC
+
+#include <stdbool.h>
+#include <stdint.h>
+#define MEMPOOL_OPS_NAME "proxy_mp"
+#define MAX_PORTS_SUPPORTED 8
+#define MAX_QUEUES_SUPPORTED 2
+#define MAX_SOCKETS (MAX_PORTS_SUPPORTED * MAX_QUEUES_SUPPORTED)
+
+struct rxtx_stats {
+	uint64_t rx;
+	uint64_t enq_drop;
+	uint64_t deq;
+	uint64_t tx_drop;
+};
+
+extern struct rxtx_stats dp_stats[MAX_SOCKETS];
+
+int check_mempool_ops(void);
+
+int datapath_init(const char *corelist);
+
+int listen_unix_socket(const char *path, uint64_t maxmem, uint16_t port, uint16_t qid);
+
+void handle_forwarding(void);
+
+unsigned int lcore_id(void);
+
+int get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+		uint16_t *port, uint16_t *queue, bool *connected);
+
+
+#endif
diff --git a/app/io-proxy/datapath_mp.c b/app/io-proxy/datapath_mp.c
new file mode 100644
index 0000000000..bba21a5b14
--- /dev/null
+++ b/app/io-proxy/datapath_mp.c
@@ -0,0 +1,78 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <sys/types.h>
+#include <rte_stack.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include "datapath.h"
+
+/* The mempool "pool_config" field holds this process's base address for the mapping. */
+/* No real alloc/free functions are provided for this pool, since we never
+ * create or destroy it; we only enqueue objects to it and dequeue them from it.
+ */
+
+static int
+proxy_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+	      unsigned int n)
+{
+	struct rte_stack *s = mp->pool_data;
+	void *offset_table[n];
+	uintptr_t mempool_base = (uintptr_t)mp->pool_config;
+
+	for (uint i = 0; i < n; i++)
+		offset_table[i] = RTE_PTR_SUB(obj_table[i], mempool_base);
+
+	return rte_stack_push(s, offset_table, n) == 0 ? -ENOBUFS : 0;
+}
+
+static int
+proxy_mp_dequeue(struct rte_mempool *mp, void **obj_table,
+	      unsigned int n)
+{
+	struct rte_stack *s = mp->pool_data;
+	uintptr_t mempool_base = (uintptr_t)mp->pool_config;
+
+	if (rte_stack_pop(s, obj_table, n) == 0)
+		return -ENOBUFS;
+	for (uint i = 0; i < n; i++) {
+		obj_table[i] = RTE_PTR_ADD(obj_table[i], mempool_base);
+		struct rte_mbuf *mb = obj_table[i];
+		mb->buf_addr = RTE_PTR_ADD(mb, sizeof(struct rte_mbuf) + rte_pktmbuf_priv_size(mp));
+		mb->pool = mp;
+	}
+	return 0;
+}
+
+static int
+proxy_mp_alloc(struct rte_mempool *mp __rte_unused)
+{
+	rte_panic("Should not be called\n");
+}
+
+static unsigned int
+proxy_mp_get_count(const struct rte_mempool *mp __rte_unused)
+{
+	rte_panic("Should not be called\n");
+}
+
+
+static struct rte_mempool_ops ops_proxy_mp = {
+	.name = MEMPOOL_OPS_NAME,
+	.alloc = proxy_mp_alloc,
+	.enqueue = proxy_mp_enqueue,
+	.dequeue = proxy_mp_dequeue,
+	.get_count = proxy_mp_get_count,
+};
+
+RTE_MEMPOOL_REGISTER_OPS(ops_proxy_mp);
+
+int
+check_mempool_ops(void)
+{
+	for (uint i = 0; i < rte_mempool_ops_table.num_ops; i++) {
+		if (strcmp(rte_mempool_ops_table.ops[i].name, MEMPOOL_OPS_NAME) == 0)
+			return i;
+	}
+	return -1;
+}
diff --git a/app/io-proxy/main.c b/app/io-proxy/main.c
new file mode 100644
index 0000000000..82eef81fb0
--- /dev/null
+++ b/app/io-proxy/main.c
@@ -0,0 +1,71 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <rte_eal.h>
+#include <rte_common.h>
+#include <cmdline.h>
+#include <cmdline_socket.h>
+
+#include "datapath.h"
+#include "commands.h"
+
+volatile bool quit;
+volatile bool running_startup_script;
+static const char *startup_file = "dpdk-io-proxy.cmds";
+
+static void *
+run_cmdline(void *arg __rte_unused)
+{
+	struct cmdline *cl;
+	int fd = open(startup_file, O_RDONLY);
+
+	if (fd >= 0) {
+		running_startup_script = true;
+		cl = cmdline_new(ctx, "\n# ", fd, STDOUT_FILENO);
+		if (cl == NULL) {
+			fprintf(stderr, "Error processing %s\n", startup_file);
+			goto end_startup;
+		}
+		cmdline_interact(cl);
+		cmdline_quit(cl);
+end_startup:
+		running_startup_script = false;
+		close(fd);
+	}
+
+	cl = cmdline_stdin_new(ctx, "\nProxy>> ");
+	if (cl == NULL)
+		goto out;
+
+	cmdline_interact(cl);
+	cmdline_stdin_exit(cl);
+
+out:
+	quit = true;
+	return NULL;
+}
+
+int
+main(int argc, char *argv[])
+{
+	pthread_t cmdline_th;
+
+	if (argc != 2 || datapath_init(argv[1]) < 0) {
+		fprintf(stderr, "Usage: %s <corelist>\n", program_invocation_short_name);
+		rte_exit(EXIT_FAILURE, "Cannot init\n");
+	}
+
+	if (pthread_create(&cmdline_th, NULL, run_cmdline, NULL) != 0)
+		rte_exit(EXIT_FAILURE, "Cannot spawn cmdline thread\n");
+	pthread_detach(cmdline_th);
+
+	while (!quit)
+		handle_forwarding();
+	return 0;
+}
diff --git a/app/io-proxy/meson.build b/app/io-proxy/meson.build
new file mode 100644
index 0000000000..f03783b68f
--- /dev/null
+++ b/app/io-proxy/meson.build
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+cmd_h = custom_target('commands_hdr',
+        output: 'commands.h',
+        input: files('commands.list'),
+        capture: true,
+        command: [cmdline_gen_cmd, '@INPUT@']
+)
+sources += files('datapath.c', 'datapath_mp.c', 'main.c', 'command_fns.c')
+sources += cmd_h
+deps += ['cmdline', 'ethdev', 'stack', 'bus_shared_mem']
diff --git a/app/meson.build b/app/meson.build
index e4bf5c531c..27f69d883e 100644
--- a/app/meson.build
+++ b/app/meson.build
@@ -18,6 +18,7 @@ apps = [
         'dumpcap',
         'pdump',
         'proc-info',
+        'io-proxy',
         'test-acl',
         'test-bbdev',
         'test-cmdline',
-- 
2.39.2


* [RFC PATCH 5/5] app/io-proxy: add startup commands
  2023-09-22  8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
                   ` (3 preceding siblings ...)
  2023-09-22  8:19 ` [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces Bruce Richardson
@ 2023-09-22  8:19 ` Bruce Richardson
  4 siblings, 0 replies; 7+ messages in thread
From: Bruce Richardson @ 2023-09-22  8:19 UTC
  To: dev; +Cc: Bruce Richardson

To make it easier to run the io-proxy, add an example startup command
file which, by default, configures four sockets: two queues on each of
two ports.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/io-proxy/dpdk-io-proxy.cmds | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 app/io-proxy/dpdk-io-proxy.cmds

diff --git a/app/io-proxy/dpdk-io-proxy.cmds b/app/io-proxy/dpdk-io-proxy.cmds
new file mode 100644
index 0000000000..515d598079
--- /dev/null
+++ b/app/io-proxy/dpdk-io-proxy.cmds
@@ -0,0 +1,6 @@
+add socket /tmp/socket_0_0/sock 2G 0 0
+add socket /tmp/socket_0_1/sock 2G 0 1
+add socket /tmp/socket_1_0/sock 2G 1 0
+add socket /tmp/socket_1_1/sock 2G 1 1
+list ports
+list sockets
-- 
2.39.2


* Re: [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket
  2023-09-22  8:19 ` [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket Bruce Richardson
@ 2023-11-23 14:50   ` Jerin Jacob
  0 siblings, 0 replies; 7+ messages in thread
From: Jerin Jacob @ 2023-11-23 14:50 UTC
  To: Bruce Richardson; +Cc: dev

On Fri, Sep 22, 2023 at 1:49 PM Bruce Richardson
<bruce.richardson@intel.com> wrote:
>
> Add a new driver to DPDK which supports taking in memory, e.g. hugepage
> memory, via a unix socket connection and mapping it into the DPDK
> process, replacing the current socket memory as the default memory for
> use by future requests.
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>

Thanks Bruce for this work. IMO, this will open up a lot of use cases,
like CPU-based offload in a different process.

> +
> +enum shared_mem_msg_type {
> +       MSG_TYPE_ACK = 0,
> +       MSG_TYPE_MMAP_BASE_ADDR,
> +       MSG_TYPE_MEMPOOL_OFFSET,
> +       MSG_TYPE_RX_RING_OFFSET,
> +       MSG_TYPE_TX_RING_OFFSET,
> +       MSG_TYPE_START,
> +       MSG_TYPE_GET_MAC,
> +       MSG_TYPE_REPORT_MAC,
> +};

In order to cater to different use cases, IMO, drivers/bus/shared_mem/
should be generic and act only as a transport for communicating with
other processes. That would translate to the following:
1) drivers/bus/shared_mem/ provides a means to register a message type
and its callback.
2) The consumers (drivers/mempool/shared_mem and drivers/net/sharedmem)
register the callbacks. The callback and message-type definitions
should live in the consumer.

Also, we may rename bus/sharedmem to bus/socket or similar, to limit the
scope of the bus driver to a communication mechanism for talking to
other processes. That way, different DPDK drivers could be built on the
socket bus in the future.

Thread overview: 7+ messages
2023-09-22  8:19 [RFC PATCH 0/5] Using shared mempools for zero-copy IO proxying Bruce Richardson
2023-09-22  8:19 ` [RFC PATCH 1/5] bus: new driver to accept shared memory over unix socket Bruce Richardson
2023-11-23 14:50   ` Jerin Jacob
2023-09-22  8:19 ` [RFC PATCH 2/5] mempool: driver for mempools of mbufs on shared memory Bruce Richardson
2023-09-22  8:19 ` [RFC PATCH 3/5] net: new ethdev driver to communicate using shared mem Bruce Richardson
2023-09-22  8:19 ` [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces Bruce Richardson
2023-09-22  8:19 ` [RFC PATCH 5/5] app/io-proxy: add startup commands Bruce Richardson
