DPDK patches and discussions
* [dpdk-dev] [PATCH v2 0/2] introduce asynchronous data path for vhost
@ 2020-06-29 14:44 patrick.fu
  2020-06-29 14:44 ` [dpdk-dev] [PATCH v2 1/2] vhost: introduce async enqueue registration API patrick.fu
  2020-06-29 14:44 ` [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for split ring patrick.fu
  0 siblings, 2 replies; 5+ messages in thread
From: patrick.fu @ 2020-06-29 14:44 UTC (permalink / raw)
  To: dev, maxime.coquelin, chenbo.xia, zhihong.wang
  Cc: patrick.fu, yinan.wang, cheng1.jiang, cunming.liang

From: Patrick Fu <patrick.fu@intel.com>

Performing large memory copies usually takes up a major part of CPU
cycles and becomes the hot spot in vhost-user enqueue operation. To
offload expensive memory operations from the CPU, this patch set
proposes to leverage DMA engines, e.g., I/OAT, a DMA engine in Intel
processors, to accelerate large copies.

Large copies are offloaded from the CPU to the DMA engine in an
asynchronous manner. The CPU just submits copy jobs to the DMA engine
without waiting for the copies to complete. Thus, there is no CPU
intervention during data transfer; we can save precious CPU cycles and
improve the overall throughput for vhost-user based applications, like
OVS. During packet transmission, the data path offloads large copies to
the DMA engine and performs small copies on the CPU, because of the
startup overhead associated with DMA.
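
The size-based split described above can be illustrated with a minimal,
self-contained sketch. It is not code from this patch set: struct
copy_job, struct job_queue and dispatch_copy() are hypothetical names,
and the queue only records jobs that a real application would hand to
its DMA driver. Segments at or above the threshold are queued, which
mirrors the "cpy_len >= cpy_threshold" test in patch 2/2; smaller ones
are copied on the CPU immediately.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a DMA job queue; not part of the patch set. */
struct copy_job {
	void *dst;
	const void *src;
	size_t len;
};

struct job_queue {
	struct copy_job jobs[64];
	size_t n;
};

/*
 * Copy small segments on the CPU right away and queue large ones for an
 * asynchronous engine.
 * Returns 1 if the segment was queued, 0 if the CPU copied it,
 * and -1 if the queue is full.
 */
static int dispatch_copy(struct job_queue *q, void *dst, const void *src,
			 size_t len, size_t threshold)
{
	if (len < threshold) {
		memcpy(dst, src, len);
		return 0;
	}

	if (q->n >= sizeof(q->jobs) / sizeof(q->jobs[0]))
		return -1;

	q->jobs[q->n].dst = dst;
	q->jobs[q->n].src = src;
	q->jobs[q->n].len = len;
	q->n++;
	return 1;
}
```

The threshold is left as a parameter because the break-even point
depends on the startup cost of the particular DMA engine.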

This patch set constructs a general framework that applications can
leverage to attach DMA channels to vhost-user transmit queues. Four
new RTE APIs are introduced to the vhost library for applications to
register and use the asynchronous data path. In addition, two new DMA
operation callbacks are defined, by which the vhost-user asynchronous
data path can interact with DMA hardware. Currently only the enqueue
operation for split queue is implemented, but the framework is flexible
enough to be extended to support packed queue.
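
As a hedged illustration of the registration step: the register API in
patch 1/2 takes a 32-bit feature word whose layout is documented in
rte_vhost_async.h (b0 is the in-order flag, b16 - b27 carry the packet
length threshold). The helpers below are a sketch with hypothetical
names (async_features_pack() and friends are not in the patch). They
build the word with explicit shifts, which avoids relying on the
compiler's bit-field layout.

```c
#include <stdint.h>

/* Bit positions follow the comment in rte_vhost_async.h; names are made up. */
#define ASYNC_F_INORDER		(1u << 0)
#define ASYNC_F_THRESHOLD_SFT	16
#define ASYNC_F_THRESHOLD_MSK	0xFFFu	/* 12 bits: b16 - b27 */

/* Build the feature word; threshold values above 12 bits are truncated. */
static uint32_t async_features_pack(int inorder, uint32_t threshold)
{
	uint32_t f = 0;

	if (inorder)
		f |= ASYNC_F_INORDER;
	f |= (threshold & ASYNC_F_THRESHOLD_MSK) << ASYNC_F_THRESHOLD_SFT;
	return f;
}

/* Extract the in-order flag (0 or 1). */
static int async_features_inorder(uint32_t f)
{
	return (f & ASYNC_F_INORDER) != 0;
}

/* Extract the packet length threshold. */
static uint32_t async_features_threshold(uint32_t f)
{
	return (f >> ASYNC_F_THRESHOLD_SFT) & ASYNC_F_THRESHOLD_MSK;
}
```

An application would pass the packed value as the "features" argument
of rte_vhost_async_channel_register().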

v2:
update meson file for new header file
update rte_vhost_version.map to include new APIs
rename async APIs/structures to be prefixed with "rte_vhost"
rename some variables/structures for readability
correct minor typos in comments/license statements
refine memory allocation logic for vq internal buffer
add error message printing in some failure cases
check inflight async packets in unregistration API call
mark new APIs as experimental

Patrick Fu (2):
  vhost: introduce async enqueue registration API
  vhost: introduce async enqueue for split ring

 lib/librte_vhost/Makefile              |   2 +-
 lib/librte_vhost/meson.build           |   2 +-
 lib/librte_vhost/rte_vhost.h           |   1 +
 lib/librte_vhost/rte_vhost_async.h     | 176 +++++++++++
 lib/librte_vhost/rte_vhost_version.map |   4 +
 lib/librte_vhost/socket.c              |  20 ++
 lib/librte_vhost/vhost.c               | 127 +++++++-
 lib/librte_vhost/vhost.h               |  30 +-
 lib/librte_vhost/vhost_user.c          |  27 +-
 lib/librte_vhost/virtio_net.c          | 539 ++++++++++++++++++++++++++++++++-
 10 files changed, 919 insertions(+), 9 deletions(-)
 create mode 100644 lib/librte_vhost/rte_vhost_async.h

-- 
1.8.3.1



* [dpdk-dev] [PATCH v2 1/2] vhost: introduce async enqueue registration API
  2020-06-29 14:44 [dpdk-dev] [PATCH v2 0/2] introduce asynchronous data path for vhost patrick.fu
@ 2020-06-29 14:44 ` patrick.fu
  2020-06-29 14:44 ` [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for split ring patrick.fu
  1 sibling, 0 replies; 5+ messages in thread
From: patrick.fu @ 2020-06-29 14:44 UTC (permalink / raw)
  To: dev, maxime.coquelin, chenbo.xia, zhihong.wang
  Cc: patrick.fu, yinan.wang, cheng1.jiang, cunming.liang

From: Patrick Fu <patrick.fu@intel.com>

This patch introduces registration/un-registration APIs
for the vhost async data enqueue operation. Together with
the registration API implementations, the data structures
and async callback functions required for the async enqueue
data path are also defined.

Signed-off-by: Patrick Fu <patrick.fu@intel.com>
---
 lib/librte_vhost/Makefile              |   2 +-
 lib/librte_vhost/meson.build           |   2 +-
 lib/librte_vhost/rte_vhost.h           |   1 +
 lib/librte_vhost/rte_vhost_async.h     | 136 +++++++++++++++++++++++++++++++++
 lib/librte_vhost/rte_vhost_version.map |   4 +
 lib/librte_vhost/socket.c              |  20 +++++
 lib/librte_vhost/vhost.c               | 127 +++++++++++++++++++++++++++++-
 lib/librte_vhost/vhost.h               |  30 +++++++-
 lib/librte_vhost/vhost_user.c          |  27 ++++++-
 9 files changed, 342 insertions(+), 7 deletions(-)
 create mode 100644 lib/librte_vhost/rte_vhost_async.h

diff --git a/lib/librte_vhost/Makefile b/lib/librte_vhost/Makefile
index b7ff7dc..4f2f3e4 100644
--- a/lib/librte_vhost/Makefile
+++ b/lib/librte_vhost/Makefile
@@ -42,7 +42,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_VHOST) := fd_man.c iotlb.c socket.c vhost.c \
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_VHOST)-include += rte_vhost.h rte_vdpa.h \
-						rte_vdpa_dev.h
+						rte_vdpa_dev.h rte_vhost_async.h
 
 # only compile vhost crypto when cryptodev is enabled
 ifeq ($(CONFIG_RTE_LIBRTE_CRYPTODEV),y)
diff --git a/lib/librte_vhost/meson.build b/lib/librte_vhost/meson.build
index 882a0ea..cc9aa65 100644
--- a/lib/librte_vhost/meson.build
+++ b/lib/librte_vhost/meson.build
@@ -22,5 +22,5 @@ sources = files('fd_man.c', 'iotlb.c', 'socket.c', 'vdpa.c',
 		'vhost.c', 'vhost_user.c',
 		'virtio_net.c', 'vhost_crypto.c')
 headers = files('rte_vhost.h', 'rte_vdpa.h', 'rte_vdpa_dev.h',
-		'rte_vhost_crypto.h')
+		'rte_vhost_crypto.h', 'rte_vhost_async.h')
 deps += ['ethdev', 'cryptodev', 'hash', 'pci']
diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
index 2fbc364..28616f4 100644
--- a/lib/librte_vhost/rte_vhost.h
+++ b/lib/librte_vhost/rte_vhost.h
@@ -35,6 +35,7 @@
 #define RTE_VHOST_USER_EXTBUF_SUPPORT	(1ULL << 5)
 /* support only linear buffers (no chained mbufs) */
 #define RTE_VHOST_USER_LINEARBUF_SUPPORT	(1ULL << 6)
+#define RTE_VHOST_USER_ASYNC_COPY	(1ULL << 7)
 
 /** Protocol features. */
 #ifndef VHOST_USER_PROTOCOL_F_MQ
diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
new file mode 100644
index 0000000..d5a5927
--- /dev/null
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -0,0 +1,136 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _RTE_VHOST_ASYNC_H_
+#define _RTE_VHOST_ASYNC_H_
+
+#include "rte_vhost.h"
+
+/**
+ * iovec iterator
+ */
+struct rte_vhost_iov_iter {
+	/** offset to the first byte of interesting data */
+	size_t offset;
+	/** total bytes of data in this iterator */
+	size_t count;
+	/** pointer to the iovec array */
+	struct iovec *iov;
+	/** number of iovec in this iterator */
+	unsigned long nr_segs;
+};
+
+/**
+ * dma transfer descriptor pair
+ */
+struct rte_vhost_async_desc {
+	/** source memory iov_iter */
+	struct rte_vhost_iov_iter *src;
+	/** destination memory iov_iter */
+	struct rte_vhost_iov_iter *dst;
+};
+
+/**
+ * dma transfer status
+ */
+struct rte_vhost_async_status {
+	/** An array of application specific data for source memory */
+	uintptr_t *src_opaque_data;
+	/** An array of application specific data for destination memory */
+	uintptr_t *dst_opaque_data;
+};
+
+/**
+ * dma operation callbacks to be implemented by applications
+ */
+struct rte_vhost_async_channel_ops {
+	/**
+	 * instruct async engines to perform copies for a batch of packets
+	 *
+	 * @param vid
+	 *  id of vhost device to perform data copies
+	 * @param queue_id
+	 *  queue id to perform data copies
+	 * @param descs
+	 *  an array of DMA transfer memory descriptors
+	 * @param opaque_data
+	 *  opaque data pair sending to DMA engine
+	 * @param count
+	 *  number of elements in the "descs" array
+	 * @return
+	 *  -1 on failure, number of descs processed on success
+	 */
+	int (*transfer_data)(int vid, uint16_t queue_id,
+		struct rte_vhost_async_desc *descs,
+		struct rte_vhost_async_status *opaque_data,
+		uint16_t count);
+	/**
+	 * check copy-completed packets from the async engine
+	 * @param vid
+	 *  id of vhost device to check copy completion
+	 * @param queue_id
+	 *  queue id to check copy completion
+	 * @param opaque_data
+	 *  buffer to receive the opaque data pair from DMA engine
+	 * @param max_packets
+	 *  max number of packets could be completed
+	 * @return
+	 *  -1 on failure, number of iov segments completed on success
+	 */
+	int (*check_completed_copies)(int vid, uint16_t queue_id,
+		struct rte_vhost_async_status *opaque_data,
+		uint16_t max_packets);
+};
+
+/**
+ *  dma channel feature bit definition
+ */
+struct rte_vhost_async_features {
+	union {
+		uint32_t intval;
+		struct {
+			uint32_t async_inorder:1;
+			uint32_t resvd_0:15;
+			uint32_t async_threshold:12;
+			uint32_t resvd_1:4;
+		};
+	};
+};
+
+/**
+ * register an async channel for vhost
+ *
+ * @param vid
+ *  vhost device id async channel to be attached to
+ * @param queue_id
+ *  vhost queue id async channel to be attached to
+ * @param features
+ *  DMA channel feature bit
+ *    b0       : DMA supports inorder data transfer
+ *    b1  - b15: reserved
+ *    b16 - b27: Packet length threshold for DMA transfer
+ *    b28 - b31: reserved
+ * @param ops
+ *  DMA operation callbacks
+ * @return
+ *  0 on success, -1 on failures
+ */
+__rte_experimental
+int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
+	uint32_t features, struct rte_vhost_async_channel_ops *ops);
+
+/**
+ * unregister a dma channel for vhost
+ *
+ * @param vid
+ *  vhost device id DMA channel to be detached
+ * @param queue_id
+ *  vhost queue id DMA channel to be detached
+ * @return
+ *  0 on success, -1 on failures
+ */
+__rte_experimental
+int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
+
+#endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/librte_vhost/rte_vhost_version.map b/lib/librte_vhost/rte_vhost_version.map
index 299576a..2d1e796 100644
--- a/lib/librte_vhost/rte_vhost_version.map
+++ b/lib/librte_vhost/rte_vhost_version.map
@@ -72,4 +72,8 @@ EXPERIMENTAL {
 	rte_vdpa_get_queue_num;
 	rte_vdpa_get_features;
 	rte_vdpa_get_protocol_features;
+	rte_vhost_async_channel_register;
+	rte_vhost_async_channel_unregister;
+	rte_vhost_submit_enqueue_burst;
+	rte_vhost_poll_enqueue_completed;
 };
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 49267ce..698b44e 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -42,6 +42,7 @@ struct vhost_user_socket {
 	bool use_builtin_virtio_net;
 	bool extbuf;
 	bool linearbuf;
+	bool async_copy;
 
 	/*
 	 * The "supported_features" indicates the feature bits the
@@ -205,6 +206,7 @@ struct vhost_user {
 	size_t size;
 	struct vhost_user_connection *conn;
 	int ret;
+	struct virtio_net *dev;
 
 	if (vsocket == NULL)
 		return;
@@ -236,6 +238,13 @@ struct vhost_user {
 	if (vsocket->linearbuf)
 		vhost_enable_linearbuf(vid);
 
+	if (vsocket->async_copy) {
+		dev = get_device(vid);
+
+		if (dev)
+			dev->async_copy = 1;
+	}
+
 	VHOST_LOG_CONFIG(INFO, "new device, handle is %d\n", vid);
 
 	if (vsocket->notify_ops->new_connection) {
@@ -881,6 +890,17 @@ struct rte_vdpa_device *
 		goto out_mutex;
 	}
 
+	vsocket->async_copy = flags & RTE_VHOST_USER_ASYNC_COPY;
+
+	if (vsocket->async_copy &&
+		(flags & (RTE_VHOST_USER_IOMMU_SUPPORT |
+		RTE_VHOST_USER_POSTCOPY_SUPPORT))) {
+		VHOST_LOG_CONFIG(ERR, "error: enabling async copy and IOMMU "
+			"or post-copy feature simultaneously is not "
+			"supported\n");
+		goto out_mutex;
+	}
+
 	/*
 	 * Set the supported features correctly for the builtin vhost-user
 	 * net driver.
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index 0d822d6..58ee3ef 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -332,8 +332,13 @@
 {
 	if (vq_is_packed(dev))
 		rte_free(vq->shadow_used_packed);
-	else
+	else {
 		rte_free(vq->shadow_used_split);
+		if (vq->async_pkts_pending)
+			rte_free(vq->async_pkts_pending);
+		if (vq->async_pending_info)
+			rte_free(vq->async_pending_info);
+	}
 	rte_free(vq->batch_copy_elems);
 	rte_mempool_free(vq->iotlb_pool);
 	rte_free(vq);
@@ -1522,3 +1527,123 @@ int rte_vhost_extern_callback_register(int vid,
 	if (vhost_data_log_level >= 0)
 		rte_log_set_level(vhost_data_log_level, RTE_LOG_WARNING);
 }
+
+int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
+					uint32_t features,
+					struct rte_vhost_async_channel_ops *ops)
+{
+	struct vhost_virtqueue *vq;
+	struct virtio_net *dev = get_device(vid);
+	struct rte_vhost_async_features f;
+
+	if (dev == NULL || ops == NULL)
+		return -1;
+
+	f.intval = features;
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(vq == NULL || !dev->async_copy))
+		return -1;
+
+	/** packed queue is not supported */
+	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
+		VHOST_LOG_CONFIG(ERR,
+			"async copy is not supported on packed queue or non-inorder mode "
+			"(vid %d, qid: %d)\n", vid, queue_id);
+		return -1;
+	}
+
+	if (unlikely(ops->check_completed_copies == NULL ||
+		ops->transfer_data == NULL))
+		return -1;
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	if (unlikely(vq->async_registered)) {
+		VHOST_LOG_CONFIG(ERR,
+			"async register failed: channel already registered "
+			"(vid %d, qid: %d)\n", vid, queue_id);
+		goto reg_out;
+	}
+
+	vq->async_pkts_pending = rte_malloc(NULL,
+			vq->size * sizeof(uintptr_t),
+			RTE_CACHE_LINE_SIZE);
+	vq->async_pending_info = rte_malloc(NULL,
+			vq->size * sizeof(uint64_t),
+			RTE_CACHE_LINE_SIZE);
+	if (!vq->async_pkts_pending || !vq->async_pending_info) {
+		if (vq->async_pkts_pending)
+			rte_free(vq->async_pkts_pending);
+
+		if (vq->async_pending_info)
+			rte_free(vq->async_pending_info);
+
+		VHOST_LOG_CONFIG(ERR,
+				"async register failed: cannot allocate memory for vq data "
+				"(vid %d, qid: %d)\n", vid, queue_id);
+		goto reg_out;
+	}
+
+	vq->async_ops.check_completed_copies = ops->check_completed_copies;
+	vq->async_ops.transfer_data = ops->transfer_data;
+
+	vq->async_inorder = f.async_inorder;
+	vq->async_threshold = f.async_threshold;
+
+	vq->async_registered = true;
+
+reg_out:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	return 0;
+}
+
+int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id)
+{
+	struct vhost_virtqueue *vq;
+	struct virtio_net *dev = get_device(vid);
+	int ret = -1;
+
+	if (dev == NULL)
+		return ret;
+
+	vq = dev->virtqueue[queue_id];
+
+	if (vq == NULL)
+		return ret;
+
+	ret = 0;
+	rte_spinlock_lock(&vq->access_lock);
+
+	if (!vq->async_registered)
+		goto out;
+
+	if (vq->async_pkts_inflight_n) {
+		VHOST_LOG_CONFIG(ERR, "Failed to unregister async channel. "
+			"async inflight packets must be completed before unregistration.\n");
+		ret = -1;
+		goto out;
+	}
+
+	if (vq->async_pkts_pending) {
+		rte_free(vq->async_pkts_pending);
+		vq->async_pkts_pending = 0;
+	}
+
+	if (vq->async_pending_info) {
+		rte_free(vq->async_pending_info);
+		vq->async_pending_info = 0;
+	}
+
+	vq->async_ops.transfer_data = NULL;
+	vq->async_ops.check_completed_copies = NULL;
+	vq->async_registered = false;
+
+out:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	return ret;
+}
+
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 941a426..8baf322 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -24,6 +24,8 @@
 #include "rte_vdpa.h"
 #include "rte_vdpa_dev.h"
 
+#include "rte_vhost_async.h"
+
 /* Used to indicate that the device is running on a data core */
 #define VIRTIO_DEV_RUNNING 1
 /* Used to indicate that the device is ready to operate */
@@ -40,6 +42,11 @@
 
 #define VHOST_LOG_CACHE_NR 32
 
+#define MAX_PKT_BURST 32
+
+#define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
+#define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
+
 #define PACKED_DESC_ENQUEUE_USED_FLAG(w)	\
 	((w) ? (VRING_DESC_F_AVAIL | VRING_DESC_F_USED | VRING_DESC_F_WRITE) : \
 		VRING_DESC_F_WRITE)
@@ -201,6 +208,25 @@ struct vhost_virtqueue {
 	TAILQ_HEAD(, vhost_iotlb_entry) iotlb_list;
 	int				iotlb_cache_nr;
 	TAILQ_HEAD(, vhost_iotlb_entry) iotlb_pending_list;
+
+	/* operation callbacks for async dma */
+	struct rte_vhost_async_channel_ops	async_ops;
+
+	struct rte_vhost_iov_iter it_pool[VHOST_MAX_ASYNC_IT];
+	struct iovec vec_pool[VHOST_MAX_ASYNC_VEC];
+
+	/* async data transfer status */
+	uintptr_t	**async_pkts_pending;
+	#define		ASYNC_PENDING_INFO_N_MSK 0xFFFF
+	#define		ASYNC_PENDING_INFO_N_SFT 16
+	uint64_t	*async_pending_info;
+	uint16_t	async_pkts_idx;
+	uint16_t	async_pkts_inflight_n;
+
+	/* vq async features */
+	bool		async_inorder;
+	bool		async_registered;
+	uint16_t	async_threshold;
 } __rte_cache_aligned;
 
 /* Old kernels have no such macros defined */
@@ -354,6 +380,7 @@ struct virtio_net {
 	int16_t			broadcast_rarp;
 	uint32_t		nr_vring;
 	int			dequeue_zero_copy;
+	int			async_copy;
 	int			extbuf;
 	int			linearbuf;
 	struct vhost_virtqueue	*virtqueue[VHOST_MAX_QUEUE_PAIRS * 2];
@@ -699,7 +726,8 @@ uint64_t translate_log_addr(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	/* Don't kick guest if we don't reach index specified by guest. */
 	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
 		uint16_t old = vq->signalled_used;
-		uint16_t new = vq->last_used_idx;
+		uint16_t new = vq->async_pkts_inflight_n ?
+					vq->used->idx:vq->last_used_idx;
 		bool signalled_used_valid = vq->signalled_used_valid;
 
 		vq->signalled_used = new;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 3405cd8..716effa 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -462,12 +462,18 @@
 	} else {
 		if (vq->shadow_used_split)
 			rte_free(vq->shadow_used_split);
+		if (vq->async_pkts_pending)
+			rte_free(vq->async_pkts_pending);
+		if (vq->async_pending_info)
+			rte_free(vq->async_pending_info);
+
 		vq->shadow_used_split = rte_malloc(NULL,
 				vq->size * sizeof(struct vring_used_elem),
 				RTE_CACHE_LINE_SIZE);
+
 		if (!vq->shadow_used_split) {
 			VHOST_LOG_CONFIG(ERR,
-					"failed to allocate memory for shadow used ring.\n");
+					"failed to allocate memory for vq internal data.\n");
 			return RTE_VHOST_MSG_RESULT_ERR;
 		}
 	}
@@ -1145,7 +1151,8 @@
 			goto err_mmap;
 		}
 
-		populate = (dev->dequeue_zero_copy) ? MAP_POPULATE : 0;
+		populate = (dev->dequeue_zero_copy || dev->async_copy) ?
+			MAP_POPULATE : 0;
 		mmap_addr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
 				 MAP_SHARED | populate, fd, 0);
 
@@ -1160,7 +1167,7 @@
 		reg->host_user_addr = (uint64_t)(uintptr_t)mmap_addr +
 				      mmap_offset;
 
-		if (dev->dequeue_zero_copy)
+		if (dev->dequeue_zero_copy || dev->async_copy)
 			if (add_guest_pages(dev, reg, alignment) < 0) {
 				VHOST_LOG_CONFIG(ERR,
 					"adding guest pages to region %u failed.\n",
@@ -1943,6 +1950,12 @@ static int vhost_user_set_vring_err(struct virtio_net **pdev __rte_unused,
 	} else {
 		rte_free(vq->shadow_used_split);
 		vq->shadow_used_split = NULL;
+		if (vq->async_pkts_pending)
+			rte_free(vq->async_pkts_pending);
+		if (vq->async_pending_info)
+			rte_free(vq->async_pending_info);
+		vq->async_pkts_pending = NULL;
+		vq->async_pending_info = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
@@ -1977,6 +1990,14 @@ static int vhost_user_set_vring_err(struct virtio_net **pdev __rte_unused,
 		"set queue enable: %d to qp idx: %d\n",
 		enable, index);
 
+	if (!enable && dev->virtqueue[index]->async_registered) {
+		if (dev->virtqueue[index]->async_pkts_inflight_n) {
+			VHOST_LOG_CONFIG(ERR, "failed to disable vring. "
+			"async inflight packets must be completed first\n");
+			return RTE_VHOST_MSG_RESULT_ERR;
+		}
+	}
+
 	vdpa_dev = dev->vdpa_dev;
 	if (vdpa_dev && vdpa_dev->ops->set_vring_state)
 		vdpa_dev->ops->set_vring_state(dev->vid, index, enable);
-- 
1.8.3.1



* [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for split ring
  2020-06-29 14:44 [dpdk-dev] [PATCH v2 0/2] introduce asynchronous data path for vhost patrick.fu
  2020-06-29 14:44 ` [dpdk-dev] [PATCH v2 1/2] vhost: introduce async enqueue registration API patrick.fu
@ 2020-06-29 14:44 ` patrick.fu
  2020-07-01  8:50   ` Liu, Yong
  1 sibling, 1 reply; 5+ messages in thread
From: patrick.fu @ 2020-06-29 14:44 UTC (permalink / raw)
  To: dev, maxime.coquelin, chenbo.xia, zhihong.wang
  Cc: patrick.fu, yinan.wang, cheng1.jiang, cunming.liang

From: Patrick Fu <patrick.fu@intel.com>

This patch implements the async enqueue data path for split ring.
Two new async data path APIs are defined, by which applications
can submit packets to and poll completed packets from async
engines. The async enqueue data path leverages callback functions
registered by applications to work with the async engine.
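
The submit/poll pattern can be sketched in isolation as follows. This is
an assumption-laden model, not the implementation in this patch: struct
inflight_ring, ring_submit() and ring_poll() are hypothetical names, and
the n_done argument stands in for the count a completion callback would
report. It only shows how packets are tracked in submission order and
handed back once their copies are done.

```c
#include <stdint.h>

#define RING_SIZE 8 /* must be a power of two, like a virtqueue size */

/* Hypothetical in-flight packet tracker; not a structure from the patch. */
struct inflight_ring {
	void *pkts[RING_SIZE];
	uint16_t head; /* running count of packets submitted */
	uint16_t tail; /* running count of packets handed back */
};

static uint16_t ring_inflight(const struct inflight_ring *r)
{
	return (uint16_t)(r->head - r->tail);
}

/* Record up to count packets as in flight; returns how many were accepted. */
static uint16_t ring_submit(struct inflight_ring *r, void **pkts,
			    uint16_t count)
{
	uint16_t i;

	for (i = 0; i < count && ring_inflight(r) < RING_SIZE; i++)
		r->pkts[r->head++ & (RING_SIZE - 1)] = pkts[i];
	return i;
}

/*
 * Hand back, in submission order, packets whose copies are done.
 * At most count packets are returned, and never more than n_done.
 */
static uint16_t ring_poll(struct inflight_ring *r, void **pkts,
			  uint16_t count, uint16_t n_done)
{
	uint16_t i, n = ring_inflight(r);

	if (n > count)
		n = count;
	if (n > n_done)
		n = n_done;
	for (i = 0; i < n; i++)
		pkts[i] = r->pkts[r->tail++ & (RING_SIZE - 1)];
	return n;
}
```

A caller keeps ownership of a packet from submit until poll returns it,
so buffers must not be freed or reused in between.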

Signed-off-by: Patrick Fu <patrick.fu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  40 +++
 lib/librte_vhost/virtio_net.c      | 539 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 577 insertions(+), 2 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index d5a5927..c8ad8db 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -133,4 +133,44 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
+/**
+ * This function submits enqueue data to the async engine. It does not
+ * guarantee that the transfer has completed upon return. Applications
+ * should poll the transfer status with rte_vhost_poll_enqueue_completed().
+ *
+ * @param vid
+ *  id of vhost device to enqueue data
+ * @param queue_id
+ *  queue id to enqueue data
+ * @param pkts
+ *  array of packets to be enqueued
+ * @param count
+ *  packets num to be enqueued
+ * @return
+ *  num of packets enqueued
+ */
+__rte_experimental
+uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count);
+
+/**
+ * This function checks the async completion status for a specific vhost
+ * device queue. Packets that have finished the copy (enqueue) operation
+ * are returned in an array.
+ *
+ * @param vid
+ *  id of vhost device to enqueue data
+ * @param queue_id
+ *  queue id to enqueue data
+ * @param pkts
+ *  blank array to get return packet pointer
+ * @param count
+ *  size of the packet array
+ * @return
+ *  num of packets returned
+ */
+__rte_experimental
+uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count);
+
 #endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 751c1f3..435e434 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -17,14 +17,15 @@
 #include <rte_arp.h>
 #include <rte_spinlock.h>
 #include <rte_malloc.h>
+#include <rte_vhost_async.h>
 
 #include "iotlb.h"
 #include "vhost.h"
 
-#define MAX_PKT_BURST 32
-
 #define MAX_BATCH_LEN 256
 
+#define VHOST_ASYNC_BATCH_THRESHOLD 8
+
 static  __rte_always_inline bool
 rxvq_is_mergeable(struct virtio_net *dev)
 {
@@ -117,6 +118,35 @@
 }
 
 static __rte_always_inline void
+async_flush_shadow_used_ring_split(struct virtio_net *dev,
+	struct vhost_virtqueue *vq)
+{
+	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
+
+	if (used_idx + vq->shadow_used_idx <= vq->size) {
+		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
+					  vq->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update used ring interval [used_idx, vq->size] */
+		size = vq->size - used_idx;
+		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
+
+		/* update the left half used ring interval [0, left_size] */
+		do_flush_shadow_used_ring_split(dev, vq, 0, size,
+					  vq->shadow_used_idx - size);
+	}
+	vq->last_used_idx += vq->shadow_used_idx;
+
+	rte_smp_wmb();
+
+	vhost_log_cache_sync(dev, vq);
+
+	vq->shadow_used_idx = 0;
+}
+
+static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -905,6 +935,200 @@
 	return error;
 }
 
+static __rte_always_inline void
+async_fill_vec(struct iovec *v, void *base, size_t len)
+{
+	v->iov_base = base;
+	v->iov_len = len;
+}
+
+static __rte_always_inline void
+async_fill_it(struct rte_vhost_iov_iter *it, size_t count,
+	struct iovec *vec, unsigned long nr_seg)
+{
+	it->offset = 0;
+	it->count = count;
+
+	if (count) {
+		it->iov = vec;
+		it->nr_segs = nr_seg;
+	} else {
+		it->iov = 0;
+		it->nr_segs = 0;
+	}
+}
+
+static __rte_always_inline void
+async_fill_des(struct rte_vhost_async_desc *desc,
+	struct rte_vhost_iov_iter *src, struct rte_vhost_iov_iter *dst)
+{
+	desc->src = src;
+	desc->dst = dst;
+}
+
+static __rte_always_inline int
+async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			struct rte_mbuf *m, struct buf_vector *buf_vec,
+			uint16_t nr_vec, uint16_t num_buffers,
+			struct iovec *src_iovec, struct iovec *dst_iovec,
+			struct rte_vhost_iov_iter *src_it,
+			struct rte_vhost_iov_iter *dst_it)
+{
+	uint32_t vec_idx = 0;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t buf_offset, buf_avail;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t cpy_len, cpy_threshold;
+	uint64_t hdr_addr;
+	struct rte_mbuf *hdr_mbuf;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
+	int error = 0;
+
+	uint32_t tlen = 0;
+	int tvec_idx = 0;
+	void *hpa;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	cpy_threshold = vq->async_threshold;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_mbuf = m;
+	hdr_addr = buf_addr;
+	if (unlikely(buf_len < dev->vhost_hlen))
+		hdr = &tmp_hdr;
+	else
+		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
+		dev->vid, num_buffers);
+
+	if (unlikely(buf_len < dev->vhost_hlen)) {
+		buf_offset = dev->vhost_hlen - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail = buf_len - buf_offset;
+	} else {
+		buf_offset = dev->vhost_hlen;
+		buf_avail = buf_len - dev->vhost_hlen;
+	}
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+
+	while (mbuf_avail != 0 || m->next != NULL) {
+		/* done with current buf, get the next one */
+		if (buf_avail == 0) {
+			vec_idx++;
+			if (unlikely(vec_idx >= nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
+			if (rxvq_is_mergeable(dev))
+				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
+						num_buffers);
+
+			if (unlikely(hdr == &tmp_hdr)) {
+				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
+			} else {
+				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
+						dev->vhost_hlen, 0);
+				vhost_log_cache_write_iova(dev, vq,
+						buf_vec[0].buf_iova,
+						dev->vhost_hlen);
+			}
+
+			hdr_addr = 0;
+		}
+
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (unlikely(cpy_len >= cpy_threshold)) {
+			hpa = (void *)(uintptr_t)gpa_to_hpa(dev,
+					buf_iova + buf_offset, cpy_len);
+
+			if (unlikely(!hpa)) {
+				error = -1;
+				goto out;
+			}
+
+			async_fill_vec(src_iovec + tvec_idx,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
+						mbuf_offset), cpy_len);
+
+			async_fill_vec(dst_iovec + tvec_idx, hpa, cpy_len);
+
+			tlen += cpy_len;
+			tvec_idx++;
+		} else {
+			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
+				rte_memcpy(
+				(void *)((uintptr_t)(buf_addr + buf_offset)),
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+				cpy_len);
+
+				PRINT_PACKET(dev,
+					(uintptr_t)(buf_addr + buf_offset),
+					cpy_len, 0);
+			} else {
+				batch_copy[vq->batch_copy_nb_elems].dst =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+				batch_copy[vq->batch_copy_nb_elems].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+				batch_copy[vq->batch_copy_nb_elems].log_addr =
+					buf_iova + buf_offset;
+				batch_copy[vq->batch_copy_nb_elems].len =
+					cpy_len;
+				vq->batch_copy_nb_elems++;
+			}
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail  -= cpy_len;
+		buf_offset += cpy_len;
+	}
+
+out:
+	async_fill_it(src_it, tlen, src_iovec, tvec_idx);
+	async_fill_it(dst_it, tlen, dst_iovec, tvec_idx);
+
+	return error;
+}
+
 static __rte_always_inline int
 vhost_enqueue_single_packed(struct virtio_net *dev,
 			    struct vhost_virtqueue *vq,
@@ -1236,6 +1460,317 @@
 	return virtio_dev_rx(dev, queue_id, pkts, count);
 }
 
+static __rte_always_inline void
+virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	uint16_t last_idx, uint16_t shadow_idx)
+{
+	while (vq->async_pkts_inflight_n) {
+		int er = vq->async_ops.check_completed_copies(dev->vid,
+			queue_id, 0, MAX_PKT_BURST);
+
+		if (er < 0) {
+			vq->async_pkts_inflight_n = 0;
+			break;
+		}
+
+		vq->async_pkts_inflight_n -= er;
+	}
+
+	vq->shadow_used_idx = shadow_idx;
+	vq->last_avail_idx = last_idx;
+}
+
+static __rte_noinline uint32_t
+virtio_dev_rx_async_submit_split(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count)
+{
+	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
+	uint16_t num_buffers;
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	uint16_t avail_head, last_idx, shadow_idx;
+
+	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
+	struct iovec *vec_pool = vq->vec_pool;
+	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+	struct iovec *src_iovec = vec_pool;
+	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct rte_vhost_iov_iter *src_it = it_pool;
+	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
+	uint16_t n_free_slot, slot_idx;
+	int n_pkts = 0;
+
+	avail_head = *((volatile uint16_t *)&vq->avail->idx);
+	last_idx = vq->last_avail_idx;
+	shadow_idx = vq->shadow_used_idx;
+
+	/*
+	 * The ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
+		uint16_t nr_vec = 0;
+
+		if (unlikely(reserve_avail_buf_split(dev, vq,
+						pkt_len, buf_vec, &num_buffers,
+						avail_head, &nr_vec) < 0)) {
+			VHOST_LOG_DATA(DEBUG,
+				"(%d) failed to get enough desc from vring\n",
+				dev->vid);
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
+			dev->vid, vq->last_avail_idx,
+			vq->last_avail_idx + num_buffers);
+
+		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
+				buf_vec, nr_vec, num_buffers,
+				src_iovec, dst_iovec, src_it, dst_it) < 0) {
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		if (src_it->count) {
+			async_fill_des(&tdes[pkt_burst_idx], src_it, dst_it);
+			pkt_burst_idx++;
+			vq->async_pending_info[slot_idx] =
+				num_buffers | (src_it->nr_segs << 16);
+			src_iovec += src_it->nr_segs;
+			dst_iovec += dst_it->nr_segs;
+			src_it += 2;
+			dst_it += 2;
+		} else {
+			vq->async_pending_info[slot_idx] = num_buffers;
+			vq->async_pkts_inflight_n++;
+		}
+
+		vq->last_avail_idx += num_buffers;
+
+		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+				(pkt_idx == count - 1 && pkt_burst_idx)) {
+			n_pkts = vq->async_ops.transfer_data(dev->vid,
+					queue_id, tdes, 0, pkt_burst_idx);
+			src_iovec = vec_pool;
+			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+			src_it = it_pool;
+			dst_it = it_pool + 1;
+
+			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+				vq->async_pkts_inflight_n +=
+					n_pkts > 0 ? n_pkts : 0;
+				virtio_dev_rx_async_submit_split_err(dev,
+					vq, queue_id, last_idx, shadow_idx);
+				return 0;
+			}
+
+			pkt_burst_idx = 0;
+			vq->async_pkts_inflight_n += n_pkts;
+		}
+	}
+
+	if (pkt_burst_idx) {
+		n_pkts = vq->async_ops.transfer_data(dev->vid,
+				queue_id, tdes, 0, pkt_burst_idx);
+		if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
+			virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
+				last_idx, shadow_idx);
+			return 0;
+		}
+
+		vq->async_pkts_inflight_n += n_pkts;
+	}
+
+	do_data_copy_enqueue(dev, vq);
+
+	n_free_slot = vq->size - vq->async_pkts_idx;
+	if (n_free_slot > pkt_idx) {
+		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
+			pkts, pkt_idx * sizeof(uintptr_t));
+		vq->async_pkts_idx += pkt_idx;
+	} else {
+		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
+			pkts, n_free_slot * sizeof(uintptr_t));
+		rte_memcpy(&vq->async_pkts_pending[0],
+			&pkts[n_free_slot],
+			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
+		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	}
+
+	if (likely(vq->shadow_used_idx))
+		async_flush_shadow_used_ring_split(dev, vq);
+
+	return pkt_idx;
+}
+
+uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+	struct vhost_virtqueue *vq;
+	uint16_t n_pkts_cpl, n_pkts_put = 0, n_descs = 0;
+	uint16_t start_idx, pkts_idx, vq_size;
+	uint64_t *async_pending_info;
+
+	if (!dev)
+		return 0;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	pkts_idx = vq->async_pkts_idx;
+	async_pending_info = vq->async_pending_info;
+	vq_size = vq->size;
+	start_idx = pkts_idx > vq->async_pkts_inflight_n ?
+		pkts_idx - vq->async_pkts_inflight_n :
+		(vq_size - vq->async_pkts_inflight_n + pkts_idx) &
+		(vq_size - 1);
+
+	n_pkts_cpl =
+		vq->async_ops.check_completed_copies(vid, queue_id, 0, count);
+
+	rte_smp_wmb();
+
+	while (likely(((start_idx + n_pkts_put) & (vq_size - 1)) != pkts_idx)) {
+		uint64_t info = async_pending_info[
+			(start_idx + n_pkts_put) & (vq_size - 1)];
+		uint64_t n_segs;
+		n_pkts_put++;
+		n_descs += info & ASYNC_PENDING_INFO_N_MSK;
+		n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
+
+		if (n_segs) {
+			if (!n_pkts_cpl || n_pkts_cpl < n_segs) {
+				n_pkts_put--;
+				n_descs -= info & ASYNC_PENDING_INFO_N_MSK;
+				if (n_pkts_cpl) {
+					async_pending_info[
+						(start_idx + n_pkts_put) &
+						(vq_size - 1)] =
+					((n_segs - n_pkts_cpl) <<
+					 ASYNC_PENDING_INFO_N_SFT) |
+					(info & ASYNC_PENDING_INFO_N_MSK);
+					n_pkts_cpl = 0;
+				}
+				break;
+			}
+			n_pkts_cpl -= n_segs;
+		}
+	}
+
+	if (n_pkts_put) {
+		vq->async_pkts_inflight_n -= n_pkts_put;
+		*(volatile uint16_t *)&vq->used->idx += n_descs;
+
+		vhost_vring_call_split(dev, vq);
+	}
+
+	if (start_idx + n_pkts_put <= vq_size) {
+		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
+			n_pkts_put * sizeof(uintptr_t));
+	} else {
+		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
+			(vq_size - start_idx) * sizeof(uintptr_t));
+		rte_memcpy(&pkts[vq_size - start_idx], vq->async_pkts_pending,
+			(n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
+	}
+
+	rte_spinlock_unlock(&vq->access_lock);
+
+	return n_pkts_put;
+}
+
+static __rte_always_inline uint32_t
+virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count)
+{
+	struct vhost_virtqueue *vq;
+	uint32_t nb_tx = 0;
+	bool drawback = false;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	if (unlikely(vq->enabled == 0))
+		goto out_access_unlock;
+
+	if (unlikely(!vq->async_registered)) {
+		drawback = true;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(vq->access_ok == 0))
+		if (unlikely(vring_translate(dev, vq) < 0))
+			goto out;
+
+	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
+	if (count == 0)
+		goto out;
+
+	/* TODO: packed queue not implemented */
+	if (vq_is_packed(dev))
+		nb_tx = 0;
+	else
+		nb_tx = virtio_dev_rx_async_submit_split(dev,
+				vq, queue_id, pkts, count);
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	if (drawback)
+		return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts, count);
+
+	return nb_tx;
+}
+
+uint16_t
+rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+
+	if (!dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: built-in vhost net backend is disabled.\n",
+			dev->vid, __func__);
+		return 0;
+	}
+
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+}
+
 static inline bool
 virtio_net_with_host_offload(struct virtio_net *dev)
 {
-- 
1.8.3.1



* Re: [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for split ring
  2020-06-29 14:44 ` [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for split ring patrick.fu
@ 2020-07-01  8:50   ` Liu, Yong
  2020-07-02 12:21     ` Fu, Patrick
  0 siblings, 1 reply; 5+ messages in thread
From: Liu, Yong @ 2020-07-01  8:50 UTC (permalink / raw)
  To: Fu, Patrick, dev, maxime.coquelin, Xia, Chenbo, Wang, Zhihong
  Cc: Fu, Patrick, Wang, Yinan, Jiang, Cheng1, Liang, Cunming

> 
> +#define VHOST_ASYNC_BATCH_THRESHOLD 8
> +

It is not clear why the batch threshold is 8. If the value comes from a hardware requirement, it would be better to store it in rte_vhost_async_features.
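
For context, the threshold only decides how many copy descriptors accumulate before they are handed to the DMA callback in one call. The following is a minimal standalone sketch of that flush rule with the threshold passed in as a parameter rather than fixed by a macro. The names submit_stats, fake_transfer and submit_batched are hypothetical and are not part of the patch, and every packet is assumed to need a DMA copy.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the DMA submit callback: it counts calls
 * and accepts every descriptor it is given. */
struct submit_stats {
	uint32_t calls;
	uint32_t descs;
};

static uint32_t
fake_transfer(struct submit_stats *st, uint32_t n)
{
	st->calls++;
	st->descs += n;
	return n;
}

/*
 * Walk `count` packets (all assumed to need a DMA copy) and flush the
 * pending batch when it reaches `threshold` or on the last packet.
 * Returns the total number of descriptors submitted so far.
 */
static uint32_t
submit_batched(struct submit_stats *st, uint32_t count, uint32_t threshold)
{
	uint32_t pkt_idx, burst = 0;

	assert(threshold > 0);
	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
		burst++;
		if (burst >= threshold || pkt_idx == count - 1) {
			fake_transfer(st, burst);
			burst = 0;
		}
	}
	return st->descs;
}
```

With a threshold of 8, a burst of 20 packets is submitted in three calls (8, 8 and 4 descriptors). A larger threshold means fewer callback invocations but a longer wait before the first copy starts, which is the trade-off a benchmark would need to measure.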

> +
> +static __rte_noinline uint32_t
> +virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count)
> +{
> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> +	uint16_t num_buffers;
> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +	uint16_t avail_head, last_idx, shadow_idx;
> +
> +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> +	struct iovec *vec_pool = vq->vec_pool;
> +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> +	struct iovec *src_iovec = vec_pool;
> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +	struct rte_vhost_iov_iter *src_it = it_pool;
> +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> +	uint16_t n_free_slot, slot_idx;
> +	int n_pkts = 0;
> +
> +	avail_head = *((volatile uint16_t *)&vq->avail->idx);
> +	last_idx = vq->last_avail_idx;
> +	shadow_idx = vq->shadow_used_idx;
> +
> +	/*
> +	 * The ordering between avail index and
> +	 * desc reads needs to be enforced.
> +	 */
> +	rte_smp_rmb();
> +
> +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
> +		uint16_t nr_vec = 0;
> +
> +		if (unlikely(reserve_avail_buf_split(dev, vq,
> +						pkt_len, buf_vec, &num_buffers,
> +						avail_head, &nr_vec) < 0)) {
> +			VHOST_LOG_DATA(DEBUG,
> +				"(%d) failed to get enough desc from vring\n",
> +				dev->vid);
> +			vq->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + num_buffers);
> +
> +		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> +				buf_vec, nr_vec, num_buffers,
> +				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> +			vq->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
> +		if (src_it->count) {
> +			async_fill_des(&tdes[pkt_burst_idx], src_it, dst_it);
> +			pkt_burst_idx++;
> +			vq->async_pending_info[slot_idx] =
> +				num_buffers | (src_it->nr_segs << 16);
> +			src_iovec += src_it->nr_segs;
> +			dst_iovec += dst_it->nr_segs;
> +			src_it += 2;
> +			dst_it += 2;

Patrick,
In my understanding, the type of nr_segs can follow the type of nr_vec (uint16_t). That would shrink each entry saved in async_pending_info from 64 bits to 32 bits.
Since this information is used in the datapath, the smaller size should give better performance.

It would also be better to replace the integer 2 with a macro.
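
A minimal sketch of what the narrower entry could look like, assuming both the descriptor count and the segment count fit in 16 bits. The macro and helper names below (PENDING_INFO_N_MSK, PENDING_INFO_N_SFT, IT_PAIR_STRIDE, pending_info_*) are hypothetical and only mirror the layout used in the patch, where the low 16 bits hold num_buffers and the upper bits hold nr_segs.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical names: the low 16 bits hold the descriptor count, the
 * high 16 bits hold the number of DMA segments still in flight. */
#define PENDING_INFO_N_MSK  0xFFFFu
#define PENDING_INFO_N_SFT  16
/* Iterators are consumed in (source, destination) pairs, so the
 * iterator pointers advance by this stride instead of a bare 2. */
#define IT_PAIR_STRIDE      2

static inline uint32_t
pending_info_pack(uint16_t n_descs, uint16_t n_segs)
{
	return (uint32_t)n_descs | ((uint32_t)n_segs << PENDING_INFO_N_SFT);
}

static inline uint16_t
pending_info_descs(uint32_t info)
{
	return (uint16_t)(info & PENDING_INFO_N_MSK);
}

static inline uint16_t
pending_info_segs(uint32_t info)
{
	return (uint16_t)(info >> PENDING_INFO_N_SFT);
}

/* Record a partial completion: `done` segments finished, the rest
 * remain pending; the descriptor count is left unchanged. */
static inline uint32_t
pending_info_complete(uint32_t info, uint16_t done)
{
	uint16_t segs = pending_info_segs(info);

	assert(done <= segs);
	return pending_info_pack(pending_info_descs(info),
			(uint16_t)(segs - done));
}
```

The explicit cast to uint32_t before the shift keeps the arithmetic unsigned and independent of integer promotion, and the helpers keep the mask and shift in one place for both the submit and the completion paths.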

Thanks,
Marvin




* Re: [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for split ring
  2020-07-01  8:50   ` Liu, Yong
@ 2020-07-02 12:21     ` Fu, Patrick
  0 siblings, 0 replies; 5+ messages in thread
From: Fu, Patrick @ 2020-07-02 12:21 UTC (permalink / raw)
  To: Liu, Yong, dev, maxime.coquelin, Xia, Chenbo, Wang, Zhihong
  Cc: Wang, Yinan, Jiang, Cheng1, Liang, Cunming

Thanks Marvin, my comments inline:

> -----Original Message-----
> From: Liu, Yong <yong.liu@intel.com>
> Sent: Wednesday, July 1, 2020 4:51 PM
> To: Fu, Patrick <patrick.fu@intel.com>; dev@dpdk.org;
> maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>; Wang,
> Zhihong <zhihong.wang@intel.com>
> Cc: Fu, Patrick <patrick.fu@intel.com>; Wang, Yinan
> <yinan.wang@intel.com>; Jiang, Cheng1 <cheng1.jiang@intel.com>; Liang,
> Cunming <cunming.liang@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v2 2/2] vhost: introduce async enqueue for
> split ring
> 
> >
> > +#define VHOST_ASYNC_BATCH_THRESHOLD 8
> > +
> 
> It is not clear why the batch threshold is 8. If the value comes from a
> hardware requirement, it would be better to store it in
> rte_vhost_async_features.
> 
We are in the process of benchmarking how this value affects the final performance,
and we will find a more reasonable way to handle this macro.

> > +
> > +static __rte_noinline uint32_t
> > +virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> > +	struct vhost_virtqueue *vq, uint16_t queue_id,
> > +	struct rte_mbuf **pkts, uint32_t count)
> > +{
> > +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> > +	uint16_t num_buffers;
> > +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > +	uint16_t avail_head, last_idx, shadow_idx;
> > +
> > +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> > +	struct iovec *vec_pool = vq->vec_pool;
> > +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> > +	struct iovec *src_iovec = vec_pool;
> > +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> > +	struct rte_vhost_iov_iter *src_it = it_pool;
> > +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> > +	uint16_t n_free_slot, slot_idx;
> > +	int n_pkts = 0;
> > +
> > +	avail_head = *((volatile uint16_t *)&vq->avail->idx);
> > +	last_idx = vq->last_avail_idx;
> > +	shadow_idx = vq->shadow_used_idx;
> > +
> > +	/*
> > +	 * The ordering between avail index and
> > +	 * desc reads needs to be enforced.
> > +	 */
> > +	rte_smp_rmb();
> > +
> > +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
> > +
> > +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> > +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
> > +		uint16_t nr_vec = 0;
> > +
> > +		if (unlikely(reserve_avail_buf_split(dev, vq,
> > +						pkt_len, buf_vec, &num_buffers,
> > +						avail_head, &nr_vec) < 0)) {
> > +			VHOST_LOG_DATA(DEBUG,
> > +				"(%d) failed to get enough desc from vring\n",
> > +				dev->vid);
> > +			vq->shadow_used_idx -= num_buffers;
> > +			break;
> > +		}
> > +
> > +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
> > +			dev->vid, vq->last_avail_idx,
> > +			vq->last_avail_idx + num_buffers);
> > +
> > +		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> > +				buf_vec, nr_vec, num_buffers,
> > +				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> > +			vq->shadow_used_idx -= num_buffers;
> > +			break;
> > +		}
> > +
> > +		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
> > +		if (src_it->count) {
> > +			async_fill_des(&tdes[pkt_burst_idx], src_it, dst_it);
> > +			pkt_burst_idx++;
> > +			vq->async_pending_info[slot_idx] =
> > +				num_buffers | (src_it->nr_segs << 16);
> > +			src_iovec += src_it->nr_segs;
> > +			dst_iovec += dst_it->nr_segs;
> > +			src_it += 2;
> > +			dst_it += 2;
> 
> Patrick,
> In my understanding, the type of nr_segs can follow the type of nr_vec
> (uint16_t). That would shrink each entry saved in async_pending_info
> from 64 bits to 32 bits.
> Since this information is used in the datapath, the smaller size should
> give better performance.
> 
> It would also be better to replace the integer 2 with a macro.
> 
Will update the code as you suggested.

Thanks,

Patrick

