DPDK patches and discussions
* [dpdk-dev] [PATCH 0/2] Enhance Async Enqueue for Small Packets
@ 2020-12-11  9:21 Jiayu Hu
  2020-12-11  9:21 ` [dpdk-dev] [PATCH 1/2] vhost: cleanup async enqueue Jiayu Hu
                   ` (2 more replies)
  0 siblings, 3 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-11  9:21 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, Jiayu Hu

Async enqueue offloads large copies to DMA devices, while small copies
are still performed by the CPU. However, it requires users to retrieve
enqueue-completed packets via rte_vhost_poll_enqueue_completed(), even
for packets already completed by the CPU when
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overhead of tracking completed pktmbufs and of additional function
calls, degrading performance for small packets.

The first patch cleans up async enqueue code, and the second patch
enables rte_vhost_submit_enqueue_burst() to return completed packets.
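For illustration, the calling pattern this series enables might look as
follows. This is a hedged sketch, not DPDK code: the
submit_enqueue_burst() stub stands in for the real
rte_vhost_submit_enqueue_burst(), and the mbuf layout and small-packet
threshold are invented for the example.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for struct rte_mbuf; only the length matters here. */
struct mbuf {
	uint32_t len;
};

#define SMALL_PKT_LEN 256 /* illustrative threshold, not a DPDK constant */

/*
 * Stub modelling the new API shape: packets completed at submit time
 * (here: all "small" ones) are returned through comp_pkts/comp_count,
 * and the return value counts both in-flight and completed packets.
 */
static uint16_t
submit_enqueue_burst(struct mbuf **pkts, uint16_t count,
		struct mbuf **comp_pkts, uint32_t *comp_count)
{
	uint32_t done = 0;
	uint16_t i;

	for (i = 0; i < count; i++) {
		if (pkts[i]->len <= SMALL_PKT_LEN)
			comp_pkts[done++] = pkts[i];
		/* larger packets would remain in flight on the DMA device */
	}
	*comp_count = done;
	return count;
}
```

With this shape, an application can recycle the packets in comp_pkts
immediately and only call rte_vhost_poll_enqueue_completed() for the
packets still in flight.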

Jiayu Hu (2):
  vhost: cleanup async enqueue
  vhost: enhance async enqueue for small packets

 lib/librte_vhost/rte_vhost_async.h |  24 ++--
 lib/librte_vhost/vhost.c           |  14 +-
 lib/librte_vhost/vhost.h           |   7 +-
 lib/librte_vhost/vhost_user.c      |   7 +-
 lib/librte_vhost/virtio_net.c      | 256 +++++++++++++++++++++----------------
 5 files changed, 171 insertions(+), 137 deletions(-)

-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [PATCH 1/2] vhost: cleanup async enqueue
  2020-12-11  9:21 [dpdk-dev] [PATCH 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
@ 2020-12-11  9:21 ` Jiayu Hu
  2020-12-11  9:21 ` [dpdk-dev] [PATCH 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
  2020-12-22  9:46 ` [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2 siblings, 0 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-11  9:21 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, Jiayu Hu

This patch removes unnecessary checks and function calls, changes
internal variables to appropriate types, and fixes typos.

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  6 +++---
 lib/librte_vhost/virtio_net.c      | 16 ++++++++--------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index c73bd7c..3be4ee4 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -147,8 +147,8 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submit enqueue data to async engine. This function has
- * no guranttee to the transfer completion upon return. Applications
+ * This function submits enqueue data to async engine. This function has
+ * no guarantee to the transfer completion upon return. Applications
  * should poll transfer status by rte_vhost_poll_enqueue_completed()
  *
  * @param vid
@@ -167,7 +167,7 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
 /**
- * This function check async completion status for a specific vhost
+ * This function checks async completion status for a specific vhost
  * device queue. Packets which finish copying (enqueue) operation
  * will be returned in an array.
  *
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 6c51286..fc654be 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1128,8 +1128,11 @@ async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	}
 
 out:
-	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
-	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	if (tlen) {
+		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
+		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	} else
+		src_it->count = 0;
 
 	return error;
 }
@@ -1492,10 +1495,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
 	uint16_t n_free_slot, slot_idx = 0;
-	uint16_t pkt_err = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
-	int n_pkts = 0;
+	uint32_t n_pkts = 0, pkt_err = 0;
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1553,11 +1555,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		/*
 		 * conditions to trigger async device transfer:
 		 * - buffered packet number reaches transfer threshold
-		 * - this is the last packet in the burst enqueue
 		 * - unused async iov number is less than max vhost vector
 		 */
 		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(pkt_idx == count - 1 && pkt_burst_idx) ||
 			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
 			BUF_VECTOR_MAX)) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
@@ -1569,7 +1569,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			segs_await = 0;
 			vq->async_pkts_inflight_n += pkt_burst_idx;
 
-			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
 				 * log error packets number here and do actual
 				 * error processing when applications poll
@@ -1589,7 +1589,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 				queue_id, tdes, 0, pkt_burst_idx);
 		vq->async_pkts_inflight_n += pkt_burst_idx;
 
-		if (unlikely(n_pkts < (int)pkt_burst_idx))
+		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
 	}
 
-- 
2.7.4



* [dpdk-dev] [PATCH 2/2] vhost: enhance async enqueue for small packets
  2020-12-11  9:21 [dpdk-dev] [PATCH 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2020-12-11  9:21 ` [dpdk-dev] [PATCH 1/2] vhost: cleanup async enqueue Jiayu Hu
@ 2020-12-11  9:21 ` Jiayu Hu
  2020-12-22  9:46 ` [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2 siblings, 0 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-11  9:21 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, Jiayu Hu

Async enqueue offloads large copies to DMA devices, while small copies
are still performed by the CPU. However, it requires users to retrieve
enqueue-completed packets via rte_vhost_poll_enqueue_completed(), even
for packets already completed by the CPU when
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overhead of tracking completed pktmbufs and of additional function
calls, degrading performance for small packets.

This patch enhances async enqueue for small packets by enabling
rte_vhost_submit_enqueue_burst() to return completed packets.
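The descriptor bookkeeping in this patch copies used-ring elements into
a private async_descs_split ring and later writes them back, splitting
each copy where it crosses the ring boundary. A minimal standalone
sketch of that wrap-around copy follows; the type and ring size are
illustrative stand-ins for struct vring_used_elem and vq->size.

```c
#include <stdint.h>
#include <string.h>

/* Illustrative stand-in for struct vring_used_elem. */
struct used_elem {
	uint32_t id;
	uint32_t len;
};

#define RING_SIZE 8 /* must be a power of two, like vq->size */

/*
 * Copy n elements from src into ring starting at free-running index
 * to, splitting the memcpy when the copy crosses the end of the ring,
 * as the patch does for vq->async_descs_split and vq->used->ring.
 */
static void
ring_copy(struct used_elem *ring, uint16_t to,
		const struct used_elem *src, uint16_t n)
{
	uint16_t idx = to & (RING_SIZE - 1);

	if (idx + n <= RING_SIZE) {
		memcpy(&ring[idx], src, n * sizeof(*src));
	} else {
		uint16_t first = RING_SIZE - idx;

		memcpy(&ring[idx], src, first * sizeof(*src));
		memcpy(ring, src + first, (n - first) * sizeof(*src));
	}
}
```

Because the index is free-running and the size is a power of two, the
mask replaces a modulo and the copy needs at most two memcpy calls.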

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  18 +--
 lib/librte_vhost/vhost.c           |  14 +--
 lib/librte_vhost/vhost.h           |   7 +-
 lib/librte_vhost/vhost_user.c      |   7 +-
 lib/librte_vhost/virtio_net.c      | 240 +++++++++++++++++++++----------------
 5 files changed, 160 insertions(+), 126 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index 3be4ee4..55c5d1c 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -87,13 +87,8 @@ struct rte_vhost_async_channel_ops {
  * inflight async packet information
  */
 struct async_inflight_info {
-	union {
-		uint32_t info;
-		struct {
-			uint16_t descs; /* num of descs inflight */
-			uint16_t segs; /* iov segs inflight */
-		};
-	};
+	struct rte_mbuf *mbuf;
+	uint16_t descs; /* num of descs inflight */
 };
 
 /**
@@ -159,12 +154,17 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
  *  array of packets to be enqueued
  * @param count
  *  packets num to be enqueued
+ * @param completed_pkts
+ *  array of transfer completed packets
+ * @param num_completed
+ *  num of transfer completed packets
  * @return
- *  num of packets enqueued
+ *  num of packets enqueued, including in-flight and transfer completed
  */
 __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count);
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **completed_pkts, uint32_t *num_completed);
 
 /**
  * This function checks async completion status for a specific vhost
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index b83cf63..47e378b 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -327,17 +327,17 @@ cleanup_device(struct virtio_net *dev, int destroy)
 static void
 vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
-	if (vq->async_pkts_pending)
-		rte_free(vq->async_pkts_pending);
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
+	if (vq->async_descs_split)
+		rte_free(vq->async_descs_split);
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);
 
-	vq->async_pkts_pending = NULL;
 	vq->async_pkts_info = NULL;
+	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1628,9 +1628,6 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	node = SOCKET_ID_ANY;
 #endif
 
-	vq->async_pkts_pending = rte_malloc_socket(NULL,
-			vq->size * sizeof(uintptr_t),
-			RTE_CACHE_LINE_SIZE, node);
 	vq->async_pkts_info = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct async_inflight_info),
 			RTE_CACHE_LINE_SIZE, node);
@@ -1640,7 +1637,10 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_pkts_pending || !vq->async_pkts_info ||
+	vq->async_descs_split = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem),
+			RTE_CACHE_LINE_SIZE, node);
+	if (!vq->async_descs_split || !vq->async_pkts_info ||
 		!vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 361c9f7..d2076b4 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -202,11 +202,13 @@ struct vhost_virtqueue {
 	struct iovec *vec_pool;
 
 	/* async data transfer status */
-	uintptr_t	**async_pkts_pending;
 	struct async_inflight_info *async_pkts_info;
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
+	struct vring_used_elem  *async_descs_split;
+	uint16_t async_desc_idx;
+	uint16_t last_async_desc_idx;
 
 	/* vq async features */
 	bool		async_inorder;
@@ -733,8 +735,7 @@ vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 	/* Don't kick guest if we don't reach index specified by guest. */
 	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
 		uint16_t old = vq->signalled_used;
-		uint16_t new = vq->async_pkts_inflight_n ?
-					vq->used->idx:vq->last_used_idx;
+		uint16_t new = vq->last_used_idx;
 		bool signalled_used_valid = vq->signalled_used_valid;
 
 		vq->signalled_used = new;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 45c8ac0..2b00249 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -1967,12 +1967,13 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
 	} else {
 		rte_free(vq->shadow_used_split);
 		vq->shadow_used_split = NULL;
-		if (vq->async_pkts_pending)
-			rte_free(vq->async_pkts_pending);
+
 		if (vq->async_pkts_info)
 			rte_free(vq->async_pkts_info);
-		vq->async_pkts_pending = NULL;
+		if (vq->async_descs_split)
+			rte_free(vq->async_descs_split);
 		vq->async_pkts_info = NULL;
+		vq->async_descs_split = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index fc654be..93f3d93 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -118,31 +118,6 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 }
 
 static __rte_always_inline void
-async_flush_shadow_used_ring_split(struct virtio_net *dev,
-	struct vhost_virtqueue *vq)
-{
-	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
-	if (used_idx + vq->shadow_used_idx <= vq->size) {
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
-					  vq->shadow_used_idx);
-	} else {
-		uint16_t size;
-
-		/* update used ring interval [used_idx, vq->size] */
-		size = vq->size - used_idx;
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
-
-		/* update the left half used ring interval [0, left_size] */
-		do_flush_shadow_used_ring_split(dev, vq, 0, size,
-					  vq->shadow_used_idx - size);
-	}
-
-	vq->last_used_idx += vq->shadow_used_idx;
-	vq->shadow_used_idx = 0;
-}
-
-static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -1480,7 +1455,8 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **completed_pkts, uint32_t *num_completed)
 {
 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
 	uint16_t num_buffers;
@@ -1494,10 +1470,15 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
-	uint16_t n_free_slot, slot_idx = 0;
+	uint16_t slot_idx = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+	struct async_pkt_index {
+		uint16_t pkt_idx;
+		uint16_t last_avail_idx;
+	} async_pkts_log[MAX_PKT_BURST];
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1534,21 +1515,50 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			break;
 		}
 
-		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
 		if (src_it->count) {
-			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
-			pkt_burst_idx++;
+			uint16_t from, to;
+
+			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
 			pkts_info[slot_idx].descs = num_buffers;
-			pkts_info[slot_idx].segs = src_it->nr_segs;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
+			async_pkts_log[num_async_pkts++].last_avail_idx =
+				vq->last_avail_idx;
 			src_iovec += src_it->nr_segs;
 			dst_iovec += dst_it->nr_segs;
 			src_it += 2;
 			dst_it += 2;
 			segs_await += src_it->nr_segs;
-		} else {
-			pkts_info[slot_idx].info = num_buffers;
-			vq->async_pkts_inflight_n++;
-		}
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_desc_idx & (vq->size - 1);
+			if (num_buffers + to <= vq->size) {
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						num_buffers *
+						sizeof(struct vring_used_elem));
+			} else {
+				int size = vq->size - to;
+
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->async_descs_split,
+						&vq->shadow_used_split[from +
+						size], (num_buffers - size) *
+					   sizeof(struct vring_used_elem));
+			}
+			vq->async_desc_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			completed_pkts[num_done_pkts++] = pkts[pkt_idx];
 
 		vq->last_avail_idx += num_buffers;
 
@@ -1557,9 +1567,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		 * - buffered packet number reaches transfer threshold
 		 * - unused async iov number is less than max vhost vector
 		 */
-		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
-			BUF_VECTOR_MAX)) {
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
 					queue_id, tdes, 0, pkt_burst_idx);
 			src_iovec = vec_pool;
@@ -1567,7 +1577,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			src_it = it_pool;
 			dst_it = it_pool + 1;
 			segs_await = 0;
-			vq->async_pkts_inflight_n += pkt_burst_idx;
+			vq->async_pkts_inflight_n += n_pkts;
 
 			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
@@ -1587,7 +1597,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	if (pkt_burst_idx) {
 		n_pkts = vq->async_ops.transfer_data(dev->vid,
 				queue_id, tdes, 0, pkt_burst_idx);
-		vq->async_pkts_inflight_n += pkt_burst_idx;
+		vq->async_pkts_inflight_n += n_pkts;
 
 		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
@@ -1595,32 +1605,32 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 
 	do_data_copy_enqueue(dev, vq);
 
-	while (unlikely(pkt_err && pkt_idx)) {
-		if (pkts_info[slot_idx].segs)
-			pkt_err--;
-		vq->last_avail_idx -= pkts_info[slot_idx].descs;
-		vq->shadow_used_idx -= pkts_info[slot_idx].descs;
-		vq->async_pkts_inflight_n--;
-		slot_idx = (slot_idx - 1) & (vq->size - 1);
-		pkt_idx--;
-	}
-
-	n_free_slot = vq->size - vq->async_pkts_idx;
-	if (n_free_slot > pkt_idx) {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, pkt_idx * sizeof(uintptr_t));
-		vq->async_pkts_idx += pkt_idx;
-	} else {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, n_free_slot * sizeof(uintptr_t));
-		rte_memcpy(&vq->async_pkts_pending[0],
-			&pkts[n_free_slot],
-			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
-		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	if (unlikely(pkt_err)) {
+		uint16_t num_descs = 0;
+
+		num_async_pkts -= pkt_err;
+		/* calculate the sum of descriptors of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
+			slot_idx--;
+		}
+		vq->async_desc_idx -= num_descs;
+		/* recover shadow used ring and available ring */
+		vq->shadow_used_idx -= (vq->last_avail_idx -
+				async_pkts_log[num_async_pkts].last_avail_idx -
+				num_descs);
+		vq->last_avail_idx =
+			async_pkts_log[num_async_pkts].last_avail_idx;
+		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
 	}
 
-	if (likely(vq->shadow_used_idx))
-		async_flush_shadow_used_ring_split(dev, vq);
+	vq->async_pkts_idx += num_async_pkts;
+	*num_completed = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring_split(dev, vq);
+		vhost_vring_call_split(dev, vq);
+	}
 
 	return pkt_idx;
 }
@@ -1632,8 +1642,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	struct vhost_virtqueue *vq;
 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
-	uint16_t n_inflight;
 	struct async_inflight_info *pkts_info;
+	uint16_t from, i;
 
 	if (!dev)
 		return 0;
@@ -1655,8 +1665,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 	rte_spinlock_lock(&vq->access_lock);
 
-	n_inflight = vq->async_pkts_inflight_n;
-	pkts_idx = vq->async_pkts_idx;
+	pkts_idx = vq->async_pkts_idx & (vq->size - 1);
 	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
@@ -1667,42 +1676,61 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 			queue_id, 0, count - vq->async_last_pkts_n);
 	n_pkts_cpl += vq->async_last_pkts_n;
 
-	rte_smp_wmb();
-
-	while (likely((n_pkts_put < count) && n_inflight)) {
-		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
-		if (n_pkts_cpl && pkts_info[info_idx].segs)
-			n_pkts_cpl--;
-		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
-			break;
-		n_pkts_put++;
-		n_inflight--;
-		n_descs += pkts_info[info_idx].descs;
-	}
-
-	vq->async_last_pkts_n = n_pkts_cpl;
+	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+	if (unlikely(n_pkts_put == 0)) {
+		vq->async_last_pkts_n = n_pkts_cpl;
+		goto done;
+	}
+
+	for (i = 0; i < n_pkts_put; i++) {
+		from = (start_idx + i) & (vq_size - 1);
+		n_descs += pkts_info[from].descs;
+		pkts[i] = pkts_info[from].mbuf;
+	}
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(vq->enabled && vq->access_ok)) {
+		uint16_t nr_left = n_descs;
+		uint16_t nr_copy;
+		uint16_t to;
+
+		/* write back completed descriptors to used ring */
+		do {
+			from = vq->last_async_desc_idx & (vq->size - 1);
+			nr_copy = nr_left + from <= vq->size ? nr_left :
+				vq->size - from;
+			to = vq->last_used_idx & (vq->size - 1);
+
+			if (to + nr_copy <= vq->size) {
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						nr_copy *
+						sizeof(struct vring_used_elem));
+			} else {
+				uint16_t size = vq->size - to;
+
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->used->ring,
+						&vq->async_descs_split[from +
+						size], (nr_copy - size) *
+						sizeof(struct vring_used_elem));
+			}
 
-	if (n_pkts_put) {
-		vq->async_pkts_inflight_n = n_inflight;
-		if (likely(vq->enabled && vq->access_ok)) {
-			__atomic_add_fetch(&vq->used->idx,
-					n_descs, __ATOMIC_RELEASE);
-			vhost_vring_call_split(dev, vq);
-		}
+			vq->last_async_desc_idx += nr_copy;
+			vq->last_used_idx += nr_copy;
+			nr_left -= nr_copy;
+		} while (nr_left > 0);
 
-		if (start_idx + n_pkts_put <= vq_size) {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				n_pkts_put * sizeof(uintptr_t));
-		} else {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				(vq_size - start_idx) * sizeof(uintptr_t));
-			rte_memcpy(&pkts[vq_size - start_idx],
-				vq->async_pkts_pending,
-				(n_pkts_put + start_idx - vq_size) *
-				sizeof(uintptr_t));
-		}
-	}
+		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	} else
+		vq->last_async_desc_idx += n_descs;
 
+done:
 	rte_spinlock_unlock(&vq->access_lock);
 
 	return n_pkts_put;
@@ -1710,7 +1738,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **completed_pkts, uint32_t *num_completed)
 {
 	struct vhost_virtqueue *vq;
 	uint32_t nb_tx = 0;
@@ -1745,7 +1774,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 		nb_tx = 0;
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
-				vq, queue_id, pkts, count);
+				vq, queue_id, pkts, count,
+				completed_pkts, num_completed);
 
 out:
 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
@@ -1759,7 +1789,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 
 uint16_t
 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count)
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **completed_pkts, uint32_t *num_completed)
 {
 	struct virtio_net *dev = get_device(vid);
 
@@ -1773,7 +1804,8 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		return 0;
 	}
 
-	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count,
+			completed_pkts, num_completed);
 }
 
 static inline bool
-- 
2.7.4



* [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets
  2020-12-11  9:21 [dpdk-dev] [PATCH 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2020-12-11  9:21 ` [dpdk-dev] [PATCH 1/2] vhost: cleanup async enqueue Jiayu Hu
  2020-12-11  9:21 ` [dpdk-dev] [PATCH 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
@ 2020-12-22  9:46 ` Jiayu Hu
  2020-12-22  9:46   ` [dpdk-dev] [Patch v2 1/2] vhost: cleanup async enqueue Jiayu Hu
                     ` (2 more replies)
  2 siblings, 3 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-22  9:46 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

Async enqueue offloads large copies to DMA devices, while small copies
are still performed by the CPU. However, it requires users to retrieve
enqueue-completed packets via rte_vhost_poll_enqueue_completed(), even
for packets already completed by the CPU when
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overhead of tracking completed pktmbufs and of additional function
calls, degrading performance for small packets.

The first patch cleans up async enqueue code, and the second patch
enables rte_vhost_submit_enqueue_burst() to return completed packets.

Change log
==========
v2:
- fix typo
- rename API variables
- update programmer guide

Jiayu Hu (2):
  vhost: cleanup async enqueue
  vhost: enhance async enqueue for small packets

 doc/guides/prog_guide/vhost_lib.rst |   7 +-
 lib/librte_vhost/rte_vhost_async.h  |  24 ++--
 lib/librte_vhost/vhost.c            |  14 +-
 lib/librte_vhost/vhost.h            |   7 +-
 lib/librte_vhost/vhost_user.c       |   7 +-
 lib/librte_vhost/virtio_net.c       | 256 ++++++++++++++++++++----------------
 6 files changed, 175 insertions(+), 140 deletions(-)

-- 
2.7.4



* [dpdk-dev] [Patch v2 1/2] vhost: cleanup async enqueue
  2020-12-22  9:46 ` [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
@ 2020-12-22  9:46   ` Jiayu Hu
  2020-12-22  9:46   ` [dpdk-dev] [Patch v2 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2 siblings, 0 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-22  9:46 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

This patch removes unnecessary checks and function calls, changes
internal variables to appropriate types, and fixes typos.

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  6 +++---
 lib/librte_vhost/virtio_net.c      | 16 ++++++++--------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index c73bd7c..3be4ee4 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -147,8 +147,8 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submit enqueue data to async engine. This function has
- * no guranttee to the transfer completion upon return. Applications
+ * This function submits enqueue data to async engine. This function has
+ * no guarantee to the transfer completion upon return. Applications
  * should poll transfer status by rte_vhost_poll_enqueue_completed()
  *
  * @param vid
@@ -167,7 +167,7 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
 /**
- * This function check async completion status for a specific vhost
+ * This function checks async completion status for a specific vhost
  * device queue. Packets which finish copying (enqueue) operation
  * will be returned in an array.
  *
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 6c51286..fc654be 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1128,8 +1128,11 @@ async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	}
 
 out:
-	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
-	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	if (tlen) {
+		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
+		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	} else
+		src_it->count = 0;
 
 	return error;
 }
@@ -1492,10 +1495,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
 	uint16_t n_free_slot, slot_idx = 0;
-	uint16_t pkt_err = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
-	int n_pkts = 0;
+	uint32_t n_pkts = 0, pkt_err = 0;
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1553,11 +1555,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		/*
 		 * conditions to trigger async device transfer:
 		 * - buffered packet number reaches transfer threshold
-		 * - this is the last packet in the burst enqueue
 		 * - unused async iov number is less than max vhost vector
 		 */
 		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(pkt_idx == count - 1 && pkt_burst_idx) ||
 			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
 			BUF_VECTOR_MAX)) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
@@ -1569,7 +1569,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			segs_await = 0;
 			vq->async_pkts_inflight_n += pkt_burst_idx;
 
-			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
 				 * log error packets number here and do actual
 				 * error processing when applications poll
@@ -1589,7 +1589,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 				queue_id, tdes, 0, pkt_burst_idx);
 		vq->async_pkts_inflight_n += pkt_burst_idx;
 
-		if (unlikely(n_pkts < (int)pkt_burst_idx))
+		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
 	}
 
-- 
2.7.4



* [dpdk-dev] [Patch v2 2/2] vhost: enhance async enqueue for small packets
  2020-12-22  9:46 ` [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2020-12-22  9:46   ` [dpdk-dev] [Patch v2 1/2] vhost: cleanup async enqueue Jiayu Hu
@ 2020-12-22  9:46   ` Jiayu Hu
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2 siblings, 0 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-22  9:46 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

Async enqueue offloads large copies to DMA devices, while small copies
are still performed by the CPU. However, it requires users to retrieve
enqueue-completed packets via rte_vhost_poll_enqueue_completed(), even
for packets already completed by the CPU when
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overhead of tracking completed pktmbufs and of additional function
calls, degrading performance for small packets.

This patch enhances async enqueue for small packets by enabling
rte_vhost_submit_enqueue_burst() to return completed packets.

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 doc/guides/prog_guide/vhost_lib.rst |   7 +-
 lib/librte_vhost/rte_vhost_async.h  |  18 +--
 lib/librte_vhost/vhost.c            |  14 +--
 lib/librte_vhost/vhost.h            |   7 +-
 lib/librte_vhost/vhost_user.c       |   7 +-
 lib/librte_vhost/virtio_net.c       | 240 ++++++++++++++++++++----------------
 6 files changed, 164 insertions(+), 129 deletions(-)

diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index ba4c62a..0668fd8 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -245,11 +245,12 @@ The following is an overview of some key Vhost API functions:
 
   Unregister the async copy device channel from a vhost queue.
 
-* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count)``
+* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count, comp_pkts, comp_count)``
 
   Submit an enqueue request to transmit ``count`` packets from host to guest
-  by async data path. Enqueue is not guaranteed to finish upon the return of
-  this API call.
+  by async data path. Packets whose enqueue completed are returned in
+  ``comp_pkts``, while the others are not guaranteed to have finished
+  when this API call returns.
 
   Applications must not free the packets submitted for enqueue until the
   packets are completed.
diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index 3be4ee4..6f230ba 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -87,13 +87,8 @@ struct rte_vhost_async_channel_ops {
  * inflight async packet information
  */
 struct async_inflight_info {
-	union {
-		uint32_t info;
-		struct {
-			uint16_t descs; /* num of descs inflight */
-			uint16_t segs; /* iov segs inflight */
-		};
-	};
+	struct rte_mbuf *mbuf;
+	uint16_t descs; /* num of descs inflight */
 };
 
 /**
@@ -159,12 +154,17 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
  *  array of packets to be enqueued
  * @param count
  *  packets num to be enqueued
+ * @param comp_pkts
+ *  array of transfer completed packets
+ * @param comp_count
+ *  num of transfer completed packets
  * @return
- *  num of packets enqueued
+ *  num of packets enqueued, including in-flight and transfer completed
  */
 __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count);
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **comp_pkts, uint32_t *comp_count);
 
 /**
  * This function checks async completion status for a specific vhost
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index b83cf63..47e378b 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -327,17 +327,17 @@ cleanup_device(struct virtio_net *dev, int destroy)
 static void
 vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
-	if (vq->async_pkts_pending)
-		rte_free(vq->async_pkts_pending);
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
+	if (vq->async_descs_split)
+		rte_free(vq->async_descs_split);
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);
 
-	vq->async_pkts_pending = NULL;
 	vq->async_pkts_info = NULL;
+	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1628,9 +1628,6 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	node = SOCKET_ID_ANY;
 #endif
 
-	vq->async_pkts_pending = rte_malloc_socket(NULL,
-			vq->size * sizeof(uintptr_t),
-			RTE_CACHE_LINE_SIZE, node);
 	vq->async_pkts_info = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct async_inflight_info),
 			RTE_CACHE_LINE_SIZE, node);
@@ -1640,7 +1637,10 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_pkts_pending || !vq->async_pkts_info ||
+	vq->async_descs_split = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem),
+			RTE_CACHE_LINE_SIZE, node);
+	if (!vq->async_descs_split || !vq->async_pkts_info ||
 		!vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 361c9f7..d2076b4 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -202,11 +202,13 @@ struct vhost_virtqueue {
 	struct iovec *vec_pool;
 
 	/* async data transfer status */
-	uintptr_t	**async_pkts_pending;
 	struct async_inflight_info *async_pkts_info;
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
+	struct vring_used_elem  *async_descs_split;
+	uint16_t async_desc_idx;
+	uint16_t last_async_desc_idx;
 
 	/* vq async features */
 	bool		async_inorder;
@@ -733,8 +735,7 @@ vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 	/* Don't kick guest if we don't reach index specified by guest. */
 	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
 		uint16_t old = vq->signalled_used;
-		uint16_t new = vq->async_pkts_inflight_n ?
-					vq->used->idx:vq->last_used_idx;
+		uint16_t new = vq->last_used_idx;
 		bool signalled_used_valid = vq->signalled_used_valid;
 
 		vq->signalled_used = new;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 45c8ac0..2b00249 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -1967,12 +1967,13 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
 	} else {
 		rte_free(vq->shadow_used_split);
 		vq->shadow_used_split = NULL;
-		if (vq->async_pkts_pending)
-			rte_free(vq->async_pkts_pending);
+
 		if (vq->async_pkts_info)
 			rte_free(vq->async_pkts_info);
-		vq->async_pkts_pending = NULL;
+		if (vq->async_descs_split)
+			rte_free(vq->async_descs_split);
 		vq->async_pkts_info = NULL;
+		vq->async_descs_split = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index fc654be..52df311 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -118,31 +118,6 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 }
 
 static __rte_always_inline void
-async_flush_shadow_used_ring_split(struct virtio_net *dev,
-	struct vhost_virtqueue *vq)
-{
-	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
-	if (used_idx + vq->shadow_used_idx <= vq->size) {
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
-					  vq->shadow_used_idx);
-	} else {
-		uint16_t size;
-
-		/* update used ring interval [used_idx, vq->size] */
-		size = vq->size - used_idx;
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
-
-		/* update the left half used ring interval [0, left_size] */
-		do_flush_shadow_used_ring_split(dev, vq, 0, size,
-					  vq->shadow_used_idx - size);
-	}
-
-	vq->last_used_idx += vq->shadow_used_idx;
-	vq->shadow_used_idx = 0;
-}
-
-static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -1480,7 +1455,8 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
 	uint16_t num_buffers;
@@ -1494,10 +1470,15 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
-	uint16_t n_free_slot, slot_idx = 0;
+	uint16_t slot_idx = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+	struct {
+		uint16_t pkt_idx;
+		uint16_t last_avail_idx;
+	} async_pkts_log[MAX_PKT_BURST];
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1534,21 +1515,50 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			break;
 		}
 
-		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
 		if (src_it->count) {
-			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
-			pkt_burst_idx++;
+			uint16_t from, to;
+
+			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
 			pkts_info[slot_idx].descs = num_buffers;
-			pkts_info[slot_idx].segs = src_it->nr_segs;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
+			async_pkts_log[num_async_pkts++].last_avail_idx =
+				vq->last_avail_idx;
 			src_iovec += src_it->nr_segs;
 			dst_iovec += dst_it->nr_segs;
 			src_it += 2;
 			dst_it += 2;
 			segs_await += src_it->nr_segs;
-		} else {
-			pkts_info[slot_idx].info = num_buffers;
-			vq->async_pkts_inflight_n++;
-		}
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_desc_idx & (vq->size - 1);
+			if (num_buffers + to <= vq->size) {
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						num_buffers *
+						sizeof(struct vring_used_elem));
+			} else {
+				int size = vq->size - to;
+
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->async_descs_split,
+						&vq->shadow_used_split[from +
+						size], (num_buffers - size) *
+					   sizeof(struct vring_used_elem));
+			}
+			vq->async_desc_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
 
 		vq->last_avail_idx += num_buffers;
 
@@ -1557,9 +1567,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		 * - buffered packet number reaches transfer threshold
 		 * - unused async iov number is less than max vhost vector
 		 */
-		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
-			BUF_VECTOR_MAX)) {
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
 					queue_id, tdes, 0, pkt_burst_idx);
 			src_iovec = vec_pool;
@@ -1567,7 +1577,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			src_it = it_pool;
 			dst_it = it_pool + 1;
 			segs_await = 0;
-			vq->async_pkts_inflight_n += pkt_burst_idx;
+			vq->async_pkts_inflight_n += n_pkts;
 
 			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
@@ -1587,7 +1597,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	if (pkt_burst_idx) {
 		n_pkts = vq->async_ops.transfer_data(dev->vid,
 				queue_id, tdes, 0, pkt_burst_idx);
-		vq->async_pkts_inflight_n += pkt_burst_idx;
+		vq->async_pkts_inflight_n += n_pkts;
 
 		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
@@ -1595,32 +1605,32 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 
 	do_data_copy_enqueue(dev, vq);
 
-	while (unlikely(pkt_err && pkt_idx)) {
-		if (pkts_info[slot_idx].segs)
-			pkt_err--;
-		vq->last_avail_idx -= pkts_info[slot_idx].descs;
-		vq->shadow_used_idx -= pkts_info[slot_idx].descs;
-		vq->async_pkts_inflight_n--;
-		slot_idx = (slot_idx - 1) & (vq->size - 1);
-		pkt_idx--;
-	}
-
-	n_free_slot = vq->size - vq->async_pkts_idx;
-	if (n_free_slot > pkt_idx) {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, pkt_idx * sizeof(uintptr_t));
-		vq->async_pkts_idx += pkt_idx;
-	} else {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, n_free_slot * sizeof(uintptr_t));
-		rte_memcpy(&vq->async_pkts_pending[0],
-			&pkts[n_free_slot],
-			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
-		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	if (unlikely(pkt_err)) {
+		uint16_t num_descs = 0;
+
+		num_async_pkts -= pkt_err;
+		/* calculate the sum of descriptors of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
+			slot_idx--;
+		}
+		vq->async_desc_idx -= num_descs;
+		/* recover shadow used ring and available ring */
+		vq->shadow_used_idx -= (vq->last_avail_idx -
+				async_pkts_log[num_async_pkts].last_avail_idx -
+				num_descs);
+		vq->last_avail_idx =
+			async_pkts_log[num_async_pkts].last_avail_idx;
+		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
 	}
 
-	if (likely(vq->shadow_used_idx))
-		async_flush_shadow_used_ring_split(dev, vq);
+	vq->async_pkts_idx += num_async_pkts;
+	*comp_count = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring_split(dev, vq);
+		vhost_vring_call_split(dev, vq);
+	}
 
 	return pkt_idx;
 }
@@ -1632,8 +1642,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	struct vhost_virtqueue *vq;
 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
-	uint16_t n_inflight;
 	struct async_inflight_info *pkts_info;
+	uint16_t from, i;
 
 	if (!dev)
 		return 0;
@@ -1655,8 +1665,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 	rte_spinlock_lock(&vq->access_lock);
 
-	n_inflight = vq->async_pkts_inflight_n;
-	pkts_idx = vq->async_pkts_idx;
+	pkts_idx = vq->async_pkts_idx & (vq->size - 1);
 	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
@@ -1667,42 +1676,61 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 			queue_id, 0, count - vq->async_last_pkts_n);
 	n_pkts_cpl += vq->async_last_pkts_n;
 
-	rte_smp_wmb();
-
-	while (likely((n_pkts_put < count) && n_inflight)) {
-		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
-		if (n_pkts_cpl && pkts_info[info_idx].segs)
-			n_pkts_cpl--;
-		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
-			break;
-		n_pkts_put++;
-		n_inflight--;
-		n_descs += pkts_info[info_idx].descs;
-	}
-
-	vq->async_last_pkts_n = n_pkts_cpl;
+	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+	if (unlikely(n_pkts_put == 0)) {
+		vq->async_last_pkts_n = n_pkts_cpl;
+		goto done;
+	}
+
+	for (i = 0; i < n_pkts_put; i++) {
+		from = (start_idx + i) & (vq_size - 1);
+		n_descs += pkts_info[from].descs;
+		pkts[i] = pkts_info[from].mbuf;
+	}
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(vq->enabled && vq->access_ok)) {
+		uint16_t nr_left = n_descs;
+		uint16_t nr_copy;
+		uint16_t to;
+
+		/* write back completed descriptors to used ring */
+		do {
+			from = vq->last_async_desc_idx & (vq->size - 1);
+			nr_copy = nr_left + from <= vq->size ? nr_left :
+				vq->size - from;
+			to = vq->last_used_idx & (vq->size - 1);
+
+			if (to + nr_copy <= vq->size) {
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						nr_copy *
+						sizeof(struct vring_used_elem));
+			} else {
+				uint16_t size = vq->size - to;
+
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->used->ring,
+						&vq->async_descs_split[from +
+						size], (nr_copy - size) *
+						sizeof(struct vring_used_elem));
+			}
 
-	if (n_pkts_put) {
-		vq->async_pkts_inflight_n = n_inflight;
-		if (likely(vq->enabled && vq->access_ok)) {
-			__atomic_add_fetch(&vq->used->idx,
-					n_descs, __ATOMIC_RELEASE);
-			vhost_vring_call_split(dev, vq);
-		}
+			vq->last_async_desc_idx += nr_copy;
+			vq->last_used_idx += nr_copy;
+			nr_left -= nr_copy;
+		} while (nr_left > 0);
 
-		if (start_idx + n_pkts_put <= vq_size) {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				n_pkts_put * sizeof(uintptr_t));
-		} else {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				(vq_size - start_idx) * sizeof(uintptr_t));
-			rte_memcpy(&pkts[vq_size - start_idx],
-				vq->async_pkts_pending,
-				(n_pkts_put + start_idx - vq_size) *
-				sizeof(uintptr_t));
-		}
-	}
+		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	} else
+		vq->last_async_desc_idx += n_descs;
 
+done:
 	rte_spinlock_unlock(&vq->access_lock);
 
 	return n_pkts_put;
@@ -1710,7 +1738,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	struct vhost_virtqueue *vq;
 	uint32_t nb_tx = 0;
@@ -1745,7 +1774,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 		nb_tx = 0;
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
-				vq, queue_id, pkts, count);
+				vq, queue_id, pkts, count, comp_pkts,
+				comp_count);
 
 out:
 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
@@ -1759,7 +1789,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 
 uint16_t
 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count)
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	struct virtio_net *dev = get_device(vid);
 
@@ -1773,7 +1804,8 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		return 0;
 	}
 
-	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
+			comp_count);
 }
 
 static inline bool
-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread
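
Both the shadow-ring recovery in the submit path above and the used-ring
write-back in rte_vhost_poll_enqueue_completed() rely on the same
two-memcpy wrap-around pattern for a power-of-two ring. A self-contained
sketch of that pattern (simplified here to a ring of uint16_t; the real
code copies struct vring_used_elem entries):

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Copy n elements into a power-of-two ring starting at logical index
 * `to`, splitting into two memcpy() calls when the span wraps past the
 * end of the ring.
 */
static void
ring_write(uint16_t *ring, uint16_t ring_size, uint16_t to,
	   const uint16_t *src, uint16_t n)
{
	to &= ring_size - 1;	/* logical index -> ring slot */
	if (to + n <= ring_size) {
		/* contiguous case: one copy */
		memcpy(&ring[to], src, n * sizeof(*src));
	} else {
		/* wrapped case: tail of the ring, then the head */
		uint16_t part = ring_size - to;

		memcpy(&ring[to], src, part * sizeof(*src));
		memcpy(ring, &src[part], (n - part) * sizeof(*src));
	}
}
```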

* [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets
  2020-12-22  9:46 ` [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2020-12-22  9:46   ` [dpdk-dev] [Patch v2 1/2] vhost: cleanup async enqueue Jiayu Hu
  2020-12-22  9:46   ` [dpdk-dev] [Patch v2 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
@ 2020-12-25  8:28   ` Jiayu Hu
  2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 1/2] vhost: cleanup async enqueue Jiayu Hu
                       ` (4 more replies)
  2 siblings, 5 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-25  8:28 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

Async enqueue offloads large copies to DMA devices, and small copies
are still performed by the CPU. However, it requires users to retrieve
completed packets via rte_vhost_poll_enqueue_completed(), even for
packets that the CPU has already completed when
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overheads of tracking completed pktmbufs and of extra function calls,
thus degrading performance on small packets.

The first patch cleans up async enqueue code, and the second patch
enables rte_vhost_submit_enqueue_burst() to return completed packets.

Change log
==========
v3:
- fix incorrect ret value when DMA ring is full
- enhance description of API declaration and programmer guide
v2:
- fix typo
- rename API variables
- update programmer guide

Jiayu Hu (2):
  vhost: cleanup async enqueue
  vhost: enhance async enqueue for small packets

 doc/guides/prog_guide/vhost_lib.rst |   8 +-
 lib/librte_vhost/rte_vhost_async.h  |  32 +++--
 lib/librte_vhost/vhost.c            |  14 +-
 lib/librte_vhost/vhost.h            |   7 +-
 lib/librte_vhost/vhost_user.c       |   7 +-
 lib/librte_vhost/virtio_net.c       | 258 ++++++++++++++++++++----------------
 6 files changed, 185 insertions(+), 141 deletions(-)

-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [PATCH v3 1/2] vhost: cleanup async enqueue
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
@ 2020-12-25  8:28     ` Jiayu Hu
  2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
                       ` (3 subsequent siblings)
  4 siblings, 0 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-25  8:28 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

This patch removes an unnecessary check and redundant function calls,
changes internal variables to appropriate types, and fixes typos.

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  6 +++---
 lib/librte_vhost/virtio_net.c      | 16 ++++++++--------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index c73bd7c..3be4ee4 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -147,8 +147,8 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submit enqueue data to async engine. This function has
- * no guranttee to the transfer completion upon return. Applications
+ * This function submits enqueue data to async engine. This function has
+ * no guarantee to the transfer completion upon return. Applications
  * should poll transfer status by rte_vhost_poll_enqueue_completed()
  *
  * @param vid
@@ -167,7 +167,7 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
 /**
- * This function check async completion status for a specific vhost
+ * This function checks async completion status for a specific vhost
  * device queue. Packets which finish copying (enqueue) operation
  * will be returned in an array.
  *
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 6c51286..fc654be 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1128,8 +1128,11 @@ async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	}
 
 out:
-	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
-	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	if (tlen) {
+		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
+		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	} else
+		src_it->count = 0;
 
 	return error;
 }
@@ -1492,10 +1495,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
 	uint16_t n_free_slot, slot_idx = 0;
-	uint16_t pkt_err = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
-	int n_pkts = 0;
+	uint32_t n_pkts = 0, pkt_err = 0;
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1553,11 +1555,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		/*
 		 * conditions to trigger async device transfer:
 		 * - buffered packet number reaches transfer threshold
-		 * - this is the last packet in the burst enqueue
 		 * - unused async iov number is less than max vhost vector
 		 */
 		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(pkt_idx == count - 1 && pkt_burst_idx) ||
 			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
 			BUF_VECTOR_MAX)) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
@@ -1569,7 +1569,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			segs_await = 0;
 			vq->async_pkts_inflight_n += pkt_burst_idx;
 
-			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
 				 * log error packets number here and do actual
 				 * error processing when applications poll
@@ -1589,7 +1589,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 				queue_id, tdes, 0, pkt_burst_idx);
 		vq->async_pkts_inflight_n += pkt_burst_idx;
 
-		if (unlikely(n_pkts < (int)pkt_burst_idx))
+		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
 	}
 
-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread
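
The flush conditions kept by this cleanup can be modelled with a small
standalone sketch. The constants below are stand-ins for
VHOST_ASYNC_BATCH_THRESHOLD, half of VHOST_MAX_ASYNC_VEC, and
BUF_VECTOR_MAX, and the helper is invented for illustration: a DMA kick
fires when the batch fills up or the iovec pool runs low, and any
remainder is flushed once after the loop (replacing the removed
"last packet in the burst" trigger):

```c
#include <assert.h>
#include <stdint.h>

#define BATCH_THRESHOLD 32	/* stand-in for VHOST_ASYNC_BATCH_THRESHOLD */
#define VEC_POOL_HALF 1024	/* stand-in for VHOST_MAX_ASYNC_VEC / 2 */
#define SEGS_PER_PKT_MAX 8	/* stand-in for BUF_VECTOR_MAX */

/*
 * Count how many DMA kicks a stream of packets triggers under the two
 * in-loop flush conditions, plus one final flush for a partial batch.
 */
static uint32_t
count_flushes(const uint16_t *nr_segs, uint32_t count)
{
	uint32_t burst = 0, segs_await = 0, flushes = 0, i;

	for (i = 0; i < count; i++) {
		burst++;
		segs_await += nr_segs[i];
		if (burst >= BATCH_THRESHOLD ||
		    VEC_POOL_HALF - segs_await < SEGS_PER_PKT_MAX) {
			flushes++;	/* kick the DMA engine */
			burst = 0;
			segs_await = 0;
		}
	}
	if (burst)
		flushes++;	/* flush the final partial batch */
	return flushes;
}
```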

* [dpdk-dev] [PATCH v3 2/2] vhost: enhance async enqueue for small packets
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 1/2] vhost: cleanup async enqueue Jiayu Hu
@ 2020-12-25  8:28     ` Jiayu Hu
  2021-01-05 11:41     ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Wang, Yinan
                       ` (2 subsequent siblings)
  4 siblings, 0 replies; 18+ messages in thread
From: Jiayu Hu @ 2020-12-25  8:28 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

Async enqueue offloads large copies to DMA devices, and small copies
are still performed by the CPU. However, it requires users to retrieve
completed packets via rte_vhost_poll_enqueue_completed(), even for
packets that the CPU has already completed when
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overheads of tracking completed pktmbufs and of extra function calls,
thus degrading performance on small packets.

This patch enhances async enqueue for small packets by enabling
rte_vhost_submit_enqueue_burst() to return completed packets.
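
The DMA-error recovery in the submit path can be sketched as follows. The
types and helper below are simplified stand-ins (the real code also rewinds
the shadow used ring and async_desc_idx): when the DMA engine accepts fewer
packets than submitted, the per-packet log is used to restore last_avail_idx
and to report only the packets before the first failure.

```c
#include <assert.h>
#include <stdint.h>

/* Snapshot taken per async packet before submission. */
struct pkt_log {
	uint16_t pkt_idx;	/* index in the caller's pkts[] array */
	uint16_t last_avail_idx;	/* avail index before this packet */
};

/*
 * Roll back after a partial DMA submission: num_async packets were
 * handed to the engine but pkt_err of them were rejected. Restore the
 * avail index from the snapshot of the first failed packet and return
 * the adjusted enqueue count.
 */
static uint16_t
rollback(const struct pkt_log *log, uint16_t num_async, uint16_t pkt_err,
	 uint16_t *last_avail_idx)
{
	uint16_t ok = num_async - pkt_err;	/* packets the engine kept */

	*last_avail_idx = log[ok].last_avail_idx;
	return log[ok].pkt_idx;	/* new return value of the submit call */
}
```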

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 doc/guides/prog_guide/vhost_lib.rst |   8 +-
 lib/librte_vhost/rte_vhost_async.h  |  30 +++--
 lib/librte_vhost/vhost.c            |  14 +--
 lib/librte_vhost/vhost.h            |   7 +-
 lib/librte_vhost/vhost_user.c       |   7 +-
 lib/librte_vhost/virtio_net.c       | 242 ++++++++++++++++++++----------------
 6 files changed, 176 insertions(+), 132 deletions(-)

diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index ba4c62a..dc29229 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -245,11 +245,13 @@ The following is an overview of some key Vhost API functions:
 
   Unregister the async copy device channel from a vhost queue.
 
-* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count)``
+* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count, comp_pkts, comp_count)``
 
   Submit an enqueue request to transmit ``count`` packets from host to guest
-  by async data path. Enqueue is not guaranteed to finish upon the return of
-  this API call.
+  by async data path. When this API call returns, successfully enqueued
+  packets may be transfer completed or still occupied by DMA engines;
+  transfer completed packets are returned in ``comp_pkts``, while the
+  others are not guaranteed to have finished.
 
   Applications must not free the packets submitted for enqueue until the
   packets are completed.
diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index 3be4ee4..59fe6f9 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -87,13 +87,8 @@ struct rte_vhost_async_channel_ops {
  * inflight async packet information
  */
 struct async_inflight_info {
-	union {
-		uint32_t info;
-		struct {
-			uint16_t descs; /* num of descs inflight */
-			uint16_t segs; /* iov segs inflight */
-		};
-	};
+	struct rte_mbuf *mbuf;
+	uint16_t descs; /* num of descs inflight */
 };
 
 /**
@@ -147,9 +142,13 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submits enqueue data to async engine. This function has
- * no guarantee to the transfer completion upon return. Applications
- * should poll transfer status by rte_vhost_poll_enqueue_completed()
+ * This function submits enqueue data to the async engine. When this API
+ * returns, successfully enqueued packets may be transfer completed or
+ * still occupied by DMA engines. Transfer completed packets are returned
+ * in comp_pkts, so users must guarantee that its size is greater than or
+ * equal to that of pkts; for packets that are successfully enqueued but
+ * not yet transfer completed, users should poll transfer status by
+ * rte_vhost_poll_enqueue_completed().
  *
  * @param vid
  *  id of vhost device to enqueue data
@@ -159,12 +158,19 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
  *  array of packets to be enqueued
  * @param count
  *  packets num to be enqueued
+ * @param comp_pkts
+ *  empty array to get transfer completed packets. Users need to
+ *  guarantee its size is greater than or equal to that of pkts
+ * @param comp_count
+ *  num of packets that are transfer completed, when this API returns.
+ *  If no packets are transfer completed, its value is set to 0.
  * @return
- *  num of packets enqueued
+ *  num of packets enqueued, including in-flight and transfer completed
  */
 __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count);
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **comp_pkts, uint32_t *comp_count);
 
 /**
  * This function checks async completion status for a specific vhost
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index b83cf63..47e378b 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -327,17 +327,17 @@ cleanup_device(struct virtio_net *dev, int destroy)
 static void
 vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
-	if (vq->async_pkts_pending)
-		rte_free(vq->async_pkts_pending);
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
+	if (vq->async_descs_split)
+		rte_free(vq->async_descs_split);
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);
 
-	vq->async_pkts_pending = NULL;
 	vq->async_pkts_info = NULL;
+	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1628,9 +1628,6 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	node = SOCKET_ID_ANY;
 #endif
 
-	vq->async_pkts_pending = rte_malloc_socket(NULL,
-			vq->size * sizeof(uintptr_t),
-			RTE_CACHE_LINE_SIZE, node);
 	vq->async_pkts_info = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct async_inflight_info),
 			RTE_CACHE_LINE_SIZE, node);
@@ -1640,7 +1637,10 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_pkts_pending || !vq->async_pkts_info ||
+	vq->async_descs_split = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem),
+			RTE_CACHE_LINE_SIZE, node);
+	if (!vq->async_descs_split || !vq->async_pkts_info ||
 		!vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 361c9f7..d2076b4 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -202,11 +202,13 @@ struct vhost_virtqueue {
 	struct iovec *vec_pool;
 
 	/* async data transfer status */
-	uintptr_t	**async_pkts_pending;
 	struct async_inflight_info *async_pkts_info;
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
+	struct vring_used_elem  *async_descs_split;
+	uint16_t async_desc_idx;
+	uint16_t last_async_desc_idx;
 
 	/* vq async features */
 	bool		async_inorder;
@@ -733,8 +735,7 @@ vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 	/* Don't kick guest if we don't reach index specified by guest. */
 	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
 		uint16_t old = vq->signalled_used;
-		uint16_t new = vq->async_pkts_inflight_n ?
-					vq->used->idx:vq->last_used_idx;
+		uint16_t new = vq->last_used_idx;
 		bool signalled_used_valid = vq->signalled_used_valid;
 
 		vq->signalled_used = new;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 45c8ac0..2b00249 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -1967,12 +1967,13 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
 	} else {
 		rte_free(vq->shadow_used_split);
 		vq->shadow_used_split = NULL;
-		if (vq->async_pkts_pending)
-			rte_free(vq->async_pkts_pending);
+
 		if (vq->async_pkts_info)
 			rte_free(vq->async_pkts_info);
-		vq->async_pkts_pending = NULL;
+		if (vq->async_descs_split)
+			rte_free(vq->async_descs_split);
 		vq->async_pkts_info = NULL;
+		vq->async_descs_split = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index fc654be..e28e8cc 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -118,31 +118,6 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 }
 
 static __rte_always_inline void
-async_flush_shadow_used_ring_split(struct virtio_net *dev,
-	struct vhost_virtqueue *vq)
-{
-	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
-	if (used_idx + vq->shadow_used_idx <= vq->size) {
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
-					  vq->shadow_used_idx);
-	} else {
-		uint16_t size;
-
-		/* update used ring interval [used_idx, vq->size] */
-		size = vq->size - used_idx;
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
-
-		/* update the left half used ring interval [0, left_size] */
-		do_flush_shadow_used_ring_split(dev, vq, 0, size,
-					  vq->shadow_used_idx - size);
-	}
-
-	vq->last_used_idx += vq->shadow_used_idx;
-	vq->shadow_used_idx = 0;
-}
-
-static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -1480,7 +1455,8 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
 	uint16_t num_buffers;
@@ -1494,10 +1470,15 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
-	uint16_t n_free_slot, slot_idx = 0;
+	uint16_t slot_idx = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+	struct {
+		uint16_t pkt_idx;
+		uint16_t last_avail_idx;
+	} async_pkts_log[MAX_PKT_BURST];
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1534,21 +1515,50 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			break;
 		}
 
-		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
 		if (src_it->count) {
-			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
-			pkt_burst_idx++;
+			uint16_t from, to;
+
+			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
 			pkts_info[slot_idx].descs = num_buffers;
-			pkts_info[slot_idx].segs = src_it->nr_segs;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
+			async_pkts_log[num_async_pkts++].last_avail_idx =
+				vq->last_avail_idx;
 			src_iovec += src_it->nr_segs;
 			dst_iovec += dst_it->nr_segs;
 			src_it += 2;
 			dst_it += 2;
 			segs_await += src_it->nr_segs;
-		} else {
-			pkts_info[slot_idx].info = num_buffers;
-			vq->async_pkts_inflight_n++;
-		}
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_desc_idx & (vq->size - 1);
+			if (num_buffers + to <= vq->size) {
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						num_buffers *
+						sizeof(struct vring_used_elem));
+			} else {
+				int size = vq->size - to;
+
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->async_descs_split,
+						&vq->shadow_used_split[from +
+						size], (num_buffers - size) *
+					   sizeof(struct vring_used_elem));
+			}
+			vq->async_desc_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
 
 		vq->last_avail_idx += num_buffers;
 
@@ -1557,9 +1567,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		 * - buffered packet number reaches transfer threshold
 		 * - unused async iov number is less than max vhost vector
 		 */
-		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
-			BUF_VECTOR_MAX)) {
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
 					queue_id, tdes, 0, pkt_burst_idx);
 			src_iovec = vec_pool;
@@ -1567,7 +1577,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			src_it = it_pool;
 			dst_it = it_pool + 1;
 			segs_await = 0;
-			vq->async_pkts_inflight_n += pkt_burst_idx;
+			vq->async_pkts_inflight_n += n_pkts;
 
 			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
@@ -1587,7 +1597,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	if (pkt_burst_idx) {
 		n_pkts = vq->async_ops.transfer_data(dev->vid,
 				queue_id, tdes, 0, pkt_burst_idx);
-		vq->async_pkts_inflight_n += pkt_burst_idx;
+		vq->async_pkts_inflight_n += n_pkts;
 
 		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
@@ -1595,32 +1605,33 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 
 	do_data_copy_enqueue(dev, vq);
 
-	while (unlikely(pkt_err && pkt_idx)) {
-		if (pkts_info[slot_idx].segs)
-			pkt_err--;
-		vq->last_avail_idx -= pkts_info[slot_idx].descs;
-		vq->shadow_used_idx -= pkts_info[slot_idx].descs;
-		vq->async_pkts_inflight_n--;
-		slot_idx = (slot_idx - 1) & (vq->size - 1);
-		pkt_idx--;
-	}
-
-	n_free_slot = vq->size - vq->async_pkts_idx;
-	if (n_free_slot > pkt_idx) {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, pkt_idx * sizeof(uintptr_t));
-		vq->async_pkts_idx += pkt_idx;
-	} else {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, n_free_slot * sizeof(uintptr_t));
-		rte_memcpy(&vq->async_pkts_pending[0],
-			&pkts[n_free_slot],
-			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
-		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	if (unlikely(pkt_err)) {
+		uint16_t num_descs = 0;
+
+		num_async_pkts -= pkt_err;
+		/* calculate the sum of descriptors of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
+			slot_idx--;
+		}
+		vq->async_desc_idx -= num_descs;
+		/* recover shadow used ring and available ring */
+		vq->shadow_used_idx -= (vq->last_avail_idx -
+				async_pkts_log[num_async_pkts].last_avail_idx -
+				num_descs);
+		vq->last_avail_idx =
+			async_pkts_log[num_async_pkts].last_avail_idx;
+		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
+		num_done_pkts = pkt_idx - num_async_pkts;
 	}
 
-	if (likely(vq->shadow_used_idx))
-		async_flush_shadow_used_ring_split(dev, vq);
+	vq->async_pkts_idx += num_async_pkts;
+	*comp_count = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring_split(dev, vq);
+		vhost_vring_call_split(dev, vq);
+	}
 
 	return pkt_idx;
 }
@@ -1632,8 +1643,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	struct vhost_virtqueue *vq;
 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
-	uint16_t n_inflight;
 	struct async_inflight_info *pkts_info;
+	uint16_t from, i;
 
 	if (!dev)
 		return 0;
@@ -1655,8 +1666,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 	rte_spinlock_lock(&vq->access_lock);
 
-	n_inflight = vq->async_pkts_inflight_n;
-	pkts_idx = vq->async_pkts_idx;
+	pkts_idx = vq->async_pkts_idx & (vq->size - 1);
 	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
@@ -1667,42 +1677,61 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 			queue_id, 0, count - vq->async_last_pkts_n);
 	n_pkts_cpl += vq->async_last_pkts_n;
 
-	rte_smp_wmb();
-
-	while (likely((n_pkts_put < count) && n_inflight)) {
-		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
-		if (n_pkts_cpl && pkts_info[info_idx].segs)
-			n_pkts_cpl--;
-		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
-			break;
-		n_pkts_put++;
-		n_inflight--;
-		n_descs += pkts_info[info_idx].descs;
-	}
-
-	vq->async_last_pkts_n = n_pkts_cpl;
+	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+	if (unlikely(n_pkts_put == 0)) {
+		vq->async_last_pkts_n = n_pkts_cpl;
+		goto done;
+	}
+
+	for (i = 0; i < n_pkts_put; i++) {
+		from = (start_idx + i) & (vq_size - 1);
+		n_descs += pkts_info[from].descs;
+		pkts[i] = pkts_info[from].mbuf;
+	}
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(vq->enabled && vq->access_ok)) {
+		uint16_t nr_left = n_descs;
+		uint16_t nr_copy;
+		uint16_t to;
+
+		/* write back completed descriptors to used ring */
+		do {
+			from = vq->last_async_desc_idx & (vq->size - 1);
+			nr_copy = nr_left + from <= vq->size ? nr_left :
+				vq->size - from;
+			to = vq->last_used_idx & (vq->size - 1);
+
+			if (to + nr_copy <= vq->size) {
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						nr_copy *
+						sizeof(struct vring_used_elem));
+			} else {
+				uint16_t size = vq->size - to;
+
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->used->ring,
+						&vq->async_descs_split[from +
+						size], (nr_copy - size) *
+						sizeof(struct vring_used_elem));
+			}
 
-	if (n_pkts_put) {
-		vq->async_pkts_inflight_n = n_inflight;
-		if (likely(vq->enabled && vq->access_ok)) {
-			__atomic_add_fetch(&vq->used->idx,
-					n_descs, __ATOMIC_RELEASE);
-			vhost_vring_call_split(dev, vq);
-		}
+			vq->last_async_desc_idx += nr_copy;
+			vq->last_used_idx += nr_copy;
+			nr_left -= nr_copy;
+		} while (nr_left > 0);
 
-		if (start_idx + n_pkts_put <= vq_size) {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				n_pkts_put * sizeof(uintptr_t));
-		} else {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				(vq_size - start_idx) * sizeof(uintptr_t));
-			rte_memcpy(&pkts[vq_size - start_idx],
-				vq->async_pkts_pending,
-				(n_pkts_put + start_idx - vq_size) *
-				sizeof(uintptr_t));
-		}
-	}
+		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	} else
+		vq->last_async_desc_idx += n_descs;
 
+done:
 	rte_spinlock_unlock(&vq->access_lock);
 
 	return n_pkts_put;
@@ -1710,7 +1739,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	struct vhost_virtqueue *vq;
 	uint32_t nb_tx = 0;
@@ -1745,7 +1775,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 		nb_tx = 0;
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
-				vq, queue_id, pkts, count);
+				vq, queue_id, pkts, count, comp_pkts,
+				comp_count);
 
 out:
 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
@@ -1759,10 +1790,12 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 
 uint16_t
 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count)
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	struct virtio_net *dev = get_device(vid);
 
+	*comp_count = 0;
 	if (!dev)
 		return 0;
 
@@ -1773,7 +1806,8 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		return 0;
 	}
 
-	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
+			comp_count);
 }
 
 static inline bool
-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
  2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 1/2] vhost: cleanup async enqueue Jiayu Hu
  2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
@ 2021-01-05 11:41     ` Wang, Yinan
  2021-01-07 10:45     ` Maxime Coquelin
  2021-01-11 12:16     ` [dpdk-dev] [PATCH v4 " Jiayu Hu
  4 siblings, 0 replies; 18+ messages in thread
From: Wang, Yinan @ 2021-01-05 11:41 UTC (permalink / raw)
  To: Hu, Jiayu, dev; +Cc: maxime.coquelin, Xia, Chenbo, Jiang, Cheng1

Tested-by: Wang, Yinan <yinan.wang@intel.com>

> -----Original Message-----
> From: Hu, Jiayu <jiayu.hu@intel.com>
> Sent: 2020/12/25 16:29
> To: dev@dpdk.org
> Cc: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>;
> Jiang, Cheng1 <cheng1.jiang@intel.com>; Wang, Yinan
> <yinan.wang@intel.com>; Hu, Jiayu <jiayu.hu@intel.com>
> Subject: [PATCH v3 0/2] Enhance Async Enqueue for Small Packets
> 
> Async enqueue offloads large copies to DMA devices, and small copies
> are still performed by the CPU. However, it requires users to get
> enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
> if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
> returns. This design incurs extra overheads of tracking completed
> pktmbufs and function calls, thus degrading performance on small packets.
> 
> The first patch cleans up async enqueue code, and the second patch
> enables rte_vhost_submit_enqueue_burst() to return completed packets.
> 
> Change log
> ==========
> v3:
> - fix incorrect ret value when DMA ring is full
> - enhance description of API declaration and programmer guide
> v2:
> - fix typo
> - rename API variables
> - update programmer guide
> 
> Jiayu Hu (2):
>   vhost: cleanup async enqueue
>   vhost: enhance async enqueue for small packets
> 
>  doc/guides/prog_guide/vhost_lib.rst |   8 +-
>  lib/librte_vhost/rte_vhost_async.h  |  32 +++--
>  lib/librte_vhost/vhost.c            |  14 +-
>  lib/librte_vhost/vhost.h            |   7 +-
>  lib/librte_vhost/vhost_user.c       |   7 +-
>  lib/librte_vhost/virtio_net.c       | 258 ++++++++++++++++++++----------------
>  6 files changed, 185 insertions(+), 141 deletions(-)
> 
> --
> 2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
                       ` (2 preceding siblings ...)
  2021-01-05 11:41     ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Wang, Yinan
@ 2021-01-07 10:45     ` Maxime Coquelin
  2021-01-11 12:16     ` [dpdk-dev] [PATCH v4 " Jiayu Hu
  4 siblings, 0 replies; 18+ messages in thread
From: Maxime Coquelin @ 2021-01-07 10:45 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: chenbo.xia, cheng1.jiang, yinan.wang



On 12/25/20 9:28 AM, Jiayu Hu wrote:
> Async enqueue offloads large copies to DMA devices, and small copies
> are still performed by the CPU. However, it requires users to get
> enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
> if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
> returns. This design incurs extra overheads of tracking completed
> pktmbufs and function calls, thus degrading performance on small packets.
> 
> The first patch cleans up async enqueue code, and the second patch
> enables rte_vhost_submit_enqueue_burst() to return completed packets.
> 
> Change log
> ==========
> v3:
> - fix incorrect ret value when DMA ring is full
> - enhance description of API declaration and programmer guide
> v2:
> - fix typo
> - rename API variables
> - update programmer guide
> 
> Jiayu Hu (2):
>   vhost: cleanup async enqueue
>   vhost: enhance async enqueue for small packets
> 
>  doc/guides/prog_guide/vhost_lib.rst |   8 +-
>  lib/librte_vhost/rte_vhost_async.h  |  32 +++--
>  lib/librte_vhost/vhost.c            |  14 +-
>  lib/librte_vhost/vhost.h            |   7 +-
>  lib/librte_vhost/vhost_user.c       |   7 +-
>  lib/librte_vhost/virtio_net.c       | 258 ++++++++++++++++++++----------------
>  6 files changed, 185 insertions(+), 141 deletions(-)
> 

CI reports build failure with your series, because API changes are not
done in the examples:

FAILED: examples/dpdk-vhost.p/vhost_main.c.o
cc -Iexamples/dpdk-vhost.p -Iexamples -I../examples -Iexamples/vhost
-I../examples/vhost -I. -I.. -Iconfig -I../config
-Ilib/librte_eal/include -I../lib/librte_eal/include
-Ilib/librte_eal/linux/include -I../lib/librte_eal/linux/include
-Ilib/librte_eal/x86/include -I../lib/librte_eal/x86/include
-Ilib/librte_eal/common -I../lib/librte_eal/common -Ilib/librte_eal
-I../lib/librte_eal -Ilib/librte_kvargs -I../lib/librte_kvargs
-Ilib/librte_metrics -I../lib/librte_metrics -Ilib/librte_telemetry
-I../lib/librte_telemetry -Ilib/librte_mempool -I../lib/librte_mempool
-Ilib/librte_ring -I../lib/librte_ring -Ilib/librte_net
-I../lib/librte_net -Ilib/librte_mbuf -I../lib/librte_mbuf
-Ilib/librte_ethdev -I../lib/librte_ethdev -Ilib/librte_meter
-I../lib/librte_meter -Ilib/librte_cmdline -I../lib/librte_cmdline
-Ilib/librte_vhost -I../lib/librte_vhost -Ilib/librte_cryptodev
-I../lib/librte_cryptodev -Ilib/librte_hash -I../lib/librte_hash
-Ilib/librte_rcu -I../lib/librte_rcu -Ilib/librte_pci
-I../lib/librte_pci -Idrivers/raw/ioat -I../drivers/raw/ioat
-Ilib/librte_rawdev -I../lib/librte_rawdev -Idrivers/bus/pci
-I../drivers/bus/pci -I../drivers/bus/pci/linux -Idrivers/bus/vdev
-I../drivers/bus/vdev -fdiagnostics-color=always -pipe
-D_FILE_OFFSET_BITS=64 -Wall -Winvalid-pch -Werror -O3 -include
rte_config.h -Wextra -Wcast-qual -Wdeprecated -Wformat
-Wformat-nonliteral -Wformat-security -Wmissing-declarations
-Wmissing-prototypes -Wnested-externs -Wold-style-definition
-Wpointer-arith -Wsign-compare -Wstrict-prototypes -Wundef
-Wwrite-strings -Wno-missing-field-initializers -D_GNU_SOURCE
-march=native -Wno-format-truncation -DALLOW_EXPERIMENTAL_API -MD -MQ
examples/dpdk-vhost.p/vhost_main.c.o -MF
examples/dpdk-vhost.p/vhost_main.c.o.d -o
examples/dpdk-vhost.p/vhost_main.c.o -c ../examples/vhost/main.c
../examples/vhost/main.c: In function 'virtio_xmit':
../examples/vhost/main.c:817:9: error: too few arguments to function
'rte_vhost_submit_enqueue_burst'
   ret = rte_vhost_submit_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ,
         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from ../examples/vhost/ioat.h:10:0,
                 from ../examples/vhost/main.c:28:
../lib/librte_vhost/rte_vhost_async.h:171:10: note: declared here
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
          ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
../examples/vhost/main.c: In function 'drain_eth_rx':
../examples/vhost/main.c:1126:19: error: too few arguments to function
'rte_vhost_submit_enqueue_burst'
   enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
                   ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from ../examples/vhost/ioat.h:10:0,
                 from ../examples/vhost/main.c:28:
../lib/librte_vhost/rte_vhost_async.h:171:10: note: declared here
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
          ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue Jiayu Hu
@ 2021-01-11 11:04         ` Maxime Coquelin
  2021-01-11 14:04           ` Maxime Coquelin
  0 siblings, 1 reply; 18+ messages in thread
From: Maxime Coquelin @ 2021-01-11 11:04 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: chenbo.xia, cheng1.jiang, yinan.wang



On 1/11/21 1:16 PM, Jiayu Hu wrote:
> This patch removes unnecessary check and function calls, and it changes
> appropriate types for internal variables and fixes typos.
> 
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> Tested-by: Yinan Wang <yinan.wang@intel.com>
> ---
>  lib/librte_vhost/rte_vhost_async.h |  8 ++++----
>  lib/librte_vhost/virtio_net.c      | 16 ++++++++--------
>  2 files changed, 12 insertions(+), 12 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> index c73bd7c..03bd558 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -112,7 +112,7 @@ struct rte_vhost_async_features {
>  };
>  
>  /**
> - * register a async channel for vhost
> + * register an async channel for vhost
>   *
>   * @param vid
>   *  vhost device id async channel to be attached to
> @@ -147,8 +147,8 @@ __rte_experimental
>  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
>  
>  /**
> - * This function submit enqueue data to async engine. This function has
> - * no guranttee to the transfer completion upon return. Applications
> + * This function submits enqueue data to async engine. This function has
> + * no guarantee to the transfer completion upon return. Applications
>   * should poll transfer status by rte_vhost_poll_enqueue_completed()
>   *
>   * @param vid
> @@ -167,7 +167,7 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count);
>  
>  /**
> - * This function check async completion status for a specific vhost
> + * This function checks async completion status for a specific vhost
>   * device queue. Packets which finish copying (enqueue) operation
>   * will be returned in an array.
>   *
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index fec08b2..0b63940 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -1130,8 +1130,11 @@ async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
>  	}
>  
>  out:
> -	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
> -	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
> +	if (tlen) {
> +		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
> +		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
> +	} else
> +		src_it->count = 0;

Minor comment, you need braces for the 'else' as there are braces for
the 'if'.


I will fix while applying.

Thanks,
Maxime


^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [PATCH v4 0/2] Enhance Async Enqueue for Small Packets
  2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
                       ` (3 preceding siblings ...)
  2021-01-07 10:45     ` Maxime Coquelin
@ 2021-01-11 12:16     ` Jiayu Hu
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue Jiayu Hu
                         ` (2 more replies)
  4 siblings, 3 replies; 18+ messages in thread
From: Jiayu Hu @ 2021-01-11 12:16 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

Async enqueue offloads large copies to DMA devices, and small copies
are still performed by the CPU. However, it requires users to get
enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
returns. This design incurs extra overheads of tracking completed
pktmbufs and function calls, thus degrading performance on small packets.

The first patch cleans up async enqueue code, and the second patch
enables rte_vhost_submit_enqueue_burst() to return completed packets.

Change log
==========
v4:
- support new API in vhost example
v3:
- fix incorrect ret value when DMA ring is full
- enhance description of API declaration and programmer guide
v2:
- fix typo
- rename API variables
- update programmer guide

Jiayu Hu (2):
  vhost: cleanup async enqueue
  vhost: enhance async enqueue for small packets

 doc/guides/prog_guide/vhost_lib.rst |   8 +-
 examples/vhost/main.c               |  18 ++-
 lib/librte_vhost/rte_vhost_async.h  |  34 +++--
 lib/librte_vhost/vhost.c            |  14 +-
 lib/librte_vhost/vhost.h            |   7 +-
 lib/librte_vhost/vhost_user.c       |   7 +-
 lib/librte_vhost/virtio_net.c       | 258 ++++++++++++++++++++----------------
 7 files changed, 200 insertions(+), 146 deletions(-)

-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue
  2021-01-11 12:16     ` [dpdk-dev] [PATCH v4 " Jiayu Hu
@ 2021-01-11 12:16       ` Jiayu Hu
  2021-01-11 11:04         ` Maxime Coquelin
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
  2021-01-11 15:02       ` [dpdk-dev] [PATCH v4 0/2] Enhance Async Enqueue for Small Packets Maxime Coquelin
  2 siblings, 1 reply; 18+ messages in thread
From: Jiayu Hu @ 2021-01-11 12:16 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

This patch removes unnecessary check and function calls, and it changes
appropriate types for internal variables and fixes typos.

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
Tested-by: Yinan Wang <yinan.wang@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  8 ++++----
 lib/librte_vhost/virtio_net.c      | 16 ++++++++--------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index c73bd7c..03bd558 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -112,7 +112,7 @@ struct rte_vhost_async_features {
 };
 
 /**
- * register a async channel for vhost
+ * register an async channel for vhost
  *
  * @param vid
  *  vhost device id async channel to be attached to
@@ -147,8 +147,8 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submit enqueue data to async engine. This function has
- * no guranttee to the transfer completion upon return. Applications
+ * This function submits enqueue data to async engine. This function has
+ * no guarantee to the transfer completion upon return. Applications
  * should poll transfer status by rte_vhost_poll_enqueue_completed()
  *
  * @param vid
@@ -167,7 +167,7 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
 /**
- * This function check async completion status for a specific vhost
+ * This function checks async completion status for a specific vhost
  * device queue. Packets which finish copying (enqueue) operation
  * will be returned in an array.
  *
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index fec08b2..0b63940 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1130,8 +1130,11 @@ async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	}
 
 out:
-	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
-	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	if (tlen) {
+		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
+		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	} else
+		src_it->count = 0;
 
 	return error;
 }
@@ -1491,10 +1494,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
 	uint16_t n_free_slot, slot_idx = 0;
-	uint16_t pkt_err = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
-	int n_pkts = 0;
+	uint32_t n_pkts = 0, pkt_err = 0;
 
 	/*
 	 * The ordering between avail index and desc reads need to be enforced.
@@ -1549,11 +1551,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		/*
 		 * conditions to trigger async device transfer:
 		 * - buffered packet number reaches transfer threshold
-		 * - this is the last packet in the burst enqueue
 		 * - unused async iov number is less than max vhost vector
 		 */
 		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(pkt_idx == count - 1 && pkt_burst_idx) ||
 			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
 			BUF_VECTOR_MAX)) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
@@ -1565,7 +1565,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			segs_await = 0;
 			vq->async_pkts_inflight_n += pkt_burst_idx;
 
-			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
 				 * log error packets number here and do actual
 				 * error processing when applications poll
@@ -1585,7 +1585,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 				queue_id, tdes, 0, pkt_burst_idx);
 		vq->async_pkts_inflight_n += pkt_burst_idx;
 
-		if (unlikely(n_pkts < (int)pkt_burst_idx))
+		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
 	}
 
-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [PATCH v4 2/2] vhost: enhance async enqueue for small packets
  2021-01-11 12:16     ` [dpdk-dev] [PATCH v4 " Jiayu Hu
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue Jiayu Hu
@ 2021-01-11 12:16       ` Jiayu Hu
  2021-01-11 12:44         ` Maxime Coquelin
  2021-01-11 15:02       ` [dpdk-dev] [PATCH v4 0/2] Enhance Async Enqueue for Small Packets Maxime Coquelin
  2 siblings, 1 reply; 18+ messages in thread
From: Jiayu Hu @ 2021-01-11 12:16 UTC (permalink / raw)
  To: dev; +Cc: maxime.coquelin, chenbo.xia, cheng1.jiang, yinan.wang, jiayu.hu

Async enqueue offloads large copies to DMA devices, and small copies
are still performed by the CPU. However, it requires users to get
enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
returns. This design incurs extra overheads of tracking completed
pktmbufs and function calls, thus degrading performance on small packets.

This patch enhances async enqueue for small packets by enabling
rte_vhost_submit_enqueue_burst() to return completed packets.

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
Tested-by: Yinan Wang <yinan.wang@intel.com>
---
 doc/guides/prog_guide/vhost_lib.rst |   8 +-
 examples/vhost/main.c               |  18 ++-
 lib/librte_vhost/rte_vhost_async.h  |  30 +++--
 lib/librte_vhost/vhost.c            |  14 +--
 lib/librte_vhost/vhost.h            |   7 +-
 lib/librte_vhost/vhost_user.c       |   7 +-
 lib/librte_vhost/virtio_net.c       | 242 ++++++++++++++++++++----------------
 7 files changed, 190 insertions(+), 136 deletions(-)

diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index ba4c62a..dc29229 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -245,11 +245,13 @@ The following is an overview of some key Vhost API functions:
 
   Unregister the async copy device channel from a vhost queue.
 
-* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count)``
+* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count, comp_pkts, comp_count)``
 
   Submit an enqueue request to transmit ``count`` packets from host to guest
-  by async data path. Enqueue is not guaranteed to finish upon the return of
-  this API call.
+  by async data path. When this API call returns, successfully enqueued
+  packets may either be transfer completed or still occupied by DMA engines;
+  packets whose transfer has completed are returned in ``comp_pkts``, while
+  the rest are not guaranteed to have finished.
 
   Applications must not free the packets submitted for enqueue until the
   packets are completed.
diff --git a/examples/vhost/main.c b/examples/vhost/main.c
index 8d8c303..2230997 100644
--- a/examples/vhost/main.c
+++ b/examples/vhost/main.c
@@ -809,13 +809,16 @@ virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev,
 	    struct rte_mbuf *m)
 {
 	uint16_t ret;
-	struct rte_mbuf *m_cpl[1];
+	struct rte_mbuf *m_cpl[1], *comp_pkt;
+	uint32_t nr_comp = 0;
 
 	if (builtin_net_driver) {
 		ret = vs_enqueue_pkts(dst_vdev, VIRTIO_RXQ, &m, 1);
 	} else if (async_vhost_driver) {
 		ret = rte_vhost_submit_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ,
-						&m, 1);
+						&m, 1, &comp_pkt, &nr_comp);
+		if (nr_comp == 1)
+			goto done;
 
 		if (likely(ret))
 			dst_vdev->nr_async_pkts++;
@@ -829,6 +832,7 @@ virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev,
 		ret = rte_vhost_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ, &m, 1);
 	}
 
+done:
 	if (enable_stats) {
 		rte_atomic64_inc(&dst_vdev->stats.rx_total_atomic);
 		rte_atomic64_add(&dst_vdev->stats.rx_atomic, ret);
@@ -1090,7 +1094,8 @@ static __rte_always_inline void
 drain_eth_rx(struct vhost_dev *vdev)
 {
 	uint16_t rx_count, enqueue_count;
-	struct rte_mbuf *pkts[MAX_PKT_BURST];
+	struct rte_mbuf *pkts[MAX_PKT_BURST], *comp_pkts[MAX_PKT_BURST];
+	uint32_t nr_comp = 0;
 
 	rx_count = rte_eth_rx_burst(ports[0], vdev->vmdq_rx_q,
 				    pkts, MAX_PKT_BURST);
@@ -1124,7 +1129,12 @@ drain_eth_rx(struct vhost_dev *vdev)
 						pkts, rx_count);
 	} else if (async_vhost_driver) {
 		enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
-					VIRTIO_RXQ, pkts, rx_count);
+					VIRTIO_RXQ, pkts, rx_count, comp_pkts,
+					&nr_comp);
+		if (nr_comp > 0) {
+			free_pkts(comp_pkts, nr_comp);
+			enqueue_count -= nr_comp;
+		}
 		vdev->nr_async_pkts += enqueue_count;
 	} else {
 		enqueue_count = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index 03bd558..c855ff8 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -87,13 +87,8 @@ struct rte_vhost_async_channel_ops {
  * inflight async packet information
  */
 struct async_inflight_info {
-	union {
-		uint32_t info;
-		struct {
-			uint16_t descs; /* num of descs inflight */
-			uint16_t segs; /* iov segs inflight */
-		};
-	};
+	struct rte_mbuf *mbuf;
+	uint16_t descs; /* num of descs inflight */
 };
 
 /**
@@ -147,9 +142,13 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submits enqueue data to async engine. This function has
- * no guarantee to the transfer completion upon return. Applications
- * should poll transfer status by rte_vhost_poll_enqueue_completed()
+ * This function submits enqueue data to async engine. When this API
+ * returns, successfully enqueued packets may be transfer completed or
+ * still occupied by DMA engines. Transfer completed packets are returned
+ * in comp_pkts, so users need to guarantee its size is greater than or
+ * equal to the size of pkts; for packets that are successfully enqueued
+ * but not transfer completed, users should poll transfer status by
+ * rte_vhost_poll_enqueue_completed().
  *
  * @param vid
  *  id of vhost device to enqueue data
@@ -159,12 +158,19 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
  *  array of packets to be enqueued
  * @param count
  *  packets num to be enqueued
+ * @param comp_pkts
+ *  empty array to get transfer completed packets. Users need to
+ *  guarantee its size is greater than or equal to that of pkts
+ * @param comp_count
+ *  num of packets that are transfer completed when this API returns.
+ *  If no packets are transfer completed, its value is set to 0.
  * @return
- *  num of packets enqueued
+ *  num of packets enqueued, including in-flight and transfer completed
  */
 __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count);
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **comp_pkts, uint32_t *comp_count);
 
 /**
  * This function checks async completion status for a specific vhost
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index c69b105..efb136e 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -327,17 +327,17 @@ cleanup_device(struct virtio_net *dev, int destroy)
 static void
 vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
-	if (vq->async_pkts_pending)
-		rte_free(vq->async_pkts_pending);
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
+	if (vq->async_descs_split)
+		rte_free(vq->async_descs_split);
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);
 
-	vq->async_pkts_pending = NULL;
 	vq->async_pkts_info = NULL;
+	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1628,9 +1628,6 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	node = SOCKET_ID_ANY;
 #endif
 
-	vq->async_pkts_pending = rte_malloc_socket(NULL,
-			vq->size * sizeof(uintptr_t),
-			RTE_CACHE_LINE_SIZE, node);
 	vq->async_pkts_info = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct async_inflight_info),
 			RTE_CACHE_LINE_SIZE, node);
@@ -1640,7 +1637,10 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_pkts_pending || !vq->async_pkts_info ||
+	vq->async_descs_split = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem),
+			RTE_CACHE_LINE_SIZE, node);
+	if (!vq->async_descs_split || !vq->async_pkts_info ||
 		!vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 23e11ff..658f6fc 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -202,11 +202,13 @@ struct vhost_virtqueue {
 	struct iovec *vec_pool;
 
 	/* async data transfer status */
-	uintptr_t	**async_pkts_pending;
 	struct async_inflight_info *async_pkts_info;
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
+	struct vring_used_elem  *async_descs_split;
+	uint16_t async_desc_idx;
+	uint16_t last_async_desc_idx;
 
 	/* vq async features */
 	bool		async_inorder;
@@ -733,8 +735,7 @@ vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 	/* Don't kick guest if we don't reach index specified by guest. */
 	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
 		uint16_t old = vq->signalled_used;
-		uint16_t new = vq->async_pkts_inflight_n ?
-					vq->used->idx:vq->last_used_idx;
+		uint16_t new = vq->last_used_idx;
 		bool signalled_used_valid = vq->signalled_used_valid;
 
 		vq->signalled_used = new;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 6e94a9b..da5eb65 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -1967,12 +1967,13 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
 	} else {
 		rte_free(vq->shadow_used_split);
 		vq->shadow_used_split = NULL;
-		if (vq->async_pkts_pending)
-			rte_free(vq->async_pkts_pending);
+
 		if (vq->async_pkts_info)
 			rte_free(vq->async_pkts_info);
-		vq->async_pkts_pending = NULL;
+		if (vq->async_descs_split)
+			rte_free(vq->async_descs_split);
 		vq->async_pkts_info = NULL;
+		vq->async_descs_split = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 0b63940..effef8a 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -118,31 +118,6 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 }
 
 static __rte_always_inline void
-async_flush_shadow_used_ring_split(struct virtio_net *dev,
-	struct vhost_virtqueue *vq)
-{
-	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
-	if (used_idx + vq->shadow_used_idx <= vq->size) {
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
-					  vq->shadow_used_idx);
-	} else {
-		uint16_t size;
-
-		/* update used ring interval [used_idx, vq->size] */
-		size = vq->size - used_idx;
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
-
-		/* update the left half used ring interval [0, left_size] */
-		do_flush_shadow_used_ring_split(dev, vq, 0, size,
-					  vq->shadow_used_idx - size);
-	}
-
-	vq->last_used_idx += vq->shadow_used_idx;
-	vq->shadow_used_idx = 0;
-}
-
-static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -1479,7 +1454,8 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
 	uint16_t num_buffers;
@@ -1493,10 +1469,15 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
-	uint16_t n_free_slot, slot_idx = 0;
+	uint16_t slot_idx = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+	struct {
+		uint16_t pkt_idx;
+		uint16_t last_avail_idx;
+	} async_pkts_log[MAX_PKT_BURST];
 
 	/*
 	 * The ordering between avail index and desc reads need to be enforced.
@@ -1530,21 +1511,50 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			break;
 		}
 
-		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
 		if (src_it->count) {
-			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
-			pkt_burst_idx++;
+			uint16_t from, to;
+
+			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
 			pkts_info[slot_idx].descs = num_buffers;
-			pkts_info[slot_idx].segs = src_it->nr_segs;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
+			async_pkts_log[num_async_pkts++].last_avail_idx =
+				vq->last_avail_idx;
 			src_iovec += src_it->nr_segs;
 			dst_iovec += dst_it->nr_segs;
 			src_it += 2;
 			dst_it += 2;
 			segs_await += src_it->nr_segs;
-		} else {
-			pkts_info[slot_idx].info = num_buffers;
-			vq->async_pkts_inflight_n++;
-		}
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_desc_idx & (vq->size - 1);
+			if (num_buffers + to <= vq->size) {
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						num_buffers *
+						sizeof(struct vring_used_elem));
+			} else {
+				int size = vq->size - to;
+
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->async_descs_split,
+						&vq->shadow_used_split[from +
+						size], (num_buffers - size) *
+					   sizeof(struct vring_used_elem));
+			}
+			vq->async_desc_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
 
 		vq->last_avail_idx += num_buffers;
 
@@ -1553,9 +1563,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		 * - buffered packet number reaches transfer threshold
 		 * - unused async iov number is less than max vhost vector
 		 */
-		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
-			BUF_VECTOR_MAX)) {
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
 					queue_id, tdes, 0, pkt_burst_idx);
 			src_iovec = vec_pool;
@@ -1563,7 +1573,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			src_it = it_pool;
 			dst_it = it_pool + 1;
 			segs_await = 0;
-			vq->async_pkts_inflight_n += pkt_burst_idx;
+			vq->async_pkts_inflight_n += n_pkts;
 
 			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
@@ -1583,7 +1593,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	if (pkt_burst_idx) {
 		n_pkts = vq->async_ops.transfer_data(dev->vid,
 				queue_id, tdes, 0, pkt_burst_idx);
-		vq->async_pkts_inflight_n += pkt_burst_idx;
+		vq->async_pkts_inflight_n += n_pkts;
 
 		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
@@ -1591,32 +1601,33 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 
 	do_data_copy_enqueue(dev, vq);
 
-	while (unlikely(pkt_err && pkt_idx)) {
-		if (pkts_info[slot_idx].segs)
-			pkt_err--;
-		vq->last_avail_idx -= pkts_info[slot_idx].descs;
-		vq->shadow_used_idx -= pkts_info[slot_idx].descs;
-		vq->async_pkts_inflight_n--;
-		slot_idx = (slot_idx - 1) & (vq->size - 1);
-		pkt_idx--;
-	}
-
-	n_free_slot = vq->size - vq->async_pkts_idx;
-	if (n_free_slot > pkt_idx) {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, pkt_idx * sizeof(uintptr_t));
-		vq->async_pkts_idx += pkt_idx;
-	} else {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, n_free_slot * sizeof(uintptr_t));
-		rte_memcpy(&vq->async_pkts_pending[0],
-			&pkts[n_free_slot],
-			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
-		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	if (unlikely(pkt_err)) {
+		uint16_t num_descs = 0;
+
+		num_async_pkts -= pkt_err;
+		/* calculate the sum of descriptors of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
+			slot_idx--;
+		}
+		vq->async_desc_idx -= num_descs;
+		/* recover shadow used ring and available ring */
+		vq->shadow_used_idx -= (vq->last_avail_idx -
+				async_pkts_log[num_async_pkts].last_avail_idx -
+				num_descs);
+		vq->last_avail_idx =
+			async_pkts_log[num_async_pkts].last_avail_idx;
+		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
+		num_done_pkts = pkt_idx - num_async_pkts;
 	}
 
-	if (likely(vq->shadow_used_idx))
-		async_flush_shadow_used_ring_split(dev, vq);
+	vq->async_pkts_idx += num_async_pkts;
+	*comp_count = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring_split(dev, vq);
+		vhost_vring_call_split(dev, vq);
+	}
 
 	return pkt_idx;
 }
@@ -1628,8 +1639,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	struct vhost_virtqueue *vq;
 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
-	uint16_t n_inflight;
 	struct async_inflight_info *pkts_info;
+	uint16_t from, i;
 
 	if (!dev)
 		return 0;
@@ -1651,8 +1662,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 	rte_spinlock_lock(&vq->access_lock);
 
-	n_inflight = vq->async_pkts_inflight_n;
-	pkts_idx = vq->async_pkts_idx;
+	pkts_idx = vq->async_pkts_idx & (vq->size - 1);
 	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
@@ -1663,42 +1673,61 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 			queue_id, 0, count - vq->async_last_pkts_n);
 	n_pkts_cpl += vq->async_last_pkts_n;
 
-	rte_atomic_thread_fence(__ATOMIC_RELEASE);
-
-	while (likely((n_pkts_put < count) && n_inflight)) {
-		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
-		if (n_pkts_cpl && pkts_info[info_idx].segs)
-			n_pkts_cpl--;
-		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
-			break;
-		n_pkts_put++;
-		n_inflight--;
-		n_descs += pkts_info[info_idx].descs;
-	}
-
-	vq->async_last_pkts_n = n_pkts_cpl;
+	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+	if (unlikely(n_pkts_put == 0)) {
+		vq->async_last_pkts_n = n_pkts_cpl;
+		goto done;
+	}
+
+	for (i = 0; i < n_pkts_put; i++) {
+		from = (start_idx + i) & (vq_size - 1);
+		n_descs += pkts_info[from].descs;
+		pkts[i] = pkts_info[from].mbuf;
+	}
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(vq->enabled && vq->access_ok)) {
+		uint16_t nr_left = n_descs;
+		uint16_t nr_copy;
+		uint16_t to;
+
+		/* write back completed descriptors to used ring */
+		do {
+			from = vq->last_async_desc_idx & (vq->size - 1);
+			nr_copy = nr_left + from <= vq->size ? nr_left :
+				vq->size - from;
+			to = vq->last_used_idx & (vq->size - 1);
+
+			if (to + nr_copy <= vq->size) {
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						nr_copy *
+						sizeof(struct vring_used_elem));
+			} else {
+				uint16_t size = vq->size - to;
+
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->used->ring,
+						&vq->async_descs_split[from +
+						size], (nr_copy - size) *
+						sizeof(struct vring_used_elem));
+			}
 
-	if (n_pkts_put) {
-		vq->async_pkts_inflight_n = n_inflight;
-		if (likely(vq->enabled && vq->access_ok)) {
-			__atomic_add_fetch(&vq->used->idx,
-					n_descs, __ATOMIC_RELEASE);
-			vhost_vring_call_split(dev, vq);
-		}
+			vq->last_async_desc_idx += nr_copy;
+			vq->last_used_idx += nr_copy;
+			nr_left -= nr_copy;
+		} while (nr_left > 0);
 
-		if (start_idx + n_pkts_put <= vq_size) {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				n_pkts_put * sizeof(uintptr_t));
-		} else {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				(vq_size - start_idx) * sizeof(uintptr_t));
-			rte_memcpy(&pkts[vq_size - start_idx],
-				vq->async_pkts_pending,
-				(n_pkts_put + start_idx - vq_size) *
-				sizeof(uintptr_t));
-		}
-	}
+		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	} else
+		vq->last_async_desc_idx += n_descs;
 
+done:
 	rte_spinlock_unlock(&vq->access_lock);
 
 	return n_pkts_put;
@@ -1706,7 +1735,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	struct vhost_virtqueue *vq;
 	uint32_t nb_tx = 0;
@@ -1741,7 +1771,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 		nb_tx = 0;
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
-				vq, queue_id, pkts, count);
+				vq, queue_id, pkts, count, comp_pkts,
+				comp_count);
 
 out:
 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
@@ -1755,10 +1786,12 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 
 uint16_t
 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count)
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
 	struct virtio_net *dev = get_device(vid);
 
+	*comp_count = 0;
 	if (!dev)
 		return 0;
 
@@ -1769,7 +1802,8 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		return 0;
 	}
 
-	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
+			comp_count);
 }
 
 static inline bool
-- 
2.7.4


^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [PATCH v4 2/2] vhost: enhance async enqueue for small packets
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
@ 2021-01-11 12:44         ` Maxime Coquelin
  0 siblings, 0 replies; 18+ messages in thread
From: Maxime Coquelin @ 2021-01-11 12:44 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: chenbo.xia, cheng1.jiang, yinan.wang



On 1/11/21 1:16 PM, Jiayu Hu wrote:
> Async enqueue offloads large copies to DMA devices, and small copies
> are still performed by the CPU. However, it requires users to get
> enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
> if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
> returns. This design incurs extra overheads of tracking completed
> pktmbufs and function calls, thus degrading performance on small packets.
> 
> This patch enhances async enqueue for small packets by enabling
> rte_vhost_submit_enqueue_burst() to return completed packets.
> 
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> Tested-by: Yinan Wang <yinan.wang@intel.com>
> ---
>  doc/guides/prog_guide/vhost_lib.rst |   8 +-
>  examples/vhost/main.c               |  18 ++-
>  lib/librte_vhost/rte_vhost_async.h  |  30 +++--
>  lib/librte_vhost/vhost.c            |  14 +--
>  lib/librte_vhost/vhost.h            |   7 +-
>  lib/librte_vhost/vhost_user.c       |   7 +-
>  lib/librte_vhost/virtio_net.c       | 242 ++++++++++++++++++++----------------
>  7 files changed, 190 insertions(+), 136 deletions(-)
> 

Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>

Thanks,
Maxime


^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue
  2021-01-11 11:04         ` Maxime Coquelin
@ 2021-01-11 14:04           ` Maxime Coquelin
  0 siblings, 0 replies; 18+ messages in thread
From: Maxime Coquelin @ 2021-01-11 14:04 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: chenbo.xia, cheng1.jiang, yinan.wang



On 1/11/21 12:04 PM, Maxime Coquelin wrote:
> 
> 
> On 1/11/21 1:16 PM, Jiayu Hu wrote:
>> This patch removes unnecessary check and function calls, and it changes
>> appropriate types for internal variables and fixes typos.
>>
>> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
>> Tested-by: Yinan Wang <yinan.wang@intel.com>
>> ---
>>  lib/librte_vhost/rte_vhost_async.h |  8 ++++----
>>  lib/librte_vhost/virtio_net.c      | 16 ++++++++--------
>>  2 files changed, 12 insertions(+), 12 deletions(-)
>>
>> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
>> index c73bd7c..03bd558 100644
>> --- a/lib/librte_vhost/rte_vhost_async.h
>> +++ b/lib/librte_vhost/rte_vhost_async.h
>> @@ -112,7 +112,7 @@ struct rte_vhost_async_features {
>>  };
>>  
>>  /**
>> - * register a async channel for vhost
>> + * register an async channel for vhost
>>   *
>>   * @param vid
>>   *  vhost device id async channel to be attached to
>> @@ -147,8 +147,8 @@ __rte_experimental
>>  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
>>  
>>  /**
>> - * This function submit enqueue data to async engine. This function has
>> - * no guranttee to the transfer completion upon return. Applications
>> + * This function submits enqueue data to async engine. This function has
>> + * no guarantee to the transfer completion upon return. Applications
>>   * should poll transfer status by rte_vhost_poll_enqueue_completed()
>>   *
>>   * @param vid
>> @@ -167,7 +167,7 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
>>  		struct rte_mbuf **pkts, uint16_t count);
>>  
>>  /**
>> - * This function check async completion status for a specific vhost
>> + * This function checks async completion status for a specific vhost
>>   * device queue. Packets which finish copying (enqueue) operation
>>   * will be returned in an array.
>>   *
>> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
>> index fec08b2..0b63940 100644
>> --- a/lib/librte_vhost/virtio_net.c
>> +++ b/lib/librte_vhost/virtio_net.c
>> @@ -1130,8 +1130,11 @@ async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
>>  	}
>>  
>>  out:
>> -	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
>> -	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
>> +	if (tlen) {
>> +		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
>> +		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
>> +	} else
>> +		src_it->count = 0;
> 
> Minor comment, you need braces for the 'else' as there are braces for
> the 'if'.
> 
> 
> I will fix while applying.

Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>

> Thanks,
> Maxime
> 


^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [PATCH v4 0/2] Enhance Async Enqueue for Small Packets
  2021-01-11 12:16     ` [dpdk-dev] [PATCH v4 " Jiayu Hu
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue Jiayu Hu
  2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
@ 2021-01-11 15:02       ` Maxime Coquelin
  2 siblings, 0 replies; 18+ messages in thread
From: Maxime Coquelin @ 2021-01-11 15:02 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: chenbo.xia, cheng1.jiang, yinan.wang



On 1/11/21 1:16 PM, Jiayu Hu wrote:
> Async enqueue offloads large copies to DMA devices, and small copies
> are still performed by the CPU. However, it requires users to get
> enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
> if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
> returns. This design incurs extra overheads of tracking completed
> pktmbufs and function calls, thus degrading performance on small packets.
> 
> The first patch cleans up async enqueue code, and the second patch
> enables rte_vhost_submit_enqueue_burst() to return completed packets.
> 
> Change log
> ==========
> v4:
> - support new API in vhost example
> v3:
> - fix incorrect ret value when DMA ring is full
> - enhance description of API declaration and programmer guide
> v2:
> - fix typo
> - rename API variables
> - update programmer guide
> 
> Jiayu Hu (2):
>   vhost: cleanup async enqueue
>   vhost: enhance async enqueue for small packets
> 
>  doc/guides/prog_guide/vhost_lib.rst |   8 +-
>  examples/vhost/main.c               |  18 ++-
>  lib/librte_vhost/rte_vhost_async.h  |  34 +++--
>  lib/librte_vhost/vhost.c            |  14 +-
>  lib/librte_vhost/vhost.h            |   7 +-
>  lib/librte_vhost/vhost_user.c       |   7 +-
>  lib/librte_vhost/virtio_net.c       | 258 ++++++++++++++++++++----------------
>  7 files changed, 200 insertions(+), 146 deletions(-)
> 

Applied to dpdk-next-virtio/main.

Thanks,
Maxime


^ permalink raw reply	[flat|nested] 18+ messages in thread

end of thread, other threads:[~2021-01-11 15:02 UTC | newest]

Thread overview: 18+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-12-11  9:21 [dpdk-dev] [PATCH 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
2020-12-11  9:21 ` [dpdk-dev] [PATCH 1/2] vhost: cleanup async enqueue Jiayu Hu
2020-12-11  9:21 ` [dpdk-dev] [PATCH 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
2020-12-22  9:46 ` [dpdk-dev] [Patch v2 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
2020-12-22  9:46   ` [dpdk-dev] [Patch v2 1/2] vhost: cleanup async enqueue Jiayu Hu
2020-12-22  9:46   ` [dpdk-dev] [Patch v2 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
2020-12-25  8:28   ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Jiayu Hu
2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 1/2] vhost: cleanup async enqueue Jiayu Hu
2020-12-25  8:28     ` [dpdk-dev] [PATCH v3 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
2021-01-05 11:41     ` [dpdk-dev] [PATCH v3 0/2] Enhance Async Enqueue for Small Packets Wang, Yinan
2021-01-07 10:45     ` Maxime Coquelin
2021-01-11 12:16     ` [dpdk-dev] [PATCH v4 " Jiayu Hu
2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 1/2] vhost: cleanup async enqueue Jiayu Hu
2021-01-11 11:04         ` Maxime Coquelin
2021-01-11 14:04           ` Maxime Coquelin
2021-01-11 12:16       ` [dpdk-dev] [PATCH v4 2/2] vhost: enhance async enqueue for small packets Jiayu Hu
2021-01-11 12:44         ` Maxime Coquelin
2021-01-11 15:02       ` [dpdk-dev] [PATCH v4 0/2] Enhance Async Enqueue for Small Packets Maxime Coquelin
