DPDK patches and discussions
* [dpdk-dev] [RFC 0/2] Add a PMD for I/OAT accelerated vhost-user
@ 2019-09-26 14:26 Jiayu Hu
  2019-09-26 14:26 ` [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user Jiayu Hu
                   ` (2 more replies)
  0 siblings, 3 replies; 11+ messages in thread
From: Jiayu Hu @ 2019-09-26 14:26 UTC (permalink / raw)
  To: dev; +Cc: tiwei.bie, maxime.coquelin, zhihong.wang, Jiayu Hu

In vhost-user enqueue and dequeue operations, data movement is heavy and
large memory copies usually consume a major share of CPU cycles, making
them the hot spot. To offload these expensive memory operations from the
CPU, this patch set proposes to leverage I/OAT, a DMA engine in Intel
processors, to accelerate large copies for vhost-user.

We implement a new PMD for I/OAT accelerated vhost-user, called
vhost-ioat. This PMD leverages librte_vhost to handle vhost messages,
but implements its own vring enqueue and dequeue operations. It offloads
large memory copies to the I/OAT in an asynchronous mode; that is, the
CPU just submits copy jobs to the I/OAT without waiting for their
completion. Thus, there is no CPU intervention during data transfer, and
we can save precious CPU cycles and improve vhost performance.
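
For reference, below is a minimal sketch of the submit-then-poll pattern
this design relies on, built on the existing rte_ioat rawdev API; the
device ID, addresses and fallback buffers are illustrative placeholders
rather than the PMD's actual code.

#include <rte_ioat_rawdev.h>
#include <rte_memcpy.h>

/* Queue one copy on the I/OAT; fall back to the CPU if the ring is full. */
static void
submit_copy(int dev_id, phys_addr_t src, phys_addr_t dst, unsigned int len,
	    void *src_va, void *dst_va)
{
	if (rte_ioat_enqueue_copy(dev_id, src, dst, len, 0, 0, 0) != 1) {
		rte_memcpy(dst_va, src_va, len);
		return;
	}
	/* ring the doorbell; do not wait for completion */
	rte_ioat_do_copies(dev_id);
}

/* Later, reap completed jobs without blocking. */
static int
poll_copies(int dev_id, uintptr_t *src_hdls, uintptr_t *dst_hdls,
	    uint8_t max_copies)
{
	return rte_ioat_completed_copies(dev_id, max_copies,
					 src_hdls, dst_hdls);
}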

The PMD provides basic packet reception and transmission. During packet
reception and transmission, it offloads large copies to the I/OAT and
performs small copies with the CPU, because of the startup overhead
associated with the I/OAT. However, the PMD currently supports I/OAT
acceleration only in its transmit data path (i.e. the vring enqueue
operation); it still uses the CPU to perform all copies in its receive
data path (i.e. the vring dequeue operation). Note that the PMD supports
only the split ring.

Users can explicitly assign an I/OAT device to a TX queue with the
'ioats' parameter. Currently, an I/OAT device can be used by only one
queue, and a queue can use only one I/OAT device at a time. In addition,
the PMD supports multiple queues and both client and server modes; users
can specify the number of queues and client/server mode with the
'queues' and 'client' parameters.
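
For example, a two-queue vhost-ioat port with a per-queue I/OAT
assignment can be created as below (the I/OAT PCI addresses are
illustrative placeholders):

$ ./testpmd -c f -n 4 \
	--vdev 'ioat_vhost0,iface=/tmp/sock0,queues=2,ioats=(txq0@00:04.0;txq1@00:04.1),client=0'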

Jiayu Hu (2):
  vhost: populate guest memory for DMA-accelerated vhost-user
  net/vhost_ioat: add vhost I/OAT driver

 config/common_base                  |    2 +
 config/common_linux                 |    1 +
 drivers/Makefile                    |    2 +-
 drivers/net/Makefile                |    1 +
 drivers/net/vhost_ioat/Makefile     |   31 +
 drivers/net/vhost_ioat/eth_vhost.c  | 1439 +++++++++++++++++++++++++++++++++++
 drivers/net/vhost_ioat/eth_vhost.h  |  255 +++++++
 drivers/net/vhost_ioat/internal.h   |  225 ++++++
 drivers/net/vhost_ioat/virtio_net.c | 1243 ++++++++++++++++++++++++++++++
 lib/librte_vhost/rte_vhost.h        |    1 +
 lib/librte_vhost/socket.c           |   10 +
 lib/librte_vhost/vhost.h            |    2 +
 lib/librte_vhost/vhost_user.c       |    3 +-
 mk/rte.app.mk                       |    1 +
 14 files changed, 3214 insertions(+), 2 deletions(-)
 create mode 100644 drivers/net/vhost_ioat/Makefile
 create mode 100644 drivers/net/vhost_ioat/eth_vhost.c
 create mode 100644 drivers/net/vhost_ioat/eth_vhost.h
 create mode 100644 drivers/net/vhost_ioat/internal.h
 create mode 100644 drivers/net/vhost_ioat/virtio_net.c

-- 
2.7.4



* [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user
  2019-09-26 14:26 [dpdk-dev] [RFC 0/2] Add a PMD for I/OAT accelerated vhost-user Jiayu Hu
@ 2019-09-26 14:26 ` Jiayu Hu
  2019-12-17  7:18   ` Maxime Coquelin
  2019-09-26 14:26 ` [dpdk-dev] [RFC 2/2] net/vhost_ioat: add vhost I/OAT driver Jiayu Hu
  2019-11-01  8:54 ` [dpdk-dev] [RFC v2 0/2] Add a PMD for DMA-accelerated vhost-user Jiayu Hu
  2 siblings, 1 reply; 11+ messages in thread
From: Jiayu Hu @ 2019-09-26 14:26 UTC (permalink / raw)
  To: dev; +Cc: tiwei.bie, maxime.coquelin, zhihong.wang, Jiayu Hu

DMA engines, like I/OAT, are efficient at moving large blocks of data
within memory. Offloading large copies on the vhost side to DMA engines
can save precious CPU cycles and improve vhost performance.

However, using a DMA engine requires the guest's memory to be populated.
This patch enables DMA-accelerated vhost-user to populate the guest's
memory.
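
As a usage sketch (not part of this patch), an application that wants a
DMA-accelerated data path would pass the new flag when registering its
vhost-user socket, so that guest memory is mmap()ed with MAP_POPULATE;
the socket path below is a placeholder:

	uint64_t flags = RTE_VHOST_USER_CLIENT | RTE_VHOST_USER_DMA_COPY;

	if (rte_vhost_driver_register("/tmp/sock0", flags) < 0)
		rte_exit(EXIT_FAILURE, "failed to register vhost driver\n");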

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 lib/librte_vhost/rte_vhost.h  |  1 +
 lib/librte_vhost/socket.c     | 10 ++++++++++
 lib/librte_vhost/vhost.h      |  2 ++
 lib/librte_vhost/vhost_user.c |  3 ++-
 4 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
index 7fb1729..61f8bef 100644
--- a/lib/librte_vhost/rte_vhost.h
+++ b/lib/librte_vhost/rte_vhost.h
@@ -30,6 +30,7 @@ extern "C" {
 #define RTE_VHOST_USER_DEQUEUE_ZERO_COPY	(1ULL << 2)
 #define RTE_VHOST_USER_IOMMU_SUPPORT	(1ULL << 3)
 #define RTE_VHOST_USER_POSTCOPY_SUPPORT		(1ULL << 4)
+#define RTE_VHOST_USER_DMA_COPY		(1ULL << 5)
 
 /** Protocol features. */
 #ifndef VHOST_USER_PROTOCOL_F_MQ
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 274988c..55498c4 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -60,6 +60,8 @@ struct vhost_user_socket {
 	 */
 	int vdpa_dev_id;
 
+	bool dma_enabled;
+
 	struct vhost_device_ops const *notify_ops;
 };
 
@@ -232,6 +234,13 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
 	if (vsocket->dequeue_zero_copy)
 		vhost_enable_dequeue_zero_copy(vid);
 
+	if (vsocket->dma_enabled) {
+		struct virtio_net *dev;
+
+		dev = get_device(vid);
+		dev->dma_enabled = true;
+	}
+
 	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
 
 	if (vsocket->notify_ops->new_connection) {
@@ -870,6 +879,7 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
 		goto out_free;
 	}
 	vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;
+	vsocket->dma_enabled = flags & RTE_VHOST_USER_DMA_COPY;
 
 	/*
 	 * Set the supported features correctly for the builtin vhost-user
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 884befa..8f564f1 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -329,6 +329,8 @@ struct virtio_net {
 	 */
 	int			vdpa_dev_id;
 
+	bool			dma_enabled;
+
 	/* context data for the external message handlers */
 	void			*extern_data;
 	/* pre and post vhost user message handlers for the device */
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 0b72648..6a4969b 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -990,7 +990,8 @@ vhost_user_set_mem_table(struct virtio_net **pdev, struct VhostUserMsg *msg,
 		}
 		mmap_size = RTE_ALIGN_CEIL(mmap_size, alignment);
 
-		populate = (dev->dequeue_zero_copy) ? MAP_POPULATE : 0;
+		populate = (dev->dequeue_zero_copy || dev->dma_enabled) ?
+			MAP_POPULATE : 0;
 		mmap_addr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
 				 MAP_SHARED | populate, fd, 0);
 
-- 
2.7.4



* [dpdk-dev] [RFC 2/2] net/vhost_ioat: add vhost I/OAT driver
  2019-09-26 14:26 [dpdk-dev] [RFC 0/2] Add a PMD for I/OAT accelerated vhost-user Jiayu Hu
  2019-09-26 14:26 ` [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user Jiayu Hu
@ 2019-09-26 14:26 ` Jiayu Hu
  2019-11-01  8:54 ` [dpdk-dev] [RFC v2 0/2] Add a PMD for DMA-accelerated vhost-user Jiayu Hu
  2 siblings, 0 replies; 11+ messages in thread
From: Jiayu Hu @ 2019-09-26 14:26 UTC (permalink / raw)
  To: dev; +Cc: tiwei.bie, maxime.coquelin, zhihong.wang, Jiayu Hu

This patch introduces a new PMD for I/OAT accelerated vhost-user, which
provides basic functionality of packet reception and transmission. This
PMD leverages librte_vhost to handle vhost messages, but it implements
its own vring enqueue and dequeue operations.

The PMD leverages I/OAT, a DMA engine in Intel processors, to accelerate
large data movements in enqueue and dequeue operations. Large copies are
offloaded to the I/OAT in an asynchronous mode; that is, the CPU just
submits copy jobs to the I/OAT without waiting for their completion, so
there is no CPU intervention during data transfer. Thus, we can save
precious CPU cycles and improve vhost performance. The PMD still uses
the CPU to perform small copies, because of the startup overhead
associated with the I/OAT.
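
A minimal sketch of that size-based dispatch, assuming the
IOAT_COPY_LENGTH_THRESHOLD constant defined in the driver header (the
addresses and handles below are placeholders):

	if (copy_len > IOAT_COPY_LENGTH_THRESHOLD &&
	    rte_ioat_enqueue_copy(dev_id, src_phys, dst_phys, copy_len,
				  src_hdl, dst_hdl, 0) == 1) {
		/* large copy queued on the I/OAT; completion is polled later */
	} else {
		/* small copy, or the I/OAT ring is full: copy with the CPU */
		rte_memcpy(dst_va, src_va, copy_len);
	}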

Note that the PMD currently supports I/OAT acceleration only in the
enqueue operation; it still uses the CPU to perform all copies in the
dequeue operation. In addition, the PMD supports only the split ring.

The I/OAT device used by a queue is assigned by the user. For a queue
without an assigned I/OAT device, the PMD uses the CPU to perform all
copies for both enqueue and dequeue operations. Note that, currently, a
queue can use only one I/OAT device and an I/OAT device can be used by
only one queue at a time.

The PMD has 4 parameters.
 - iface: the socket path used to connect to the front-end device.
 - queues: the number of queues the front-end device has (default: 1).
 - client: whether the vhost port works in client or server mode
	   (default: server mode).
 - ioats: the I/OAT device assigned to each TX queue.

Here is an example.
$ ./testpmd -c f -n 4 \
		--vdev 'ioat_vhost0,iface=/tmp/sock0,queues=2,ioats=(txq0@00:04.0;txq1@00:04.1),client=0'

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 config/common_base                  |    2 +
 config/common_linux                 |    1 +
 drivers/Makefile                    |    2 +-
 drivers/net/Makefile                |    1 +
 drivers/net/vhost_ioat/Makefile     |   31 +
 drivers/net/vhost_ioat/eth_vhost.c  | 1439 +++++++++++++++++++++++++++++++++++
 drivers/net/vhost_ioat/eth_vhost.h  |  255 +++++++
 drivers/net/vhost_ioat/internal.h   |  225 ++++++
 drivers/net/vhost_ioat/virtio_net.c | 1243 ++++++++++++++++++++++++++++++
 mk/rte.app.mk                       |    1 +
 10 files changed, 3199 insertions(+), 1 deletion(-)
 create mode 100644 drivers/net/vhost_ioat/Makefile
 create mode 100644 drivers/net/vhost_ioat/eth_vhost.c
 create mode 100644 drivers/net/vhost_ioat/eth_vhost.h
 create mode 100644 drivers/net/vhost_ioat/internal.h
 create mode 100644 drivers/net/vhost_ioat/virtio_net.c

diff --git a/config/common_base b/config/common_base
index 8ef75c2..9998218 100644
--- a/config/common_base
+++ b/config/common_base
@@ -1011,6 +1011,8 @@ CONFIG_RTE_LIBRTE_VHOST_DEBUG=n
 #
 CONFIG_RTE_LIBRTE_PMD_VHOST=n
 
+CONFIG_RTE_LIBRTE_PMD_VHOST_IOAT=n
+
 #
 # Compile IFC driver
 # To compile, CONFIG_RTE_LIBRTE_VHOST and CONFIG_RTE_EAL_VFIO
diff --git a/config/common_linux b/config/common_linux
index 6e25255..8e65660 100644
--- a/config/common_linux
+++ b/config/common_linux
@@ -17,6 +17,7 @@ CONFIG_RTE_LIBRTE_VHOST=y
 CONFIG_RTE_LIBRTE_VHOST_NUMA=y
 CONFIG_RTE_LIBRTE_VHOST_POSTCOPY=n
 CONFIG_RTE_LIBRTE_PMD_VHOST=y
+CONFIG_RTE_LIBRTE_PMD_VHOST_IOAT=y
 CONFIG_RTE_LIBRTE_IFC_PMD=y
 CONFIG_RTE_LIBRTE_PMD_AF_PACKET=y
 CONFIG_RTE_LIBRTE_PMD_MEMIF=y
diff --git a/drivers/Makefile b/drivers/Makefile
index 7d5da5d..6bdab76 100644
--- a/drivers/Makefile
+++ b/drivers/Makefile
@@ -9,7 +9,7 @@ DEPDIRS-bus := common
 DIRS-y += mempool
 DEPDIRS-mempool := common bus
 DIRS-y += net
-DEPDIRS-net := common bus mempool
+DEPDIRS-net := common bus mempool raw
 DIRS-$(CONFIG_RTE_LIBRTE_BBDEV) += baseband
 DEPDIRS-baseband := common bus mempool
 DIRS-$(CONFIG_RTE_LIBRTE_CRYPTODEV) += crypto
diff --git a/drivers/net/Makefile b/drivers/net/Makefile
index 5767fdf..48301b4 100644
--- a/drivers/net/Makefile
+++ b/drivers/net/Makefile
@@ -69,6 +69,7 @@ endif # $(CONFIG_RTE_LIBRTE_SCHED)
 
 ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_VHOST) += vhost
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_IOAT) += vhost_ioat
 ifeq ($(CONFIG_RTE_EAL_VFIO),y)
 DIRS-$(CONFIG_RTE_LIBRTE_IFC_PMD) += ifc
 endif
diff --git a/drivers/net/vhost_ioat/Makefile b/drivers/net/vhost_ioat/Makefile
new file mode 100644
index 0000000..0d95d16
--- /dev/null
+++ b/drivers/net/vhost_ioat/Makefile
@@ -0,0 +1,31 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_pmd_vhost_ioat.a
+
+LDLIBS += -lpthread
+LDLIBS += -lrte_eal -lrte_mbuf -lrte_mempool -lrte_ring
+LDLIBS += -lrte_ethdev -lrte_net -lrte_kvargs -lrte_vhost
+LDLIBS += -lrte_bus_vdev
+LDLIBS += -lrte_rawdev -lrte_rawdev_ioat
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -fno-strict-aliasing
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+
+EXPORT_MAP := rte_pmd_vhost_ioat_version.map
+
+LIBABIVER := 1
+
+#
+# all source are stored in SRCS-y
+#
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_IOAT) += eth_vhost.c virtio_net.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/net/vhost_ioat/eth_vhost.c b/drivers/net/vhost_ioat/eth_vhost.c
new file mode 100644
index 0000000..fc75782
--- /dev/null
+++ b/drivers/net/vhost_ioat/eth_vhost.c
@@ -0,0 +1,1439 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#include <unistd.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+#include <rte_mbuf.h>
+#include <rte_ethdev_driver.h>
+#include <rte_ethdev_vdev.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_bus_vdev.h>
+#include <rte_kvargs.h>
+#include <rte_vhost.h>
+#include <rte_spinlock.h>
+#include <rte_log.h>
+#include <rte_string_fns.h>
+#include <rte_rawdev.h>
+#include <rte_ioat_rawdev.h>
+
+#include "eth_vhost.h"
+
+int vhost_ioat_logtype;
+
+enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
+
+#define ETH_VHOST_IFACE_ARG		"iface"
+#define ETH_VHOST_QUEUES_ARG		"queues"
+#define ETH_VHOST_CLIENT_ARG		"client"
+#define ETH_VHOST_IOAT_ARG		"ioats"
+
+static const char *valid_arguments[] = {
+	ETH_VHOST_IFACE_ARG,
+	ETH_VHOST_QUEUES_ARG,
+	ETH_VHOST_CLIENT_ARG,
+	ETH_VHOST_IOAT_ARG,
+	NULL
+};
+
+static struct rte_ether_addr base_eth_addr = {
+	.addr_bytes = {
+		0x56 /* V */,
+		0x48 /* H */,
+		0x4F /* O */,
+		0x53 /* S */,
+		0x54 /* T */,
+		0x00
+	}
+};
+
+struct internal_list {
+	TAILQ_ENTRY(internal_list) next;
+	struct rte_eth_dev *eth_dev;
+};
+
+TAILQ_HEAD(internal_list_head, internal_list);
+static struct internal_list_head internal_list =
+	TAILQ_HEAD_INITIALIZER(internal_list);
+
+static pthread_mutex_t internal_list_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static struct rte_eth_link pmd_link = {
+		.link_speed = 10000,
+		.link_duplex = ETH_LINK_FULL_DUPLEX,
+		.link_status = ETH_LINK_DOWN
+};
+
+#define VHOST_XSTATS_NAME_SIZE 64
+
+struct vhost_xstats_name_off {
+	char name[VHOST_XSTATS_NAME_SIZE];
+	uint64_t offset;
+};
+
+/* [rx]_ is prepended to the name string here */
+static const struct vhost_xstats_name_off vhost_rxport_stat_strings[] = {
+	{"good_packets",
+	 offsetof(struct vhost_queue, stats.pkts)},
+	{"total_bytes",
+	 offsetof(struct vhost_queue, stats.bytes)},
+	{"missed_pkts",
+	 offsetof(struct vhost_queue, stats.missed_pkts)},
+	{"broadcast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_BROADCAST_PKT])},
+	{"multicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_MULTICAST_PKT])},
+	{"unicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNICAST_PKT])},
+	 {"undersize_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNDERSIZE_PKT])},
+	{"size_64_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_64_PKT])},
+	{"size_65_to_127_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_65_TO_127_PKT])},
+	{"size_128_to_255_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_128_TO_255_PKT])},
+	{"size_256_to_511_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_256_TO_511_PKT])},
+	{"size_512_to_1023_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_512_TO_1023_PKT])},
+	{"size_1024_to_1522_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1024_TO_1522_PKT])},
+	{"size_1523_to_max_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1523_TO_MAX_PKT])},
+	{"errors_with_bad_CRC",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_PKT])},
+	{"fragmented_errors",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_FRAGMENTED])},
+	{"jabber_errors",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_JABBER])},
+	{"unknown_protos_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNKNOWN_PROTOCOL])},
+};
+
+/* [tx]_ is prepended to the name string here */
+static const struct vhost_xstats_name_off vhost_txport_stat_strings[] = {
+	{"good_packets",
+	 offsetof(struct vhost_queue, stats.pkts)},
+	{"total_bytes",
+	 offsetof(struct vhost_queue, stats.bytes)},
+	{"missed_pkts",
+	 offsetof(struct vhost_queue, stats.missed_pkts)},
+	{"broadcast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_BROADCAST_PKT])},
+	{"multicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_MULTICAST_PKT])},
+	{"unicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNICAST_PKT])},
+	{"undersize_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNDERSIZE_PKT])},
+	{"size_64_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_64_PKT])},
+	{"size_65_to_127_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_65_TO_127_PKT])},
+	{"size_128_to_255_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_128_TO_255_PKT])},
+	{"size_256_to_511_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_256_TO_511_PKT])},
+	{"size_512_to_1023_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_512_TO_1023_PKT])},
+	{"size_1024_to_1522_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1024_TO_1522_PKT])},
+	{"size_1523_to_max_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1523_TO_MAX_PKT])},
+	{"errors_with_bad_CRC",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_PKT])},
+};
+
+#define VHOST_NB_XSTATS_RXPORT (sizeof(vhost_rxport_stat_strings) / \
+				sizeof(vhost_rxport_stat_strings[0]))
+
+#define VHOST_NB_XSTATS_TXPORT (sizeof(vhost_txport_stat_strings) / \
+				sizeof(vhost_txport_stat_strings[0]))
+
+static void
+vhost_dev_xstats_reset(struct rte_eth_dev *dev)
+{
+	struct vhost_queue *vq = NULL;
+	unsigned int i = 0;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		memset(&vq->stats, 0, sizeof(vq->stats));
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		memset(&vq->stats, 0, sizeof(vq->stats));
+	}
+}
+
+static int
+vhost_dev_xstats_get_names(struct rte_eth_dev *dev __rte_unused,
+			   struct rte_eth_xstat_name *xstats_names,
+			   unsigned int limit __rte_unused)
+{
+	unsigned int t = 0;
+	int count = 0;
+	int nstats = VHOST_NB_XSTATS_RXPORT + VHOST_NB_XSTATS_TXPORT;
+
+	if (!xstats_names)
+		return nstats;
+	for (t = 0; t < VHOST_NB_XSTATS_RXPORT; t++) {
+		snprintf(xstats_names[count].name,
+			 sizeof(xstats_names[count].name),
+			 "rx_%s", vhost_rxport_stat_strings[t].name);
+		count++;
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_TXPORT; t++) {
+		snprintf(xstats_names[count].name,
+			 sizeof(xstats_names[count].name),
+			 "tx_%s", vhost_txport_stat_strings[t].name);
+		count++;
+	}
+	return count;
+}
+
+static int
+vhost_dev_xstats_get(struct rte_eth_dev *dev, struct rte_eth_xstat *xstats,
+		     unsigned int n)
+{
+	unsigned int i;
+	unsigned int t;
+	unsigned int count = 0;
+	struct vhost_queue *vq = NULL;
+	unsigned int nxstats = VHOST_NB_XSTATS_RXPORT + VHOST_NB_XSTATS_TXPORT;
+
+	if (n < nxstats)
+		return nxstats;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		vq->stats.xstats[VHOST_UNICAST_PKT] = vq->stats.pkts
+				- (vq->stats.xstats[VHOST_BROADCAST_PKT]
+				+ vq->stats.xstats[VHOST_MULTICAST_PKT]);
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		vq->stats.xstats[VHOST_UNICAST_PKT] = vq->stats.pkts
+				+ vq->stats.missed_pkts
+				- (vq->stats.xstats[VHOST_BROADCAST_PKT]
+				+ vq->stats.xstats[VHOST_MULTICAST_PKT]);
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_RXPORT; t++) {
+		xstats[count].value = 0;
+		for (i = 0; i < dev->data->nb_rx_queues; i++) {
+			vq = dev->data->rx_queues[i];
+			if (!vq)
+				continue;
+			xstats[count].value +=
+				*(uint64_t *)(((char *)vq)
+				+ vhost_rxport_stat_strings[t].offset);
+		}
+		xstats[count].id = count;
+		count++;
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_TXPORT; t++) {
+		xstats[count].value = 0;
+		for (i = 0; i < dev->data->nb_tx_queues; i++) {
+			vq = dev->data->tx_queues[i];
+			if (!vq)
+				continue;
+			xstats[count].value +=
+				*(uint64_t *)(((char *)vq)
+				+ vhost_txport_stat_strings[t].offset);
+		}
+		xstats[count].id = count;
+		count++;
+	}
+	return count;
+}
+
+static inline void
+vhost_count_multicast_broadcast(struct vhost_queue *vq,
+				struct rte_mbuf *mbuf)
+{
+	struct rte_ether_addr *ea = NULL;
+	struct vhost_stats *pstats = &vq->stats;
+
+	ea = rte_pktmbuf_mtod(mbuf, struct rte_ether_addr *);
+	if (rte_is_multicast_ether_addr(ea)) {
+		if (rte_is_broadcast_ether_addr(ea))
+			pstats->xstats[VHOST_BROADCAST_PKT]++;
+		else
+			pstats->xstats[VHOST_MULTICAST_PKT]++;
+	}
+}
+
+static void
+vhost_update_packet_xstats(struct vhost_queue *vq,
+			   struct rte_mbuf **bufs,
+			   uint16_t count)
+{
+	uint32_t pkt_len = 0;
+	uint64_t i = 0;
+	uint64_t index;
+	struct vhost_stats *pstats = &vq->stats;
+
+	for (i = 0; i < count ; i++) {
+		pkt_len = bufs[i]->pkt_len;
+		if (pkt_len == 64) {
+			pstats->xstats[VHOST_64_PKT]++;
+		} else if (pkt_len > 64 && pkt_len < 1024) {
+			index = (sizeof(pkt_len) * 8)
+				- __builtin_clz(pkt_len) - 5;
+			pstats->xstats[index]++;
+		} else {
+			if (pkt_len < 64)
+				pstats->xstats[VHOST_UNDERSIZE_PKT]++;
+			else if (pkt_len <= 1522)
+				pstats->xstats[VHOST_1024_TO_1522_PKT]++;
+			else if (pkt_len > 1522)
+				pstats->xstats[VHOST_1523_TO_MAX_PKT]++;
+		}
+		vhost_count_multicast_broadcast(vq, bufs[i]);
+	}
+}
+
+static uint16_t
+eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhost_queue *queue = q;
+	uint16_t i, nb_rx = 0;
+	uint16_t nb_receive = nb_bufs;
+	struct pmd_internal *dev = queue->internal;
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&queue->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		goto out;
+
+	/* get packets from guest's TX queue */
+	while (nb_receive) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
+						 VHOST_MAX_PKT_BURST);
+
+		nb_pkts = vhost_ioat_dequeue_burst(dev, queue->ioat_vring,
+						   queue->mb_pool, &bufs[nb_rx],
+						   num);
+
+		nb_rx += nb_pkts;
+		nb_receive -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	queue->stats.pkts += nb_rx;
+
+	for (i = 0; likely(i < nb_rx); i++) {
+		bufs[i]->port = queue->port;
+		bufs[i]->vlan_tci = 0;
+
+		if (queue->internal->vlan_strip)
+			rte_vlan_strip(bufs[i]);
+
+		queue->stats.bytes += bufs[i]->pkt_len;
+	}
+
+	vhost_update_packet_xstats(queue, bufs, nb_rx);
+
+out:
+	rte_atomic32_set(&queue->while_queuing, 0);
+
+	return nb_rx;
+}
+
+static uint16_t
+eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhost_queue *queue = q;
+	struct pmd_internal *dev = queue->internal;
+	uint16_t i, nb_tx = 0;
+	uint16_t nb_send = 0;
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&queue->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		goto out;
+
+	for (i = 0; i < nb_bufs; i++) {
+		struct rte_mbuf *m = bufs[i];
+
+		/* do VLAN tag insertion */
+		if (m->ol_flags & PKT_TX_VLAN_PKT) {
+			int error = rte_vlan_insert(&m);
+
+			if (unlikely(error)) {
+				rte_pktmbuf_free(m);
+				continue;
+			}
+		}
+
+		bufs[nb_send] = m;
+		++nb_send;
+	}
+
+	/* send packets to guest's RX queue */
+	while (nb_send) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_send,
+						 VHOST_MAX_PKT_BURST);
+
+		nb_pkts = vhost_ioat_enqueue_burst(dev, queue->ioat_vring,
+						   &bufs[nb_tx], num);
+
+		nb_tx += nb_pkts;
+		nb_send -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	queue->stats.pkts += nb_tx;
+	queue->stats.missed_pkts += nb_bufs - nb_tx;
+
+	for (i = 0; likely(i < nb_tx); i++)
+		queue->stats.bytes += bufs[i]->pkt_len;
+
+	vhost_update_packet_xstats(queue, bufs, nb_tx);
+
+	/**
+	 * According to RFC2863 page42 section ifHCOutMulticastPkts and
+	 * ifHCOutBroadcastPkts, the counters "multicast" and "broadcast"
+	 * are increased when packets are not transmitted successfully.
+	 */
+	for (i = nb_tx; i < nb_bufs; i++)
+		vhost_count_multicast_broadcast(queue, bufs[i]);
+out:
+	rte_atomic32_set(&queue->while_queuing, 0);
+
+	return nb_tx;
+}
+
+static int
+eth_dev_configure(struct rte_eth_dev *dev __rte_unused)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+	const struct rte_eth_rxmode *rxmode = &dev->data->dev_conf.rxmode;
+
+	internal->vlan_strip = !!(rxmode->offloads & DEV_RX_OFFLOAD_VLAN_STRIP);
+
+	return 0;
+}
+
+static inline struct internal_list *
+find_internal_resource(char *ifname)
+{
+	int found = 0;
+	struct internal_list *list;
+	struct pmd_internal *internal;
+
+	if (!ifname)
+		return NULL;
+
+	pthread_mutex_lock(&internal_list_lock);
+
+	TAILQ_FOREACH(list, &internal_list, next) {
+		internal = list->eth_dev->data->dev_private;
+		if (!strcmp(internal->iface_name, ifname)) {
+			found = 1;
+			break;
+		}
+	}
+
+	pthread_mutex_unlock(&internal_list_lock);
+
+	if (!found)
+		return NULL;
+
+	return list;
+}
+
+static int
+eth_rxq_intr_enable(struct rte_eth_dev *dev, uint16_t qid)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[qid];
+	if (!vq) {
+		VHOST_LOG(ERR, "rxq%d is not setup yet\n", qid);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "Enable interrupt for rxq%d\n", qid);
+	vhost_ioat_enable_guest_notification(dev->data->dev_private,
+					     vq->ioat_vring, 1);
+	rte_wmb();
+
+	return 0;
+}
+
+static int
+eth_rxq_intr_disable(struct rte_eth_dev *dev, uint16_t qid)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[qid];
+	if (!vq) {
+		VHOST_LOG(ERR, "rxq%d is not setup yet\n", qid);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "Disable interrupt for rxq%d\n", qid);
+	vhost_ioat_enable_guest_notification(dev->data->dev_private,
+					     vq->ioat_vring, 0);
+	rte_wmb();
+
+	return 0;
+}
+
+static void
+eth_vhost_uninstall_intr(struct rte_eth_dev *dev)
+{
+	struct rte_intr_handle *intr_handle = dev->intr_handle;
+
+	if (intr_handle) {
+		if (intr_handle->intr_vec)
+			free(intr_handle->intr_vec);
+		free(intr_handle);
+	}
+
+	dev->intr_handle = NULL;
+}
+
+static int
+eth_vhost_install_intr(struct rte_eth_dev *dev)
+{
+	struct rte_vhost_vring *vv;
+	struct vhost_queue *vq;
+	int count = 0;
+	int nb_rxq = dev->data->nb_rx_queues;
+	int i;
+
+	/* uninstall firstly if we are reconnecting */
+	if (dev->intr_handle)
+		eth_vhost_uninstall_intr(dev);
+
+	dev->intr_handle = malloc(sizeof(*dev->intr_handle));
+	if (!dev->intr_handle) {
+		VHOST_LOG(ERR, "Fail to allocate intr_handle\n");
+		return -ENOMEM;
+	}
+	memset(dev->intr_handle, 0, sizeof(*dev->intr_handle));
+
+	dev->intr_handle->efd_counter_size = sizeof(uint64_t);
+
+	dev->intr_handle->intr_vec =
+		malloc(nb_rxq * sizeof(dev->intr_handle->intr_vec[0]));
+
+	if (!dev->intr_handle->intr_vec) {
+		VHOST_LOG(ERR,
+			  "Failed to allocate memory for interrupt vector\n");
+		free(dev->intr_handle);
+		return -ENOMEM;
+	}
+
+	VHOST_LOG(INFO, "Prepare intr vec\n");
+	for (i = 0; i < nb_rxq; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq) {
+			VHOST_LOG(INFO, "rxq-%d not setup yet, skip!\n", i);
+			continue;
+		}
+
+		vv = &vq->ioat_vring->vr;
+		if (vv->kickfd < 0) {
+			VHOST_LOG(INFO,
+				  "rxq-%d's kickfd is invalid, skip!\n", i);
+			continue;
+		}
+		dev->intr_handle->intr_vec[i] = RTE_INTR_VEC_RXTX_OFFSET + i;
+		dev->intr_handle->efds[i] = vv->kickfd;
+		count++;
+		VHOST_LOG(INFO, "Installed intr vec for rxq-%d\n", i);
+	}
+
+	dev->intr_handle->nb_efd = count;
+	dev->intr_handle->max_intr = count + 1;
+	dev->intr_handle->type = RTE_INTR_HANDLE_VDEV;
+
+	return 0;
+}
+
+static void
+update_queuing_status(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+	struct vhost_queue *vq;
+	unsigned int i;
+	int allow_queuing = 1;
+
+	if (!dev->data->rx_queues || !dev->data->tx_queues)
+		return;
+
+	if (rte_atomic32_read(&internal->started) == 0 ||
+	    rte_atomic32_read(&internal->dev_attached) == 0)
+		allow_queuing = 0;
+
+	/* wait until rx/tx_pkt_burst stops accessing vhost device */
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (vq == NULL)
+			continue;
+		rte_atomic32_set(&vq->allow_queuing, allow_queuing);
+		while (rte_atomic32_read(&vq->while_queuing))
+			rte_pause();
+	}
+
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (vq == NULL)
+			continue;
+		rte_atomic32_set(&vq->allow_queuing, allow_queuing);
+		while (rte_atomic32_read(&vq->while_queuing))
+			rte_pause();
+	}
+}
+
+static void
+queue_setup(struct rte_eth_dev *eth_dev, struct pmd_internal *internal)
+{
+	struct vhost_queue *vq;
+	int i;
+
+	for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
+		vq = eth_dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		vq->vid = internal->vid;
+		vq->internal = internal;
+		vq->port = eth_dev->data->port_id;
+		vq->ioat_vring = &internal->ioat_vrings[vq->virtqueue_id];
+	}
+	for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
+		vq = eth_dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		vq->vid = internal->vid;
+		vq->internal = internal;
+		vq->port = eth_dev->data->port_id;
+		vq->ioat_vring = &internal->ioat_vrings[vq->virtqueue_id];
+	}
+}
+
+static int
+new_device(int vid)
+{
+	struct rte_eth_dev *eth_dev;
+	struct internal_list *list;
+	struct pmd_internal *internal;
+	struct rte_eth_conf *dev_conf;
+	unsigned i;
+	char ifname[PATH_MAX];
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(INFO, "Invalid device name: %s\n", ifname);
+		return -1;
+	}
+
+	eth_dev = list->eth_dev;
+	internal = eth_dev->data->dev_private;
+	dev_conf = &eth_dev->data->dev_conf;
+
+	internal->vid = vid;
+
+	if (vhost_ioat_setup(internal) < 0) {
+		VHOST_LOG(ERR, "Failed to set up vring operations\n");
+		return -1;
+	}
+
+	if (rte_atomic32_read(&internal->started) == 1) {
+		queue_setup(eth_dev, internal);
+
+		if (dev_conf->intr_conf.rxq) {
+			if (eth_vhost_install_intr(eth_dev) < 0) {
+				VHOST_LOG(INFO, "Failed to install "
+					  "interrupt handler.");
+				return -1;
+			}
+		}
+	} else {
+		VHOST_LOG(INFO, "RX/TX queues do not exist yet\n");
+	}
+
+	for (i = 0; i < rte_vhost_get_vring_num(vid); i++) {
+		vhost_ioat_enable_guest_notification(internal,
+						     &internal->ioat_vrings[i],
+						     0);
+	}
+
+	rte_vhost_get_mtu(vid, &eth_dev->data->mtu);
+
+	eth_dev->data->dev_link.link_status = ETH_LINK_UP;
+
+	rte_atomic32_set(&internal->dev_attached, 1);
+	update_queuing_status(eth_dev);
+
+	VHOST_LOG(INFO, "vHost I/OAT device %d created\n", vid);
+
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_INTR_LSC, NULL);
+
+	return 0;
+}
+
+static void
+destroy_device(int vid)
+{
+	struct rte_eth_dev *eth_dev;
+	struct pmd_internal *internal;
+	struct vhost_queue *vq;
+	struct internal_list *list;
+	char ifname[PATH_MAX];
+	unsigned i;
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(ERR, "Invalid interface name: %s\n", ifname);
+		return;
+	}
+	eth_dev = list->eth_dev;
+	internal = eth_dev->data->dev_private;
+
+	rte_atomic32_set(&internal->dev_attached, 0);
+	update_queuing_status(eth_dev);
+
+	eth_dev->data->dev_link.link_status = ETH_LINK_DOWN;
+
+	/**
+	 * Before destroying the front end's information, we must guarantee
+	 * that RX/TX threads have stopped accessing queues.
+	 */
+	vhost_ioat_remove(internal);
+
+	if (eth_dev->data->rx_queues && eth_dev->data->tx_queues) {
+		for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
+			vq = eth_dev->data->rx_queues[i];
+			if (!vq)
+				continue;
+			vq->vid = -1;
+		}
+		for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
+			vq = eth_dev->data->tx_queues[i];
+			if (!vq)
+				continue;
+			vq->vid = -1;
+		}
+	}
+
+	VHOST_LOG(INFO, "vHost I/OAT device %d destroyed\n", vid);
+	eth_vhost_uninstall_intr(eth_dev);
+
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_INTR_LSC, NULL);
+}
+
+#define IOAT_RING_SIZE 1024
+
+static int
+vring_state_changed(int vid, uint16_t vring, int enable)
+{
+	struct rte_eth_dev *eth_dev;
+	struct internal_list *list;
+	char ifname[PATH_MAX];
+	struct pmd_internal *dev;
+	struct ioat_vring *ioat_vr;
+	struct rte_ioat_rawdev_config config;
+	struct rte_rawdev_info info = { .dev_private = &config };
+	char name[32];
+	uint16_t dev_id;
+	int ret;
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(ERR, "Invalid interface name: %s\n", ifname);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "vring %u is %s\n", vring,
+		  enable ? "enabled" : "disabled");
+
+	eth_dev = list->eth_dev;
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_QUEUE_STATE, NULL);
+
+	if (!enable)
+		return 0;
+
+	dev = eth_dev->data->dev_private;
+
+	/* assign a given I/OAT device to the queue. */
+	if (!dev->ioats[vring].is_valid)
+		return 0;
+
+	/**
+	 * a vring can only use one I/OAT device. If it has been
+	 * assigned one, return immediately.
+	 */
+	ioat_vr = &dev->ioat_vrings[vring];
+	if (ioat_vr->dma_enabled)
+		return 0;
+
+	/* configure the I/OAT device */
+	rte_pci_device_name(&dev->ioats[vring].addr, name, sizeof(name));
+
+	ret = rte_rawdev_get_dev_id(name);
+	if ((ret == (uint16_t)(-ENODEV)) || (ret == (uint16_t)(-EINVAL))) {
+		VHOST_LOG(ERR, "Cannot find the I/OAT %s.\n", name);
+		return -1;
+	}
+
+	dev_id = (uint16_t)ret;
+	rte_rawdev_info_get(dev_id, &info);
+	config.ring_size = IOAT_RING_SIZE;
+	if (rte_rawdev_configure(dev_id, &info) < 0) {
+		VHOST_LOG(ERR, "Failed to configure the I/OAT %s\n", name);
+		return -1;
+	}
+
+	rte_rawdev_start(dev_id);
+
+	memcpy(&ioat_vr->dma_addr, &dev->ioats[vring].addr,
+	       sizeof(struct rte_pci_addr));
+	ioat_vr->dev_id = dev_id;
+	ioat_vr->dma_enabled = true;
+	ioat_vr->nr_batching = 0;
+	ioat_vr->nr_inflight = 0;
+
+	VHOST_LOG(INFO, "Attach I/OAT %s for the TX queue %u of port %u\n",
+		  name, vring / VIRTIO_QNUM, eth_dev->data->port_id);
+	return 0;
+}
+
+static struct vhost_device_ops vhost_ops = {
+	.new_device          = new_device,
+	.destroy_device      = destroy_device,
+	.vring_state_changed = vring_state_changed,
+};
+
+static int
+eth_dev_start(struct rte_eth_dev *eth_dev)
+{
+	struct pmd_internal *internal = eth_dev->data->dev_private;
+	struct rte_eth_conf *dev_conf = &eth_dev->data->dev_conf;
+
+	queue_setup(eth_dev, internal);
+
+	if (rte_atomic32_read(&internal->dev_attached) == 1) {
+		if (dev_conf->intr_conf.rxq) {
+			if (eth_vhost_install_intr(eth_dev) < 0) {
+				VHOST_LOG(INFO, "Failed to install "
+					  "interrupt handler.");
+				return -1;
+			}
+		}
+	}
+
+	rte_atomic32_set(&internal->started, 1);
+	update_queuing_status(eth_dev);
+
+	return 0;
+}
+
+static void
+eth_dev_stop(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+
+	rte_atomic32_set(&internal->started, 0);
+	update_queuing_status(dev);
+}
+
+static void
+eth_dev_close(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal;
+	struct internal_list *list;
+	unsigned int i;
+
+	internal = dev->data->dev_private;
+	if (!internal)
+		return;
+
+	eth_dev_stop(dev);
+
+	rte_vhost_driver_unregister(internal->iface_name);
+
+	list = find_internal_resource(internal->iface_name);
+	if (!list)
+		return;
+
+	pthread_mutex_lock(&internal_list_lock);
+	TAILQ_REMOVE(&internal_list, list, next);
+	pthread_mutex_unlock(&internal_list_lock);
+	rte_free(list);
+
+	if (dev->data->rx_queues)
+		for (i = 0; i < dev->data->nb_rx_queues; i++)
+			rte_free(dev->data->rx_queues[i]);
+
+	if (dev->data->tx_queues)
+		for (i = 0; i < dev->data->nb_tx_queues; i++)
+			rte_free(dev->data->tx_queues[i]);
+
+	free(internal->dev_name);
+	free(internal->iface_name);
+	rte_free(internal);
+
+	dev->data->dev_private = NULL;
+}
+
+static int
+eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
+		   uint16_t nb_rx_desc __rte_unused,
+		   unsigned int socket_id,
+		   const struct rte_eth_rxconf *rx_conf __rte_unused,
+		   struct rte_mempool *mb_pool)
+{
+	struct vhost_queue *vq;
+
+	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (vq == NULL) {
+		VHOST_LOG(ERR, "Failed to allocate memory for rx queue\n");
+		return -ENOMEM;
+	}
+
+	vq->mb_pool = mb_pool;
+	vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+	dev->data->rx_queues[rx_queue_id] = vq;
+
+	return 0;
+}
+
+static int
+eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
+		   uint16_t nb_tx_desc __rte_unused,
+		   unsigned int socket_id,
+		   const struct rte_eth_txconf *tx_conf __rte_unused)
+{
+	struct vhost_queue *vq;
+
+	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (vq == NULL) {
+		VHOST_LOG(ERR, "Failed to allocate memory for tx queue\n");
+		return -ENOMEM;
+	}
+
+	vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
+	dev->data->tx_queues[tx_queue_id] = vq;
+
+	return 0;
+}
+
+static void
+eth_dev_info(struct rte_eth_dev *dev,
+	     struct rte_eth_dev_info *dev_info)
+{
+	struct pmd_internal *internal;
+
+	internal = dev->data->dev_private;
+	if (internal == NULL) {
+		VHOST_LOG(ERR, "Invalid device specified\n");
+		return;
+	}
+
+	dev_info->max_mac_addrs = 1;
+	dev_info->max_rx_pktlen = (uint32_t)-1;
+	dev_info->max_rx_queues = internal->max_queues;
+	dev_info->max_tx_queues = internal->max_queues;
+	dev_info->min_rx_bufsize = 0;
+
+	dev_info->tx_offload_capa = DEV_TX_OFFLOAD_MULTI_SEGS |
+				DEV_TX_OFFLOAD_VLAN_INSERT;
+	dev_info->rx_offload_capa = DEV_RX_OFFLOAD_VLAN_STRIP;
+}
+
+static int
+eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
+{
+	unsigned i;
+	unsigned long rx_total = 0, tx_total = 0;
+	unsigned long rx_total_bytes = 0, tx_total_bytes = 0;
+	struct vhost_queue *vq;
+
+	for (i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS &&
+	     i < dev->data->nb_rx_queues; i++) {
+		if (dev->data->rx_queues[i] == NULL)
+			continue;
+		vq = dev->data->rx_queues[i];
+		stats->q_ipackets[i] = vq->stats.pkts;
+		rx_total += stats->q_ipackets[i];
+
+		stats->q_ibytes[i] = vq->stats.bytes;
+		rx_total_bytes += stats->q_ibytes[i];
+	}
+
+	for (i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS &&
+	     i < dev->data->nb_tx_queues; i++) {
+		if (dev->data->tx_queues[i] == NULL)
+			continue;
+		vq = dev->data->tx_queues[i];
+		stats->q_opackets[i] = vq->stats.pkts;
+		tx_total += stats->q_opackets[i];
+
+		stats->q_obytes[i] = vq->stats.bytes;
+		tx_total_bytes += stats->q_obytes[i];
+	}
+
+	stats->ipackets = rx_total;
+	stats->opackets = tx_total;
+	stats->ibytes = rx_total_bytes;
+	stats->obytes = tx_total_bytes;
+
+	return 0;
+}
+
+static void
+eth_stats_reset(struct rte_eth_dev *dev)
+{
+	struct vhost_queue *vq;
+	unsigned i;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		if (dev->data->rx_queues[i] == NULL)
+			continue;
+		vq = dev->data->rx_queues[i];
+		vq->stats.pkts = 0;
+		vq->stats.bytes = 0;
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		if (dev->data->tx_queues[i] == NULL)
+			continue;
+		vq = dev->data->tx_queues[i];
+		vq->stats.pkts = 0;
+		vq->stats.bytes = 0;
+		vq->stats.missed_pkts = 0;
+	}
+}
+
+static void
+eth_queue_release(void *q)
+{
+	rte_free(q);
+}
+
+static int
+eth_tx_done_cleanup(void *txq __rte_unused, uint32_t free_cnt __rte_unused)
+{
+	/**
+	 * vHost does not hang onto mbuf. eth_vhost_tx() copies packet data
+	 * and releases mbuf, so nothing to cleanup.
+	 */
+	return 0;
+}
+
+static int
+eth_link_update(struct rte_eth_dev *dev __rte_unused,
+		int wait_to_complete __rte_unused)
+{
+	return 0;
+}
+
+static uint32_t
+eth_rx_queue_count(struct rte_eth_dev *dev, uint16_t rx_queue_id)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[rx_queue_id];
+	if (unlikely(vq == NULL))
+		return 0;
+
+	return rte_vhost_rx_queue_count(vq->vid, vq->virtqueue_id);
+}
+
+static const struct eth_dev_ops ops = {
+	.dev_start = eth_dev_start,
+	.dev_stop = eth_dev_stop,
+	.dev_close = eth_dev_close,
+	.dev_configure = eth_dev_configure,
+	.dev_infos_get = eth_dev_info,
+	.rx_queue_setup = eth_rx_queue_setup,
+	.tx_queue_setup = eth_tx_queue_setup,
+	.rx_queue_release = eth_queue_release,
+	.tx_queue_release = eth_queue_release,
+	.tx_done_cleanup = eth_tx_done_cleanup,
+	.rx_queue_count = eth_rx_queue_count,
+	.link_update = eth_link_update,
+	.stats_get = eth_stats_get,
+	.stats_reset = eth_stats_reset,
+	.xstats_reset = vhost_dev_xstats_reset,
+	.xstats_get = vhost_dev_xstats_get,
+	.xstats_get_names = vhost_dev_xstats_get_names,
+	.rx_queue_intr_enable = eth_rxq_intr_enable,
+	.rx_queue_intr_disable = eth_rxq_intr_disable,
+};
+
+static int
+eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
+		     int16_t queues, const unsigned int numa_node,
+		     uint64_t flags, struct ioat_info *ioats)
+{
+	const char *name = rte_vdev_device_name(dev);
+	struct rte_eth_dev_data *data;
+	struct pmd_internal *internal = NULL;
+	struct rte_eth_dev *eth_dev = NULL;
+	struct rte_ether_addr *eth_addr = NULL;
+	struct internal_list *list = NULL;
+
+	VHOST_LOG(INFO, "Creating vHost I/OAT backend on numa socket %u\n",
+		  numa_node);
+
+	list = rte_zmalloc_socket(name, sizeof(*list), 0, numa_node);
+	if (list == NULL)
+		goto error;
+
+	/* reserve an ethdev entry */
+	eth_dev = rte_eth_vdev_allocate(dev, sizeof(*internal));
+	if (eth_dev == NULL)
+		goto error;
+	data = eth_dev->data;
+
+	eth_addr = rte_zmalloc_socket(name, sizeof(*eth_addr), 0, numa_node);
+	if (eth_addr == NULL)
+		goto error;
+	data->mac_addrs = eth_addr;
+	*eth_addr = base_eth_addr;
+	eth_addr->addr_bytes[5] = eth_dev->data->port_id;
+
+	/**
+	 * now put it all together
+	 * - store queue data in internal,
+	 * - point eth_dev_data to internal
+	 * - and point eth_dev structure to new eth_dev_data structure
+	 */
+	internal = eth_dev->data->dev_private;
+	internal->dev_name = strdup(name);
+	if (internal->dev_name == NULL)
+		goto error;
+	internal->iface_name = strdup(iface_name);
+	if (internal->iface_name == NULL)
+		goto error;
+
+	list->eth_dev = eth_dev;
+	pthread_mutex_lock(&internal_list_lock);
+	TAILQ_INSERT_TAIL(&internal_list, list, next);
+	pthread_mutex_unlock(&internal_list_lock);
+
+	data->nb_rx_queues = queues;
+	data->nb_tx_queues = queues;
+	internal->max_queues = queues;
+	internal->vid = -1;
+
+	memcpy(internal->ioats, ioats, sizeof(struct ioat_info) * 2 *
+	       RTE_MAX_QUEUES_PER_PORT);
+
+	data->dev_link = pmd_link;
+	data->dev_flags = RTE_ETH_DEV_INTR_LSC | RTE_ETH_DEV_CLOSE_REMOVE;
+
+	eth_dev->dev_ops = &ops;
+
+	/* assign rx and tx ops */
+	eth_dev->rx_pkt_burst = eth_vhost_rx;
+	eth_dev->tx_pkt_burst = eth_vhost_tx;
+
+	if (rte_vhost_driver_register(iface_name, flags))
+		goto error;
+
+	if (rte_vhost_driver_disable_features(iface_name,
+					      VHOST_IOAT_UNSUPPORTED_FEATURES) <
+	    0)
+		goto error;
+
+	if (rte_vhost_driver_callback_register(iface_name, &vhost_ops) < 0) {
+		VHOST_LOG(ERR, "Can't register callbacks\n");
+		goto error;
+	}
+
+	if (rte_vhost_driver_start(iface_name) < 0) {
+		VHOST_LOG(ERR, "Failed to start driver for %s\n", iface_name);
+		goto error;
+	}
+
+	rte_eth_dev_probing_finish(eth_dev);
+	return data->port_id;
+
+error:
+	if (internal) {
+		free(internal->iface_name);
+		free(internal->dev_name);
+	}
+	rte_eth_dev_release_port(eth_dev);
+	rte_free(list);
+
+	return -1;
+}
+
+static inline int
+open_iface(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	const char **iface_name = extra_args;
+
+	if (value == NULL)
+		return -1;
+
+	*iface_name = value;
+
+	return 0;
+}
+
+struct ioat_info_input {
+	struct ioat_info ioats[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint16_t nr;
+};
+
+static inline int
+open_ioat(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	struct ioat_info_input *ioat_info = extra_args;
+	char *input = strndup(value, strlen(value) + 1);
+	char *addrs = input;
+	char *ptrs[2];
+	char *start, *end, *substr;
+	int64_t qid, vring_id;
+	int ret = 0;
+
+	while (isblank(*addrs))
+		addrs++;
+	if (*addrs == '\0') {
+		VHOST_LOG(ERR, "No input I/OAT address\n");
+		ret = -1;
+		goto out;
+	}
+
+	/* process single I/OAT device */
+	if (*addrs != '(') {
+		rte_strsplit(addrs, strlen(addrs), ptrs, 2, '@');
+
+		start = strstr(ptrs[0], "txq");
+		if (start == NULL) {
+			VHOST_LOG(ERR, "We only support I/OAT for TX "
+				  "queues currently\n");
+			ret = -1;
+			goto out;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		vring_id = qid * 2 + VIRTIO_RXQ;
+		rte_pci_addr_parse(ptrs[1], &ioat_info->ioats[vring_id].addr);
+		ioat_info->ioats[vring_id].is_valid = true;
+		ioat_info->nr++;
+		goto out;
+	}
+
+	/* process multiple I/OAT devices within bracket. */
+	addrs++;
+	substr = strtok(addrs, ";");
+	if (!substr) {
+		VHOST_LOG(ERR, "No input I/OAT addresses\n");
+		ret = -1;
+		goto out;
+	}
+
+	do {
+		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
+
+		start = strstr(ptrs[0], "txq");
+		if (start == NULL) {
+			VHOST_LOG(ERR, "We only support I/OAT for TX queues\n");
+			ret = -1;
+			goto out;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		vring_id = qid * 2 + VIRTIO_RXQ;
+		rte_pci_addr_parse(ptrs[1], &ioat_info->ioats[vring_id].addr);
+		ioat_info->ioats[vring_id].is_valid = true;
+		ioat_info->nr++;
+
+		substr = strtok(NULL, ";)");
+	} while (substr);
+
+out:
+	free(input);
+	return ret;
+}
+
+static inline int
+open_int(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	uint16_t *n = extra_args;
+
+	if (value == NULL || extra_args == NULL)
+		return -EINVAL;
+
+	*n = (uint16_t)strtoul(value, NULL, 0);
+	if (*n == USHRT_MAX && errno == ERANGE)
+		return -1;
+
+	return 0;
+}
+
+static int
+rte_pmd_vhost_ioat_probe(struct rte_vdev_device *dev)
+{
+	struct rte_kvargs *kvlist = NULL;
+	int ret = 0;
+	char *iface_name;
+	uint16_t queues;
+	uint64_t flags = 0;
+	int client_mode = 0;
+	struct rte_eth_dev *eth_dev;
+	const char *name = rte_vdev_device_name(dev);
+	struct ioat_info_input ioat_info = { 0 };
+
+	VHOST_LOG(INFO, "Initializing vhost I/OAT for %s\n", name);
+
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+		eth_dev = rte_eth_dev_attach_secondary(name);
+		if (!eth_dev) {
+			VHOST_LOG(ERR, "Failed to probe %s\n", name);
+			return -1;
+		}
+		/* TODO: request info from primary to set up Rx and Tx */
+		eth_dev->dev_ops = &ops;
+		eth_dev->device = &dev->device;
+		rte_eth_dev_probing_finish(eth_dev);
+		return 0;
+	}
+
+	kvlist = rte_kvargs_parse(rte_vdev_device_args(dev), valid_arguments);
+	if (kvlist == NULL)
+		return -1;
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_IFACE_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_IFACE_ARG,
+					 &open_iface, &iface_name);
+		if (ret < 0)
+			goto out_free;
+	} else {
+		ret = -1;
+		goto out_free;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_QUEUES_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_QUEUES_ARG,
+					 &open_int, &queues);
+		if (ret < 0 || queues > RTE_MAX_QUEUES_PER_PORT)
+			goto out_free;
+
+	} else {
+		queues = 1;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_CLIENT_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_CLIENT_ARG,
+					 &open_int, &client_mode);
+		if (ret < 0)
+			goto out_free;
+
+		if (client_mode)
+			flags |= RTE_VHOST_USER_CLIENT;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_IOAT_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_IOAT_ARG,
+					 &open_ioat, &ioat_info);
+		if (ret < 0)
+			goto out_free;
+
+		if (ioat_info.nr > 0)
+			flags |= RTE_VHOST_USER_DMA_COPY;
+	}
+
+	/* if not specified, place the vhost I/OAT device on the core's NUMA node */
+	if (dev->device.numa_node == SOCKET_ID_ANY)
+		dev->device.numa_node = rte_socket_id();
+
+	eth_dev_vhost_create(dev, iface_name, queues, dev->device.numa_node,
+			     flags, ioat_info.ioats);
+
+out_free:
+	rte_kvargs_free(kvlist);
+	return ret;
+}
+
+static int
+rte_pmd_vhost_ioat_remove(struct rte_vdev_device *dev)
+{
+	const char *name;
+	struct rte_eth_dev *eth_dev = NULL;
+
+	name = rte_vdev_device_name(dev);
+	VHOST_LOG(INFO, "Un-initializing vhost I/OAT for %s\n", name);
+
+	/* find an ethdev entry */
+	eth_dev = rte_eth_dev_allocated(name);
+	if (eth_dev == NULL)
+		return 0;
+
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return rte_eth_dev_release_port(eth_dev);
+
+	eth_dev_close(eth_dev);
+
+	rte_eth_dev_release_port(eth_dev);
+
+	return 0;
+}
+
+static struct rte_vdev_driver pmd_vhost_ioat_drv = {
+	.probe = rte_pmd_vhost_ioat_probe,
+	.remove = rte_pmd_vhost_ioat_remove,
+};
+
+RTE_PMD_REGISTER_VDEV(net_ioat_vhost, pmd_vhost_ioat_drv);
+RTE_PMD_REGISTER_ALIAS(net_ioat_vhost, ioat_vhost);
+RTE_PMD_REGISTER_PARAM_STRING(net_ioat_vhost,
+	"iface=<ifc> "
+	"queues=<int> "
+	"client=<0|1> "
+	"ioats=(txq0@addr0;txq1@addr1...)");
+
+RTE_INIT(vhost_ioat_init_log)
+{
+	vhost_ioat_logtype = rte_log_register("vhost_ioat");
+	if (vhost_ioat_logtype >= 0)
+		rte_log_set_level(vhost_ioat_logtype, RTE_LOG_NOTICE);
+}
diff --git a/drivers/net/vhost_ioat/eth_vhost.h b/drivers/net/vhost_ioat/eth_vhost.h
new file mode 100644
index 0000000..d2cab99
--- /dev/null
+++ b/drivers/net/vhost_ioat/eth_vhost.h
@@ -0,0 +1,255 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#ifndef _ETH_VHOST_IOAT_H_
+#define _ETH_VHOST_IOAT_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <rte_pci.h>
+#include <rte_vhost.h>
+#include <rte_log.h>
+
+#ifndef VIRTIO_F_IOMMU_PLATFORM
+#define VIRTIO_F_IOMMU_PLATFORM 33
+#endif
+
+#ifndef VIRTIO_F_RING_PACKED
+#define VIRTIO_F_RING_PACKED 34
+#endif
+
+#define VHOST_IOAT_UNSUPPORTED_FEATURES ((1ULL << VHOST_F_LOG_ALL)	| \
+		(1ULL << VIRTIO_F_IOMMU_PLATFORM)	| \
+		(1ULL << VIRTIO_F_RING_PACKED))
+
+#define VHOST_MAX_PKT_BURST 32
+
+/* batching size before an I/OAT kick */
+#define IOAT_BATCHING_SIZE 8
+/**
+ * copy length threshold for the I/OAT. We offload copy jobs whose
+ * lengths are greater than IOAT_COPY_LENGTH_THRESHOLD to the I/OAT; for
+ * small copies, we still use the CPU to perform copies, due to startup
+ * overheads associated with the I/OAT.
+ */
+#define IOAT_COPY_LENGTH_THRESHOLD 1024
+
+extern int vhost_ioat_logtype;
+
+#define VHOST_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level,	\
+		vhost_ioat_logtype, "VHOST_IOAT: " fmt, ## args)
+
+#define vhost_avail_event(vr) \
+	(*(volatile uint16_t*)&(vr)->used->ring[(vr)->size])
+#define vhost_used_event(vr) \
+	(*(volatile uint16_t*)&(vr)->avail->ring[(vr)->size])
+
+enum vhost_xstats_pkts {
+	VHOST_UNDERSIZE_PKT = 0,
+	VHOST_64_PKT,
+	VHOST_65_TO_127_PKT,
+	VHOST_128_TO_255_PKT,
+	VHOST_256_TO_511_PKT,
+	VHOST_512_TO_1023_PKT,
+	VHOST_1024_TO_1522_PKT,
+	VHOST_1523_TO_MAX_PKT,
+	VHOST_BROADCAST_PKT,
+	VHOST_MULTICAST_PKT,
+	VHOST_UNICAST_PKT,
+	VHOST_ERRORS_PKT,
+	VHOST_ERRORS_FRAGMENTED,
+	VHOST_ERRORS_JABBER,
+	VHOST_UNKNOWN_PROTOCOL,
+	VHOST_XSTATS_MAX,
+};
+
+struct vhost_stats {
+	uint64_t pkts;
+	uint64_t bytes;
+	uint64_t missed_pkts;
+	uint64_t xstats[VHOST_XSTATS_MAX];
+};
+
+struct batch_copy_elem {
+	void *dst;
+	void *src;
+	uint32_t len;
+};
+
+struct guest_page {
+	uint64_t guest_phys_addr;
+	uint64_t host_phys_addr;
+	uint64_t size;
+};
+
+struct ioat_vring {
+	struct rte_vhost_vring  vr;
+
+	uint16_t last_avail_idx;
+	uint16_t last_used_idx;
+
+	/* the last used index that front end can consume */
+	uint16_t copy_done_used;
+
+	uint16_t signalled_used;
+	bool signalled_used_valid;
+
+	struct vring_used_elem *shadow_used_split;
+	uint16_t shadow_used_idx;
+
+	struct batch_copy_elem  *batch_copy_elems;
+	uint16_t batch_copy_nb_elems;
+
+	bool dma_enabled;
+	/* I/OAT rawdev ID */
+	uint16_t dev_id;
+	/* I/OAT address */
+	struct rte_pci_addr dma_addr;
+	/**
+	 * the number of copy jobs submitted to the I/OAT that may not
+	 * have completed yet
+	 */
+	uint64_t nr_inflight;
+	int nr_batching;
+
+	/* host physical address of the index of used ring */
+	phys_addr_t used_idx_hpa;
+
+	struct ring_index *indices;
+	uint16_t max_indices;
+};
+
+struct vhost_queue {
+	int vid;
+	rte_atomic32_t allow_queuing;
+	rte_atomic32_t while_queuing;
+	struct pmd_internal *internal;
+	struct rte_mempool *mb_pool;
+	uint16_t port;
+	uint16_t virtqueue_id;
+	struct vhost_stats stats;
+	struct ioat_vring *ioat_vring;
+};
+
+struct ioat_info {
+	struct rte_pci_addr addr;
+	bool is_valid;
+};
+
+struct pmd_internal {
+	rte_atomic32_t dev_attached;
+	rte_atomic32_t started;
+	char *dev_name;
+	char *iface_name;
+	uint16_t max_queues;
+	int vid;
+	uint8_t vlan_strip;
+
+	struct ioat_info ioats[RTE_MAX_QUEUES_PER_PORT * 2];
+
+	/* guest's memory regions */
+	struct rte_vhost_memory *mem;
+	/* address mapping table of guest and host physical addresses */
+	struct guest_page *guest_pages;
+	uint32_t nr_guest_pages;
+	uint32_t max_guest_pages;
+
+	/* guest's vrings */
+	struct ioat_vring ioat_vrings[RTE_MAX_QUEUES_PER_PORT * 2];
+	size_t hdr_len;
+	/* the number of vrings */
+	uint16_t nr_vrings;
+	/* negotiated features */
+	uint64_t features;
+};
+
+static inline void
+vhost_enable_notify_split(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+			  int enable)
+{
+	struct rte_vhost_vring *vr = &ioat_vr->vr;
+
+	if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
+		if (enable)
+			vr->used->flags &= ~VRING_USED_F_NO_NOTIFY;
+		else
+			vr->used->flags |= VRING_USED_F_NO_NOTIFY;
+	} else {
+		if (enable)
+			vhost_avail_event(vr) = ioat_vr->last_avail_idx;
+	}
+}
+
+/* This function enables or disables the front end's notification to the back end. */
+static inline void
+vhost_ioat_enable_guest_notification(struct pmd_internal *dev,
+				     struct ioat_vring *ioat_vr, int enable)
+{
+	vhost_enable_notify_split(dev, ioat_vr, enable);
+}
+
+/**
+ * This function gets front end's memory and vrings information.
+ * In addition, it sets up necessary data structures for enqueue
+ * and dequeue operations.
+ */
+int vhost_ioat_setup(struct pmd_internal *dev);
+
+/**
+ * This function destroys front end's information and frees data
+ * structures for enqueue and dequeue operations.
+ */
+void vhost_ioat_remove(struct pmd_internal *dev);
+
+/**
+ * This function sends packet buffers to front end's RX vring.
+ * It will free the mbufs of successfully transmitted packets.
+ *
+ * @param dev
+ *  vhost ioat device
+ * @param ioat_vr
+ *  a front end's RX vring
+ * @param pkts
+ *  packets to send
+ * @param count
+ *  the number of packets to send
+ *
+ * @return
+ *  the number of packets successfully sent
+ */
+uint16_t vhost_ioat_enqueue_burst(struct pmd_internal *dev,
+				  struct ioat_vring *ioat_vr,
+				  struct rte_mbuf **pkts, uint32_t count);
+
+/**
+ * This function gets packet buffers from front end's TX virtqueue.
+ *
+ * @param dev
+ *  vhost ioat device
+ * @param ioat_vr
+ *  a front-end's TX vring
+ * @param mbuf_pool
+ *  mempool for allocating mbufs for received packets
+ * @param pkts
+ *  pointer array used to keep addresses of received packets
+ * @param count
+ *  the element number in 'pkts'
+ *
+ * @return
+ *  the number of packets successfully received
+ */
+uint16_t vhost_ioat_dequeue_burst(struct pmd_internal *dev,
+				  struct ioat_vring *ioat_vr,
+				  struct rte_mempool *mbuf_pool,
+				  struct rte_mbuf **pkts, uint16_t count);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _ETH_VHOST_IOAT_H_ */
diff --git a/drivers/net/vhost_ioat/internal.h b/drivers/net/vhost_ioat/internal.h
new file mode 100644
index 0000000..08591b3
--- /dev/null
+++ b/drivers/net/vhost_ioat/internal.h
@@ -0,0 +1,225 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#ifndef _INTERNAL_H_
+#define _INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "eth_vhost.h"
+
+struct buf_vector {
+	uint64_t buf_iova;
+	uint64_t buf_addr;
+	uint32_t buf_len;
+	uint32_t desc_idx;
+};
+
+#define BUF_VECTOR_MAX 256
+
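+/**
+ * a pre-allocated slot used when the I/OAT writes the used ring index:
+ * 'data' holds the index value to be written and 'pa' is its IOVA,
+ * which is passed to the I/OAT as the copy source.
+ */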
+struct ring_index {
+	/* physical address of 'data' */
+	uintptr_t pa;
+	uintptr_t idx;
+	uint16_t data;
+	bool in_use;
+} __rte_cache_aligned;
+
+static __rte_always_inline int
+setup_indices(struct ring_index **indices, uint16_t num)
+{
+	struct ring_index *array;
+	uint16_t i;
+
+	array = rte_zmalloc(NULL, sizeof(struct ring_index) * num, 0);
+	if (!array) {
+		VHOST_LOG(ERR, "Failed to allocate ring indices\n");
+		*indices = NULL;
+		return -1;
+	}
+
+	for (i = 0; i < num; i++) {
+		array[i].pa = rte_mem_virt2iova(&array[i].data);
+		array[i].idx = i;
+	}
+
+	*indices = array;
+	return 0;
+}
+
+static __rte_always_inline void
+destroy_indices(struct ring_index **indices)
+{
+	if (!indices)
+		return;
+	rte_free(*indices);
+	*indices = NULL;
+}
+
+static __rte_always_inline struct ring_index *
+get_empty_indices(struct ring_index *indices, uint16_t num)
+{
+	uint16_t i;
+
+	for (i = 0; i < num; i++) {
+		if (!indices[i].in_use)
+			break;
+	}
+
+	if (unlikely(i == num))
+		return NULL;
+
+	indices[i].in_use = true;
+	return &indices[i];
+}
+
+static __rte_always_inline void
+put_used_indices(struct ring_index *indices, uint16_t idx)
+{
+	indices[idx].in_use = false;
+}
+
+static __rte_always_inline uint64_t
+get_blk_size(int fd)
+{
+	struct stat stat;
+	int ret;
+
+	ret = fstat(fd, &stat);
+	return ret == -1 ? (uint64_t)-1 : (uint64_t)stat.st_blksize;
+}
+
+static __rte_always_inline int
+add_one_guest_page(struct pmd_internal *dev, uint64_t guest_phys_addr,
+		   uint64_t host_phys_addr, uint64_t size)
+{
+	struct guest_page *page, *last_page;
+	struct guest_page *old_pages;
+
+	if (dev->nr_guest_pages == dev->max_guest_pages) {
+		dev->max_guest_pages *= 2;
+		old_pages = dev->guest_pages;
+		dev->guest_pages = realloc(dev->guest_pages,
+					   dev->max_guest_pages *
+					   sizeof(*page));
+		if (!dev->guest_pages) {
+			VHOST_LOG(ERR, "Cannot realloc guest_pages\n");
+			free(old_pages);
+			return -1;
+		}
+	}
+
+	if (dev->nr_guest_pages > 0) {
+		last_page = &dev->guest_pages[dev->nr_guest_pages - 1];
+		/* merge if the two pages are contiguous */
+		if (host_phys_addr == last_page->host_phys_addr +
+		    last_page->size) {
+			last_page->size += size;
+			return 0;
+		}
+	}
+
+	page = &dev->guest_pages[dev->nr_guest_pages++];
+	page->guest_phys_addr = guest_phys_addr;
+	page->host_phys_addr  = host_phys_addr;
+	page->size = size;
+
+	return 0;
+}
+
+static __rte_always_inline int
+add_guest_page(struct pmd_internal *dev, struct rte_vhost_mem_region *reg)
+{
+	uint64_t reg_size = reg->size;
+	uint64_t host_user_addr  = reg->host_user_addr;
+	uint64_t guest_phys_addr = reg->guest_phys_addr;
+	uint64_t host_phys_addr;
+	uint64_t size, page_size;
+
+	page_size = get_blk_size(reg->fd);
+	if (page_size == (uint64_t)-1) {
+		VHOST_LOG(ERR, "Cannot get hugepage size through fstat\n");
+		return -1;
+	}
+
+	host_phys_addr = rte_mem_virt2iova((void *)(uintptr_t)host_user_addr);
+	size = page_size - (guest_phys_addr & (page_size - 1));
+	size = RTE_MIN(size, reg_size);
+
+	if (add_one_guest_page(dev, guest_phys_addr, host_phys_addr, size) < 0)
+		return -1;
+
+	host_user_addr  += size;
+	guest_phys_addr += size;
+	reg_size -= size;
+
+	while (reg_size > 0) {
+		size = RTE_MIN(reg_size, page_size);
+		host_phys_addr = rte_mem_virt2iova((void *)(uintptr_t)
+						   host_user_addr);
+		if (add_one_guest_page(dev, guest_phys_addr, host_phys_addr,
+				       size) < 0)
+			return -1;
+
+		host_user_addr  += size;
+		guest_phys_addr += size;
+		reg_size -= size;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline int
+setup_guest_pages(struct pmd_internal *dev, struct rte_vhost_memory *mem)
+{
+	uint32_t nr_regions = mem->nregions;
+	uint32_t i;
+
+	dev->nr_guest_pages = 0;
+	dev->max_guest_pages = 8;
+
+	dev->guest_pages = malloc(dev->max_guest_pages *
+			sizeof(struct guest_page));
+	if (dev->guest_pages == NULL) {
+		VHOST_LOG(ERR, "(%d) failed to allocate memory "
+			  "for dev->guest_pages\n", dev->vid);
+		return -1;
+	}
+
+	for (i = 0; i < nr_regions; i++) {
+		if (add_guest_page(dev, &mem->regions[i]) < 0)
+			return -1;
+	}
+	return 0;
+}
+
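+/**
+ * translate a guest physical address to a host physical address.
+ * Returns 0 when [gpa, gpa + size) is not covered by a single
+ * contiguous host physical mapping, in which case the caller falls
+ * back to a CPU copy.
+ */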
+static __rte_always_inline rte_iova_t
+gpa_to_hpa(struct pmd_internal *dev, uint64_t gpa, uint64_t size)
+{
+	uint32_t i;
+	struct guest_page *page;
+
+	for (i = 0; i < dev->nr_guest_pages; i++) {
+		page = &dev->guest_pages[i];
+
+		if (gpa >= page->guest_phys_addr &&
+		    gpa + size < page->guest_phys_addr + page->size) {
+			return gpa - page->guest_phys_addr +
+			       page->host_phys_addr;
+		}
+	}
+
+	return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _INTERNAL_H_ */
diff --git a/drivers/net/vhost_ioat/virtio_net.c b/drivers/net/vhost_ioat/virtio_net.c
new file mode 100644
index 0000000..65a4757
--- /dev/null
+++ b/drivers/net/vhost_ioat/virtio_net.c
@@ -0,0 +1,1243 @@
+#include <stdint.h>
+#include <stdbool.h>
+#include <linux/virtio_net.h>
+
+#include <rte_mbuf.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_ethdev.h>
+#include <rte_vhost.h>
+#include <rte_rawdev.h>
+#include <rte_ioat_rawdev.h>
+#include <rte_log.h>
+
+#include "eth_vhost.h"
+#include "internal.h"
+
+#define MAX_BATCH_LEN 256
+
+static __rte_always_inline bool
+vq_is_packed(struct pmd_internal *dev)
+{
+	return dev->features & (1ull << VIRTIO_F_RING_PACKED);
+}
+
+static __rte_always_inline int
+vhost_need_event(uint16_t event_idx, uint16_t new_idx, uint16_t old)
+{
+	return (uint16_t)(new_idx - event_idx - 1) < (uint16_t)(new_idx - old);
+}
+
+static __rte_always_inline void
+vhost_vring_call_split(struct pmd_internal *dev, struct ioat_vring *ioat_vr)
+{
+	struct rte_vhost_vring *vr = &ioat_vr->vr;
+
+	/* flush used->idx update before we read avail->flags. */
+	rte_smp_mb();
+
+	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
+		uint16_t old = ioat_vr->signalled_used;
+		uint16_t new = ioat_vr->copy_done_used;
+		bool signalled_used_valid = ioat_vr->signalled_used_valid;
+
+		ioat_vr->signalled_used = new;
+		ioat_vr->signalled_used_valid = true;
+
+		VHOST_LOG(DEBUG, "%s: used_event_idx=%d, old=%d, new=%d\n",
+			  __func__, vhost_used_event(vr), old, new);
+
+		if ((vhost_need_event(vhost_used_event(vr), new, old) &&
+		     (vr->callfd >= 0)) || unlikely(!signalled_used_valid))
+			eventfd_write(vr->callfd, (eventfd_t)1);
+	} else {
+		if (!(vr->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) &&
+		    (vr->callfd >= 0))
+			eventfd_write(vr->callfd, (eventfd_t)1);
+	}
+}
+
+/* notify front-end of enqueued packets */
+static __rte_always_inline void
+vhost_ioat_vring_call(struct pmd_internal *dev, struct ioat_vring *ioat_vr)
+{
+	vhost_vring_call_split(dev, ioat_vr);
+}
+
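+/**
+ * reap completed I/OAT copy jobs. A completion handle smaller than
+ * 'max_indices' identifies a used ring index update; any other value
+ * is the mbuf pointer of a completed packet copy.
+ */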
+static int
+process_ioat_completed(struct pmd_internal *dev, struct ioat_vring *ioat_vr)
+{
+	uintptr_t flags[255], tmps[255];
+	int dma_done, i;
+	uint16_t used_idx;
+
+	dma_done = rte_ioat_completed_copies(ioat_vr->dev_id, 255, flags,
+					     tmps);
+	if (unlikely(dma_done <= 0))
+		return dma_done;
+
+	ioat_vr->nr_inflight -= dma_done;
+	for (i = 0; i < dma_done; i++) {
+		if ((uint64_t)flags[i] >= ioat_vr->max_indices) {
+			/* the I/OAT has finished a packet copy job. */
+			struct rte_mbuf *pkt = (struct rte_mbuf *)flags[i];
+
+			rte_mbuf_refcnt_update(pkt, -1);
+			if (rte_mbuf_refcnt_read(pkt) == 1)
+				rte_pktmbuf_free(pkt);
+		} else {
+			/**
+			 * the I/OAT has finished updating the used
+			 * ring index.
+			 */
+			uint16_t id = flags[i];
+
+			used_idx = ioat_vr->indices[id].data;
+			VHOST_LOG(DEBUG, "I/OAT finishes updating index %u "
+				  "for the used ring.\n", used_idx);
+
+			ioat_vr->copy_done_used = used_idx;
+			vhost_ioat_vring_call(dev, ioat_vr);
+			put_used_indices(ioat_vr->indices, id);
+		}
+	}
+	return dma_done;
+}
+
+static  __rte_always_inline bool
+rxvq_is_mergeable(struct pmd_internal *dev)
+{
+	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
+}
+
+static __rte_always_inline void
+do_flush_shadow_used_ring_split(struct ioat_vring *ioat_vr, uint16_t to,
+				uint16_t from, uint16_t size)
+{
+	rte_memcpy(&ioat_vr->vr.used->ring[to],
+		   &ioat_vr->shadow_used_split[from],
+		   size * sizeof(struct vring_used_elem));
+}
+
+static __rte_always_inline void
+flush_shadow_used_ring_split(struct pmd_internal *dev,
+			     struct ioat_vring *ioat_vr)
+{
+	uint16_t used_idx = ioat_vr->last_used_idx & (ioat_vr->vr.size - 1);
+
+	if (used_idx + ioat_vr->shadow_used_idx <= ioat_vr->vr.size) {
+		do_flush_shadow_used_ring_split(ioat_vr, used_idx, 0,
+						ioat_vr->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update used ring interval [used_idx, vr->size] */
+		size = ioat_vr->vr.size - used_idx;
+		do_flush_shadow_used_ring_split(ioat_vr, used_idx, 0, size);
+
+		/* update the left half used ring interval [0, left_size] */
+		do_flush_shadow_used_ring_split(ioat_vr, 0, size,
+						ioat_vr->shadow_used_idx -
+						size);
+	}
+	ioat_vr->last_used_idx += ioat_vr->shadow_used_idx;
+
+	rte_smp_wmb();
+
+	if (ioat_vr->dma_enabled && ioat_vr->nr_inflight > 0) {
+		struct ring_index *index;
+
+		index = get_empty_indices(ioat_vr->indices,
+					  ioat_vr->max_indices);
+		/* all index slots are in flight; reap completions first */
+		while (unlikely(index == NULL)) {
+			process_ioat_completed(dev, ioat_vr);
+			index = get_empty_indices(ioat_vr->indices,
+						  ioat_vr->max_indices);
+		}
+
+		index->data = ioat_vr->last_used_idx;
+		while (unlikely(rte_ioat_enqueue_copy(ioat_vr->dev_id,
+						      index->pa,
+						      ioat_vr->used_idx_hpa,
+						      sizeof(uint16_t),
+						      index->idx, 0, 0) ==
+				0)) {
+			int ret;
+
+			do {
+				ret = process_ioat_completed(dev, ioat_vr);
+			} while (ret <= 0);
+		}
+		ioat_vr->nr_batching++;
+		ioat_vr->nr_inflight++;
+	} else {
+		/**
+		 * the index of the used ring is only updated after all
+		 * previous copy jobs have completed.
+		 *
+		 * when I/OAT copy is disabled, the CPU performs all
+		 * memory copies, so the CPU is in charge of updating
+		 * the index of the used ring.
+		 *
+		 * when I/OAT copy is enabled and the I/OAT still has
+		 * outstanding copy jobs, the I/OAT updates the index of
+		 * the used ring instead, so that it cannot overwrite a
+		 * newer value written by the CPU.
+		 */
+		*(volatile uint16_t *)&ioat_vr->vr.used->idx +=
+			ioat_vr->shadow_used_idx;
+		ioat_vr->copy_done_used += ioat_vr->shadow_used_idx;
+	}
+
+	ioat_vr->shadow_used_idx = 0;
+}
+
+static __rte_always_inline void
+update_shadow_used_ring_split(struct ioat_vring *ioat_vr,
+			      uint16_t desc_idx, uint32_t len)
+{
+	uint16_t i = ioat_vr->shadow_used_idx++;
+
+	ioat_vr->shadow_used_split[i].id  = desc_idx;
+	ioat_vr->shadow_used_split[i].len = len;
+}
+
+static inline void
+do_data_copy(struct ioat_vring *ioat_vr)
+{
+	struct batch_copy_elem *elem = ioat_vr->batch_copy_elems;
+	uint16_t count = ioat_vr->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++)
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+
+	ioat_vr->batch_copy_nb_elems = 0;
+}
+
+#define ASSIGN_UNLESS_EQUAL(var, val) do {	\
+	if ((var) != (val))			\
+		(var) = (val);			\
+} while (0)
+
+static __rte_always_inline void
+virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
+{
+	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
+
+	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
+		csum_l4 |= PKT_TX_TCP_CKSUM;
+
+	if (csum_l4) {
+		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
+
+		switch (csum_l4) {
+		case PKT_TX_TCP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
+						cksum));
+			break;
+		case PKT_TX_UDP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
+						dgram_cksum));
+			break;
+		case PKT_TX_SCTP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
+						cksum));
+			break;
+		}
+	} else {
+		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
+	}
+
+	/* virtio-net hdr cannot request IP cksum offload; compute it here */
+	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
+		struct rte_ipv4_hdr *ipv4_hdr;
+
+		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
+						   m_buf->l2_len);
+		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
+	}
+
+	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
+		if (m_buf->ol_flags & PKT_TX_IPV4)
+			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
+		else
+			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
+		net_hdr->gso_size = m_buf->tso_segsz;
+		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
+					+ m_buf->l4_len;
+	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
+		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
+		net_hdr->gso_size = m_buf->tso_segsz;
+		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
+			m_buf->l4_len;
+	} else {
+		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
+	}
+}
+
+static __rte_always_inline void *
+vhost_alloc_copy_ind_table(struct pmd_internal *dev, uint64_t desc_addr,
+			   uint64_t desc_len)
+{
+	void *idesc;
+	uint64_t src, dst;
+	uint64_t len, remain = desc_len;
+
+	idesc = rte_malloc(NULL, desc_len, 0);
+	if (unlikely(!idesc))
+		return NULL;
+
+	dst = (uint64_t)(uintptr_t)idesc;
+
+	while (remain) {
+		len = remain;
+		src = rte_vhost_va_from_guest_pa(dev->mem, desc_addr, &len);
+		if (unlikely(!src || !len)) {
+			rte_free(idesc);
+			return NULL;
+		}
+
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		dst += len;
+		desc_addr += len;
+	}
+
+	return idesc;
+}
+
+static __rte_always_inline void
+free_ind_table(void *idesc)
+{
+	rte_free(idesc);
+}
+
+static __rte_always_inline int
+map_one_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
+	     uint16_t *vec_idx, uint64_t desc_iova, uint64_t desc_len)
+{
+	uint16_t vec_id = *vec_idx;
+
+	while (desc_len) {
+		uint64_t desc_addr;
+		uint64_t desc_chunck_len = desc_len;
+
+		if (unlikely(vec_id >= BUF_VECTOR_MAX))
+			return -1;
+
+		desc_addr = rte_vhost_va_from_guest_pa(dev->mem, desc_iova,
+						       &desc_chunck_len);
+		if (unlikely(!desc_addr))
+			return -1;
+
+		rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+		buf_vec[vec_id].buf_iova = desc_iova;
+		buf_vec[vec_id].buf_addr = desc_addr;
+		buf_vec[vec_id].buf_len  = desc_chunck_len;
+
+		desc_len -= desc_chunck_len;
+		desc_iova += desc_chunck_len;
+		vec_id++;
+	}
+	*vec_idx = vec_id;
+
+	return 0;
+}
+
+static __rte_always_inline int
+fill_vec_buf_split(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+		   uint32_t avail_idx, uint16_t *vec_idx,
+		   struct buf_vector *buf_vec, uint16_t *desc_chain_head,
+		   uint32_t *desc_chain_len)
+{
+	struct rte_vhost_vring *vr = &ioat_vr->vr;
+	uint16_t idx = vr->avail->ring[avail_idx & (vr->size - 1)];
+	uint16_t vec_id = *vec_idx;
+	uint32_t len    = 0;
+	uint64_t dlen;
+	uint32_t nr_descs = vr->size;
+	uint32_t cnt    = 0;
+	struct vring_desc *descs = vr->desc;
+	struct vring_desc *idesc = NULL;
+
+	if (unlikely(idx >= vr->size))
+		return -1;
+
+	*desc_chain_head = idx;
+
+	if (vr->desc[idx].flags & VRING_DESC_F_INDIRECT) {
+		dlen = vr->desc[idx].len;
+		nr_descs = dlen / sizeof(struct vring_desc);
+		if (unlikely(nr_descs > vr->size))
+			return -1;
+
+		descs = (struct vring_desc *)(uintptr_t)
+			rte_vhost_va_from_guest_pa(dev->mem,
+						   vr->desc[idx].addr, &dlen);
+		if (unlikely(!descs))
+			return -1;
+
+		if (unlikely(dlen < vr->desc[idx].len)) {
+			/**
+			 * the indirect desc table is not contiguous
+			 * in process VA space; we have to copy it.
+			 */
+			idesc = vhost_alloc_copy_ind_table(dev,
+							   vr->desc[idx].addr,
+							   vr->desc[idx].len);
+			if (unlikely(!idesc))
+				return -1;
+
+			descs = idesc;
+		}
+
+		idx = 0;
+	}
+
+	while (1) {
+		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
+			free_ind_table(idesc);
+			return -1;
+		}
+
+		len += descs[idx].len;
+
+		if (unlikely(map_one_desc(dev, buf_vec, &vec_id,
+					  descs[idx].addr, descs[idx].len))) {
+			free_ind_table(idesc);
+			return -1;
+		}
+
+		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
+			break;
+
+		idx = descs[idx].next;
+	}
+
+	*desc_chain_len = len;
+	*vec_idx = vec_id;
+
+	if (unlikely(!!idesc))
+		free_ind_table(idesc);
+
+	return 0;
+}
+
+static inline int
+reserve_avail_buf_split(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+			uint32_t size, struct buf_vector *buf_vec,
+			uint16_t *num_buffers, uint16_t avail_head,
+			uint16_t *nr_vec)
+{
+	struct rte_vhost_vring *vr = &ioat_vr->vr;
+
+	uint16_t cur_idx;
+	uint16_t vec_idx = 0;
+	uint16_t max_tries, tries = 0;
+	uint16_t head_idx = 0;
+	uint32_t len = 0;
+
+	*num_buffers = 0;
+	cur_idx  = ioat_vr->last_avail_idx;
+
+	if (rxvq_is_mergeable(dev))
+		max_tries = vr->size - 1;
+	else
+		max_tries = 1;
+
+	while (size > 0) {
+		if (unlikely(cur_idx == avail_head))
+			return -1;
+		/**
+		 * if we have tried all available ring entries and still
+		 * cannot get enough buffers, something abnormal has
+		 * happened.
+		 */
+		if (unlikely(++tries > max_tries))
+			return -1;
+
+		if (unlikely(fill_vec_buf_split(dev, ioat_vr, cur_idx,
+						&vec_idx, buf_vec,
+						&head_idx, &len) < 0))
+			return -1;
+		len = RTE_MIN(len, size);
+		update_shadow_used_ring_split(ioat_vr, head_idx, len);
+		size -= len;
+
+		cur_idx++;
+		*num_buffers += 1;
+	}
+
+	*nr_vec = vec_idx;
+
+	return 0;
+}
+
+static __rte_noinline void
+copy_vnet_hdr_to_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
+		      struct virtio_net_hdr_mrg_rxbuf *hdr)
+{
+	uint64_t len;
+	uint64_t remain = dev->hdr_len;
+	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
+	uint64_t iova = buf_vec->buf_iova;
+
+	while (remain) {
+		len = RTE_MIN(remain,
+			      buf_vec->buf_len);
+		dst = buf_vec->buf_addr;
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		iova += len;
+		src += len;
+		buf_vec++;
+	}
+}
+
+static __rte_always_inline int
+copy_mbuf_to_desc(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+		  struct rte_mbuf *m, struct buf_vector *buf_vec,
+		  uint16_t nr_vec, uint16_t num_buffers, bool *copy_done)
+{
+	uint32_t vec_idx = 0;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t buf_offset, buf_avail;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t cpy_len;
+	uint64_t hdr_addr;
+	struct rte_mbuf *hdr_mbuf;
+	struct batch_copy_elem *batch_copy = ioat_vr->batch_copy_elems;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
+	uint64_t dst, src;
+	int error = 0;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	*copy_done = true;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_mbuf = m;
+	hdr_addr = buf_addr;
+	if (unlikely(buf_len < dev->hdr_len))
+		hdr = &tmp_hdr;
+	else
+		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
+
+	VHOST_LOG(DEBUG, "(%d) RX: num merge buffers %d\n", dev->vid,
+		  num_buffers);
+
+	if (unlikely(buf_len < dev->hdr_len)) {
+		buf_offset = dev->hdr_len - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail = buf_len - buf_offset;
+	} else {
+		buf_offset = dev->hdr_len;
+		buf_avail = buf_len - dev->hdr_len;
+	}
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+	while (mbuf_avail != 0 || m->next != NULL) {
+		bool dma_copy = false;
+
+		/* done with current buf, get the next one */
+		if (buf_avail == 0) {
+			vec_idx++;
+			if (unlikely(vec_idx >= nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
+			if (rxvq_is_mergeable(dev))
+				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
+						    num_buffers);
+
+			if (unlikely(hdr == &tmp_hdr))
+				copy_vnet_hdr_to_desc(dev, buf_vec, hdr);
+			hdr_addr = 0;
+		}
+
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (ioat_vr->dma_enabled && cpy_len >
+		    IOAT_COPY_LENGTH_THRESHOLD) {
+			dst = gpa_to_hpa(dev, buf_iova + buf_offset, cpy_len);
+			dma_copy = (dst != 0);
+		}
+
+		if (dma_copy) {
+			src = rte_pktmbuf_iova_offset(m, mbuf_offset);
+
+			/**
+			 * if I/OAT enqueue fails, we wait until there are
+			 * available I/OAT descriptors.
+			 */
+			while (unlikely(rte_ioat_enqueue_copy(ioat_vr->dev_id,
+							      src, dst, cpy_len,
+							      (uintptr_t)
+							      hdr_mbuf, 0, 0) ==
+					0)) {
+				int ret;
+
+				do {
+					ret = process_ioat_completed(dev,
+								     ioat_vr);
+				} while (ret <= 0);
+			}
+
+			ioat_vr->nr_batching++;
+			ioat_vr->nr_inflight++;
+			rte_mbuf_refcnt_update(hdr_mbuf, 1);
+			*copy_done = false;
+		} else if (likely(cpy_len > MAX_BATCH_LEN ||
+				  ioat_vr->batch_copy_nb_elems >=
+				  ioat_vr->vr.size)) {
+			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
+				   rte_pktmbuf_mtod_offset(m, void *,
+							   mbuf_offset),
+				   cpy_len);
+		} else {
+			batch_copy[ioat_vr->batch_copy_nb_elems].dst =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+			batch_copy[ioat_vr->batch_copy_nb_elems].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+			batch_copy[ioat_vr->batch_copy_nb_elems].len = cpy_len;
+			ioat_vr->batch_copy_nb_elems++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail  -= cpy_len;
+		buf_offset += cpy_len;
+	}
+
+out:
+
+	return error;
+}
+
+static __rte_always_inline uint16_t
+vhost_ioat_enqueue_split(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+			 struct rte_mbuf **pkts, uint32_t count)
+{
+	struct rte_vhost_vring *vr = &ioat_vr->vr;
+
+	uint32_t pkt_idx = 0;
+	uint16_t num_buffers;
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	uint16_t avail_head;
+
+	struct rte_mbuf *done_pkts[VHOST_MAX_PKT_BURST];
+	uint32_t i, nr_done = 0;
+	bool copy_done;
+
+	if (ioat_vr->dma_enabled && ioat_vr->nr_inflight > 0)
+		process_ioat_completed(dev, ioat_vr);
+
+	avail_head = *((volatile uint16_t *)&vr->avail->idx);
+
+	/**
+	 * the ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vr->avail->ring[ioat_vr->last_avail_idx &
+			(vr->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->hdr_len;
+		uint16_t nr_vec = 0;
+
+		if (unlikely(reserve_avail_buf_split(dev, ioat_vr, pkt_len,
+						     buf_vec, &num_buffers,
+						     avail_head, &nr_vec) <
+			     0)) {
+			VHOST_LOG(INFO,
+				  "(%d) failed to get enough desc from vring\n",
+				  dev->vid);
+			ioat_vr->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		VHOST_LOG(DEBUG, "(%d) current index %d | end index %d\n",
+			  dev->vid, ioat_vr->last_avail_idx,
+			  ioat_vr->last_avail_idx + num_buffers);
+
+		if (copy_mbuf_to_desc(dev, ioat_vr, pkts[pkt_idx],
+				      buf_vec, nr_vec, num_buffers,
+				      &copy_done) < 0) {
+			ioat_vr->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		if (copy_done)
+			done_pkts[nr_done++] = pkts[pkt_idx];
+
+		if (ioat_vr->dma_enabled &&
+		    ioat_vr->nr_batching > IOAT_BATCHING_SIZE) {
+			/**
+			 * kick the I/OAT to do copy once the number of
+			 * batching jobs reaches the batching threshold.
+			 */
+			rte_ioat_do_copies(ioat_vr->dev_id);
+			ioat_vr->nr_batching = 0;
+		}
+
+		ioat_vr->last_avail_idx += num_buffers;
+	}
+
+	do_data_copy(ioat_vr);
+
+	if (likely(ioat_vr->shadow_used_idx)) {
+		flush_shadow_used_ring_split(dev, ioat_vr);
+		vhost_ioat_vring_call(dev, ioat_vr);
+	}
+
+	if (ioat_vr->dma_enabled && ioat_vr->nr_batching > 0) {
+		rte_ioat_do_copies(ioat_vr->dev_id);
+		ioat_vr->nr_batching = 0;
+	}
+
+	/* free copy-done packets */
+	for (i = 0; i < nr_done; i++)
+		rte_pktmbuf_free(done_pkts[i]);
+
+	return pkt_idx;
+}
+
+uint16_t
+vhost_ioat_enqueue_burst(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+			 struct rte_mbuf **pkts, uint32_t count)
+{
+	return vhost_ioat_enqueue_split(dev, ioat_vr, pkts, count);
+}
+
+static inline bool
+virtio_net_with_host_offload(struct pmd_internal *dev)
+{
+	if (dev->features &
+			((1ULL << VIRTIO_NET_F_CSUM) |
+			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
+			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
+			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
+			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
+		return true;
+
+	return false;
+}
+
+static void
+parse_ethernet(struct rte_mbuf *m, uint16_t *l4_proto, void **l4_hdr)
+{
+	struct rte_ipv4_hdr *ipv4_hdr;
+	struct rte_ipv6_hdr *ipv6_hdr;
+	void *l3_hdr = NULL;
+	struct rte_ether_hdr *eth_hdr;
+	uint16_t ethertype;
+
+	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
+
+	m->l2_len = sizeof(struct rte_ether_hdr);
+	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
+
+	if (ethertype == RTE_ETHER_TYPE_VLAN) {
+		struct rte_vlan_hdr *vlan_hdr =
+			(struct rte_vlan_hdr *)(eth_hdr + 1);
+
+		m->l2_len += sizeof(struct rte_vlan_hdr);
+		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
+	}
+
+	l3_hdr = (char *)eth_hdr + m->l2_len;
+
+	switch (ethertype) {
+	case RTE_ETHER_TYPE_IPV4:
+		ipv4_hdr = l3_hdr;
+		*l4_proto = ipv4_hdr->next_proto_id;
+		m->l3_len = (ipv4_hdr->version_ihl & 0x0f) * 4;
+		*l4_hdr = (char *)l3_hdr + m->l3_len;
+		m->ol_flags |= PKT_TX_IPV4;
+		break;
+	case RTE_ETHER_TYPE_IPV6:
+		ipv6_hdr = l3_hdr;
+		*l4_proto = ipv6_hdr->proto;
+		m->l3_len = sizeof(struct rte_ipv6_hdr);
+		*l4_hdr = (char *)l3_hdr + m->l3_len;
+		m->ol_flags |= PKT_TX_IPV6;
+		break;
+	default:
+		m->l3_len = 0;
+		*l4_proto = 0;
+		*l4_hdr = NULL;
+		break;
+	}
+}
+
+static __rte_always_inline void
+vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
+{
+	uint16_t l4_proto = 0;
+	void *l4_hdr = NULL;
+	struct rte_tcp_hdr *tcp_hdr = NULL;
+
+	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
+		return;
+
+	parse_ethernet(m, &l4_proto, &l4_hdr);
+	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
+		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
+			switch (hdr->csum_offset) {
+			case (offsetof(struct rte_tcp_hdr, cksum)):
+				if (l4_proto == IPPROTO_TCP)
+					m->ol_flags |= PKT_TX_TCP_CKSUM;
+				break;
+			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
+				if (l4_proto == IPPROTO_UDP)
+					m->ol_flags |= PKT_TX_UDP_CKSUM;
+				break;
+			case (offsetof(struct rte_sctp_hdr, cksum)):
+				if (l4_proto == IPPROTO_SCTP)
+					m->ol_flags |= PKT_TX_SCTP_CKSUM;
+				break;
+			default:
+				break;
+			}
+		}
+	}
+
+	if (l4_hdr && hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
+		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
+		case VIRTIO_NET_HDR_GSO_TCPV4:
+		case VIRTIO_NET_HDR_GSO_TCPV6:
+			tcp_hdr = l4_hdr;
+			m->ol_flags |= PKT_TX_TCP_SEG;
+			m->tso_segsz = hdr->gso_size;
+			m->l4_len = (tcp_hdr->data_off & 0xf0) >> 2;
+			break;
+		case VIRTIO_NET_HDR_GSO_UDP:
+			m->ol_flags |= PKT_TX_UDP_SEG;
+			m->tso_segsz = hdr->gso_size;
+			m->l4_len = sizeof(struct rte_udp_hdr);
+			break;
+		default:
+			VHOST_LOG(WARNING,
+				  "unsupported gso type %u.\n", hdr->gso_type);
+			break;
+		}
+	}
+}
+
+static __rte_noinline void
+copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr, struct buf_vector *buf_vec)
+{
+	uint64_t len;
+	uint64_t remain = sizeof(struct virtio_net_hdr);
+	uint64_t src;
+	uint64_t dst = (uint64_t)(uintptr_t)hdr;
+
+	while (remain) {
+		len = RTE_MIN(remain, buf_vec->buf_len);
+		src = buf_vec->buf_addr;
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		dst += len;
+		buf_vec++;
+	}
+}
+
+static __rte_always_inline int
+copy_desc_to_mbuf(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool)
+{
+	uint32_t buf_avail, buf_offset;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+	/* a counter to avoid an endless descriptor chain loop */
+	uint16_t vec_idx = 0;
+	struct batch_copy_elem *batch_copy = ioat_vr->batch_copy_elems;
+	int error = 0;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/**
+			 * no luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/**
+	 * a virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and the others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->hdr_len)) {
+		buf_offset = dev->hdr_len - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->hdr_len) {
+		if (unlikely(++vec_idx >= nr_vec))
+			goto out;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->hdr_len;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->hdr_len;
+	}
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+		(void)buf_iova;
+
+		if (cpy_len > MAX_BATCH_LEN || ioat_vr->batch_copy_nb_elems >=
+		    ioat_vr->vr.size || (hdr && cur == m)) {
+			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+							   mbuf_offset),
+				   (void *)((uintptr_t)(buf_addr + buf_offset)),
+				   cpy_len);
+		} else {
+			batch_copy[ioat_vr->batch_copy_nb_elems].dst =
+				rte_pktmbuf_mtod_offset(cur, void *,
+							mbuf_offset);
+			batch_copy[ioat_vr->batch_copy_nb_elems].src =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+			batch_copy[ioat_vr->batch_copy_nb_elems].len = cpy_len;
+			ioat_vr->batch_copy_nb_elems++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail -= cpy_len;
+		buf_offset += cpy_len;
+
+		/* this buf has reached its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/**
+		 * this mbuf has reached its end; allocate a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG(INFO, "Failed to allocate mbuf.\n");
+				error = -1;
+				goto out;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	if (hdr)
+		vhost_dequeue_offload(hdr, m);
+
+out:
+
+	return error;
+}
+
+static __rte_always_inline uint16_t
+vhost_ioat_dequeue_split(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+			 struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+			 uint16_t count)
+{
+	struct rte_vhost_vring *vr = &ioat_vr->vr;
+	uint16_t free_entries, i;
+
+	free_entries = *((volatile uint16_t *)&vr->avail->idx) -
+		ioat_vr->last_avail_idx;
+	if (free_entries == 0)
+		return 0;
+
+	/**
+	 * the ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vr->avail->ring[ioat_vr->last_avail_idx &
+		      (vr->size - 1)]);
+
+	count = RTE_MIN(count, VHOST_MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG(DEBUG, "(%d) about to dequeue %u buffers\n",
+		  dev->vid, count);
+
+	for (i = 0; i < count; i++) {
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		uint16_t head_idx;
+		uint32_t dummy_len;
+		uint16_t nr_vec = 0;
+		int err;
+
+		if (unlikely(fill_vec_buf_split(dev, ioat_vr,
+						ioat_vr->last_avail_idx + i,
+						&nr_vec, buf_vec,
+						&head_idx, &dummy_len) < 0))
+			break;
+
+		update_shadow_used_ring_split(ioat_vr, head_idx, 0);
+
+		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
+		if (unlikely(pkts[i] == NULL)) {
+			VHOST_LOG(INFO, "Failed to allocate mbuf.\n");
+			break;
+		}
+
+		err = copy_desc_to_mbuf(dev, ioat_vr, buf_vec, nr_vec, pkts[i],
+					mbuf_pool);
+		if (unlikely(err)) {
+			rte_pktmbuf_free(pkts[i]);
+			break;
+		}
+	}
+	ioat_vr->last_avail_idx += i;
+
+	do_data_copy(ioat_vr);
+	if (unlikely(i < count))
+		ioat_vr->shadow_used_idx = i;
+	if (ioat_vr->shadow_used_idx) {
+		flush_shadow_used_ring_split(dev, ioat_vr);
+		vhost_ioat_vring_call(dev, ioat_vr);
+	}
+
+	return i;
+}
+
+uint16_t
+vhost_ioat_dequeue_burst(struct pmd_internal *dev, struct ioat_vring *ioat_vr,
+			 struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+			 uint16_t count)
+{
+	return vhost_ioat_dequeue_split(dev, ioat_vr, mbuf_pool, pkts, count);
+}
+
+int
+vhost_ioat_setup(struct pmd_internal *dev)
+{
+	struct ioat_vring *ioat_vr;
+	int vid = dev->vid;
+	int ret;
+	uint16_t i, j, size;
+
+	rte_vhost_get_negotiated_features(vid, &dev->features);
+	if (vq_is_packed(dev)) {
+		VHOST_LOG(ERR, "vHost I/OAT doesn't support packed ring\n");
+		return -1;
+	}
+
+	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
+		dev->hdr_len = sizeof(struct virtio_net_hdr_mrg_rxbuf);
+	else
+		dev->hdr_len = sizeof(struct virtio_net_hdr);
+
+	dev->nr_vrings = rte_vhost_get_vring_num(vid);
+
+	if (rte_vhost_get_mem_table(vid, &dev->mem) < 0) {
+		VHOST_LOG(ERR, "Failed to get guest memory regions\n");
+		return -1;
+	}
+
+	/* set up gpa and hpa mappings */
+	if (setup_guest_pages(dev, dev->mem) < 0) {
+		VHOST_LOG(ERR, "Failed to get hpa and gpa mappings\n");
+		free(dev->mem);
+		return -1;
+	}
+
+	for (i = 0; i < dev->nr_vrings; i++) {
+		ioat_vr = &dev->ioat_vrings[i];
+
+		ret = rte_vhost_get_vring_base(vid, i, &ioat_vr->last_avail_idx,
+					       &ioat_vr->last_used_idx);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to get vring index.\n");
+			goto err;
+		}
+
+		ret = rte_vhost_get_vhost_vring(vid, i, &ioat_vr->vr);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to get vring address.\n");
+			goto err;
+		}
+
+		size = ioat_vr->vr.size;
+		ioat_vr->shadow_used_split =
+			rte_malloc(NULL, size * sizeof(struct vring_used_elem),
+				   RTE_CACHE_LINE_SIZE);
+		if (ioat_vr->shadow_used_split == NULL)
+			goto err;
+
+		ioat_vr->batch_copy_elems =
+			rte_malloc(NULL, size * sizeof(struct batch_copy_elem),
+				   RTE_CACHE_LINE_SIZE);
+		if (ioat_vr->batch_copy_elems == NULL)
+			goto err;
+
+		/* set up used index array for I/OAT copy */
+		ioat_vr->used_idx_hpa =
+			rte_mem_virt2iova(&ioat_vr->vr.used->idx);
+		ioat_vr->max_indices = ioat_vr->vr.size;
+		if (setup_indices(&ioat_vr->indices,
+				  ioat_vr->max_indices) < 0)
+			goto err;
+
+		ioat_vr->copy_done_used = ioat_vr->last_used_idx;
+		ioat_vr->signalled_used = ioat_vr->last_used_idx;
+		ioat_vr->signalled_used_valid = false;
+
+		ioat_vr->shadow_used_idx = 0;
+		ioat_vr->batch_copy_nb_elems = 0;
+	}
+
+	return 0;
+
+err:
+	for (j = 0; j <= i; j++) {
+		ioat_vr = &dev->ioat_vrings[j];
+		rte_free(ioat_vr->shadow_used_split);
+		rte_free(ioat_vr->batch_copy_elems);
+		destroy_indices(&ioat_vr->indices);
+		ioat_vr->batch_copy_elems = NULL;
+		ioat_vr->shadow_used_split = NULL;
+	}
+
+	free(dev->mem);
+	dev->mem = NULL;
+	free(dev->guest_pages);
+	dev->guest_pages = NULL;
+
+	return -1;
+}
+
+void
+vhost_ioat_remove(struct pmd_internal *dev)
+{
+	struct ioat_vring *ioat_vr;
+	uint16_t i;
+
+	for (i = 0; i < dev->nr_vrings; i++) {
+		ioat_vr = &dev->ioat_vrings[i];
+
+		if (ioat_vr->dma_enabled) {
+			while (ioat_vr->nr_inflight > 0)
+				process_ioat_completed(dev, ioat_vr);
+
+			VHOST_LOG(INFO, "All outstanding DMA jobs of "
+				  "vring %u have completed\n", i);
+
+			rte_rawdev_stop(ioat_vr->dev_id);
+
+			ioat_vr->dma_enabled = false;
+			ioat_vr->nr_batching = 0;
+			ioat_vr->dev_id = -1;
+		}
+
+		rte_free(ioat_vr->shadow_used_split);
+		ioat_vr->shadow_used_split = NULL;
+
+		rte_free(ioat_vr->batch_copy_elems);
+		ioat_vr->batch_copy_elems = NULL;
+
+		ioat_vr->signalled_used_valid = false;
+
+		ioat_vr->used_idx_hpa = 0;
+		destroy_indices(&ioat_vr->indices);
+		ioat_vr->max_indices = 0;
+	}
+
+	free(dev->mem);
+	dev->mem = NULL;
+	free(dev->guest_pages);
+	dev->guest_pages = NULL;
+}
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index ba5c39e..a6e1817 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -215,6 +215,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_VDEV_NETVSC_PMD) += -lrte_pmd_vdev_netvsc
 _LDLIBS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD)     += -lrte_pmd_virtio
 ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST)      += -lrte_pmd_vhost
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_IOAT)      += -lrte_pmd_vhost_ioat
 ifeq ($(CONFIG_RTE_EAL_VFIO),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_IFC_PMD) += -lrte_pmd_ifc
 endif # $(CONFIG_RTE_EAL_VFIO)
-- 
2.7.4


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [dpdk-dev] [RFC v2 0/2] Add a PMD for DMA-accelerated vhost-user
  2019-09-26 14:26 [dpdk-dev] [RFC 0/2] Add a PMD for I/OAT accelerated vhost-user Jiayu Hu
  2019-09-26 14:26 ` [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user Jiayu Hu
  2019-09-26 14:26 ` [dpdk-dev] [RFC 2/2] net/vhost_ioat: add vhost I/OAT driver Jiayu Hu
@ 2019-11-01  8:54 ` Jiayu Hu
  2019-11-01  8:54   ` [dpdk-dev] [RFC v2 1/2] vhost: populate guest memory " Jiayu Hu
  2019-11-01  8:54   ` [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver Jiayu Hu
  2 siblings, 2 replies; 11+ messages in thread
From: Jiayu Hu @ 2019-11-01  8:54 UTC (permalink / raw)
  To: dev; +Cc: tiwei.bie, maxime.coquelin, zhihong.wang, bruce.richardson, Jiayu Hu

In vhost-user enqueue and dequeue operations, where data movement is
heavily involved, performing large memory copies usually takes up a
major part of CPU cycles and becomes the hot spot. To offload expensive
memory operations from the CPU, this patch set proposes to leverage DMA
engines, e.g., I/OAT, a DMA engine in the Intel's processor, to accelerate
large copies for vhost-user.

We implement a new PMD for DMA-accelerated vhost-user, called
vhost-dma. This PMD leverages librte_vhost to handle vhost messages,
but implements its own vring enqueue and dequeue operations. It
offloads large memory copies to the DMA in an asynchronous mode; that
is, the CPU just submits copy jobs to the DMA without waiting for
their completion. Thus, there is no CPU intervention during data
transfer; we can save precious CPU cycles and improve the overall
throughput for vhost-user based applications, like OVS.

The PMD provides basic functionality for packet reception and
transmission. During packet reception and transmission, it offloads
large copies to the DMA and performs small copies with the CPU, due
to the startup overheads associated with the DMA.
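
As a rough illustration, the data path makes the following per-copy
decision (a simplified sketch; offload_to_dma() and the src/dst
variables are placeholders for the actual I/OAT enqueue logic in
virtio_net.c, and IOAT_COPY_LENGTH_THRESHOLD is the driver's size
cut-off):

    if (dma_enabled && cpy_len > IOAT_COPY_LENGTH_THRESHOLD &&
        (dst_hpa = gpa_to_hpa(dev, buf_iova + buf_offset, cpy_len)) != 0)
        offload_to_dma(src_hpa, dst_hpa, cpy_len); /* asynchronous copy */
    else
        rte_memcpy(dst_va, src_va, cpy_len);       /* CPU copy */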

The PMD is designed to support various DMA engines to accelerate data
movements in enqueue and dequeue operations; currently, the only
supported DMA engine is I/OAT. The PMD supports I/OAT acceleration
only in its transmit data path (i.e. the vring's enqueue operation);
it still uses the CPU to perform all copies in its receive data path
(i.e. the vring's dequeue operation). In addition, the PMD only
supports the split ring.

Users can explicitly assign a DMA device to a TX queue via the 'dmas'
parameter. Currently, one DMA device can only be used by one queue,
and a queue can use only one DMA device at a time. In addition, the
PMD supports multiqueue and both client and server modes; users can
specify the queue number and client/server mode via the 'queues' and
'client' parameters.
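
For example (the DMA device address below is only illustrative), a
single-queue port whose TX queue 0 uses the I/OAT device at 00:04.0
can be created with a vdev argument such as:

    --vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,client=0,dmas=txq0@00:04.0'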

We measured the performance of vhost-dma in testpmd. With 1024-byte
packets, compared with the vhost-user PMD, vhost-dma improves the host
testpmd throughput by around 20%-30% in the VM2VM and PVP cases; with
larger packets, the throughput improvement is higher.

Change log
==========
v2:
- provide a DMA-accelerated vhost-user PMD that can support various DMA
  engines.

Jiayu Hu (2):
  vhost: populate guest memory for DMA-accelerated vhost-user
  net/vhost_dma: add vHost DMA driver

 config/common_base                                 |    2 +
 config/common_linux                                |    1 +
 drivers/Makefile                                   |    2 +-
 drivers/net/Makefile                               |    1 +
 drivers/net/vhost_dma/Makefile                     |   31 +
 drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
 drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
 drivers/net/vhost_dma/internal.h                   |  225 +++
 .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
 drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
 lib/librte_vhost/rte_vhost.h                       |    1 +
 lib/librte_vhost/socket.c                          |   11 +
 lib/librte_vhost/vhost.h                           |    2 +
 lib/librte_vhost/vhost_user.c                      |    3 +-
 mk/rte.app.mk                                      |    1 +
 15 files changed, 3275 insertions(+), 2 deletions(-)
 create mode 100644 drivers/net/vhost_dma/Makefile
 create mode 100644 drivers/net/vhost_dma/eth_vhost.c
 create mode 100644 drivers/net/vhost_dma/eth_vhost.h
 create mode 100644 drivers/net/vhost_dma/internal.h
 create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
 create mode 100644 drivers/net/vhost_dma/virtio_net.c

-- 
2.7.4


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [dpdk-dev] [RFC v2 1/2] vhost: populate guest memory for DMA-accelerated vhost-user
  2019-11-01  8:54 ` [dpdk-dev] [RFC v2 0/2] Add a PMD for DMA-accelerated vhost-user Jiayu Hu
@ 2019-11-01  8:54   ` Jiayu Hu
  2019-11-01  8:54   ` [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver Jiayu Hu
  1 sibling, 0 replies; 11+ messages in thread
From: Jiayu Hu @ 2019-11-01  8:54 UTC (permalink / raw)
  To: dev; +Cc: tiwei.bie, maxime.coquelin, zhihong.wang, bruce.richardson, Jiayu Hu

DMA engines, like I/OAT, are efficient at moving large blocks of
data within memory. Offloading large copies on the vhost side to
DMA engines can save precious CPU cycles and improve vhost
performance.

However, using a DMA engine requires the guest's memory to be
populated. This patch enables DMA-accelerated vhost-user to
populate the guest's memory.
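
A backend enables this behavior by passing the new flag when
registering the vhost driver; a minimal sketch (the socket path and
the additional RTE_VHOST_USER_CLIENT flag are only examples):

    uint64_t flags = RTE_VHOST_USER_CLIENT | RTE_VHOST_USER_DMA_COPY;

    /* guest memory regions will be mmap'ed with MAP_POPULATE */
    if (rte_vhost_driver_register("/tmp/sock0", flags) < 0)
        rte_exit(EXIT_FAILURE, "failed to register vhost driver\n");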

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 lib/librte_vhost/rte_vhost.h  |  1 +
 lib/librte_vhost/socket.c     | 11 +++++++++++
 lib/librte_vhost/vhost.h      |  2 ++
 lib/librte_vhost/vhost_user.c |  3 ++-
 4 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
index 7b5dc87..7716939 100644
--- a/lib/librte_vhost/rte_vhost.h
+++ b/lib/librte_vhost/rte_vhost.h
@@ -34,6 +34,7 @@ extern "C" {
 #define RTE_VHOST_USER_EXTBUF_SUPPORT	(1ULL << 5)
 /* support only linear buffers (no chained mbufs) */
 #define RTE_VHOST_USER_LINEARBUF_SUPPORT	(1ULL << 6)
+#define RTE_VHOST_USER_DMA_COPY		(1ULL << 7)
 
 /** Protocol features. */
 #ifndef VHOST_USER_PROTOCOL_F_MQ
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index a34bc7f..9db6f6b 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -62,6 +62,8 @@ struct vhost_user_socket {
 	 */
 	int vdpa_dev_id;
 
+	bool dma_enabled;
+
 	struct vhost_device_ops const *notify_ops;
 };
 
@@ -240,6 +242,13 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
 	if (vsocket->linearbuf)
 		vhost_enable_linearbuf(vid);
 
+	if (vsocket->dma_enabled) {
+		struct virtio_net *dev;
+
+		dev = get_device(vid);
+		dev->dma_enabled = true;
+	}
+
 	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
 
 	if (vsocket->notify_ops->new_connection) {
@@ -889,6 +898,8 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
 		goto out_mutex;
 	}
 
+	vsocket->dma_enabled = flags & RTE_VHOST_USER_DMA_COPY;
+
 	/*
 	 * Set the supported features correctly for the builtin vhost-user
 	 * net driver.
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 9f11b28..b61a790 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -383,6 +383,8 @@ struct virtio_net {
 	 */
 	int			vdpa_dev_id;
 
+	bool			dma_enabled;
+
 	/* context data for the external message handlers */
 	void			*extern_data;
 	/* pre and post vhost user message handlers for the device */
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 2a9fa7c..12722b9 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -1067,7 +1067,8 @@ vhost_user_set_mem_table(struct virtio_net **pdev, struct VhostUserMsg *msg,
 		}
 		mmap_size = RTE_ALIGN_CEIL(mmap_size, alignment);
 
-		populate = (dev->dequeue_zero_copy) ? MAP_POPULATE : 0;
+		populate = (dev->dequeue_zero_copy || dev->dma_enabled) ?
+			MAP_POPULATE : 0;
 		mmap_addr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
 				 MAP_SHARED | populate, fd, 0);
 
-- 
2.7.4


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
  2019-11-01  8:54 ` [dpdk-dev] [RFC v2 0/2] Add a PMD for DMA-accelerated vhost-user Jiayu Hu
  2019-11-01  8:54   ` [dpdk-dev] [RFC v2 1/2] vhost: populate guest memory " Jiayu Hu
@ 2019-11-01  8:54   ` Jiayu Hu
  2019-12-17  8:27     ` Maxime Coquelin
  1 sibling, 1 reply; 11+ messages in thread
From: Jiayu Hu @ 2019-11-01  8:54 UTC (permalink / raw)
  To: dev; +Cc: tiwei.bie, maxime.coquelin, zhihong.wang, bruce.richardson, Jiayu Hu

This patch introduces a new PMD for DMA-accelerated vhost-user, which
provides basic packet reception and transmission functionality. This
PMD leverages librte_vhost to handle vhost messages, but it implements
its own vring enqueue and dequeue operations.

The PMD leverages DMA engines (e.g., I/OAT, a DMA engine in Intel's
processors) to accelerate large data movements in enqueue and dequeue
operations. Large copies are offloaded to the DMA in an asynchronous
mode; that is, the CPU just submits copy jobs to the DMA without
waiting for their completion, so there is no CPU intervention during
data transfer. Thus, we can save precious CPU cycles and improve the
overall performance of vhost-user based applications, like OVS. The
PMD still uses the CPU to perform small copies, due to the startup
overheads associated with the DMA.
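
For reference, the enqueue path follows the usual I/OAT rawdev
submit/kick/poll pattern; a simplified sketch (dev_id, src_hpa,
dst_hpa, len, mbuf, handles and dst_hdls are placeholders for the
driver's actual state):

    /* submit a copy job; the mbuf pointer is carried as the handle */
    rte_ioat_enqueue_copy(dev_id, src_hpa, dst_hpa, len,
                          (uintptr_t)mbuf, 0, 0);

    /* kick the device once a batch of jobs has been submitted */
    rte_ioat_do_copies(dev_id);

    /* later, reap finished jobs; the handles identify what completed */
    nb_done = rte_ioat_completed_copies(dev_id, 255, handles, dst_hdls);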

Note that the PMD is designed to support various DMA engines to
accelerate data movements in enqueue and dequeue operations; currently
the only supported DMA engine is I/OAT. The PMD supports I/OAT
acceleration only in the enqueue operation, and it still uses the CPU
to perform all copies in the dequeue operation. In addition, the PMD
only supports the split ring.

The DMA device used by a queue is assigned by users; for a queue with
no assigned DMA device, the PMD uses the CPU to perform all copies in
both enqueue and dequeue operations. Currently, the PMD only supports
I/OAT; a queue can use only one I/OAT device at a time, and an I/OAT
device can be used by only one queue at a time.

The PMD has 4 parameters.
 - iface: the socket path used to connect to the front end device.
 - queues: the number of queues the front end device has (default
 	is 1).
 - client: whether the vhost port works in client mode or server
 	mode (default is server mode).
 - dmas: the DMA device assigned to a queue.
Here is an example.
$ ./testpmd -c f -n 4 \
--vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 config/common_base                                 |    2 +
 config/common_linux                                |    1 +
 drivers/Makefile                                   |    2 +-
 drivers/net/Makefile                               |    1 +
 drivers/net/vhost_dma/Makefile                     |   31 +
 drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
 drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
 drivers/net/vhost_dma/internal.h                   |  225 +++
 .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
 drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
 mk/rte.app.mk                                      |    1 +
 11 files changed, 3259 insertions(+), 1 deletion(-)
 create mode 100644 drivers/net/vhost_dma/Makefile
 create mode 100644 drivers/net/vhost_dma/eth_vhost.c
 create mode 100644 drivers/net/vhost_dma/eth_vhost.h
 create mode 100644 drivers/net/vhost_dma/internal.h
 create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
 create mode 100644 drivers/net/vhost_dma/virtio_net.c

diff --git a/config/common_base b/config/common_base
index b2be3d9..8029519 100644
--- a/config/common_base
+++ b/config/common_base
@@ -1026,6 +1026,8 @@ CONFIG_RTE_LIBRTE_VHOST_DEBUG=n
 #
 CONFIG_RTE_LIBRTE_PMD_VHOST=n
 
+CONFIG_RTE_LIBRTE_PMD_VHOST_DMA=n
+
 #
 # Compile IFC driver
 # To compile, CONFIG_RTE_LIBRTE_VHOST and CONFIG_RTE_EAL_VFIO
diff --git a/config/common_linux b/config/common_linux
index 96e2e1f..1b2b1f2 100644
--- a/config/common_linux
+++ b/config/common_linux
@@ -17,6 +17,7 @@ CONFIG_RTE_LIBRTE_VHOST=y
 CONFIG_RTE_LIBRTE_VHOST_NUMA=y
 CONFIG_RTE_LIBRTE_VHOST_POSTCOPY=n
 CONFIG_RTE_LIBRTE_PMD_VHOST=y
+CONFIG_RTE_LIBRTE_PMD_VHOST_DMA=y
 CONFIG_RTE_LIBRTE_IFC_PMD=y
 CONFIG_RTE_LIBRTE_PMD_AF_PACKET=y
 CONFIG_RTE_LIBRTE_PMD_MEMIF=y
diff --git a/drivers/Makefile b/drivers/Makefile
index 7d5da5d..6bdab76 100644
--- a/drivers/Makefile
+++ b/drivers/Makefile
@@ -9,7 +9,7 @@ DEPDIRS-bus := common
 DIRS-y += mempool
 DEPDIRS-mempool := common bus
 DIRS-y += net
-DEPDIRS-net := common bus mempool
+DEPDIRS-net := common bus mempool raw
 DIRS-$(CONFIG_RTE_LIBRTE_BBDEV) += baseband
 DEPDIRS-baseband := common bus mempool
 DIRS-$(CONFIG_RTE_LIBRTE_CRYPTODEV) += crypto
diff --git a/drivers/net/Makefile b/drivers/net/Makefile
index cee3036..5535ca5 100644
--- a/drivers/net/Makefile
+++ b/drivers/net/Makefile
@@ -71,6 +71,7 @@ endif # $(CONFIG_RTE_LIBRTE_SCHED)
 
 ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_VHOST) += vhost
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_DMA) += vhost_dma
 ifeq ($(CONFIG_RTE_EAL_VFIO),y)
 DIRS-$(CONFIG_RTE_LIBRTE_IFC_PMD) += ifc
 endif
diff --git a/drivers/net/vhost_dma/Makefile b/drivers/net/vhost_dma/Makefile
new file mode 100644
index 0000000..da4dd49
--- /dev/null
+++ b/drivers/net/vhost_dma/Makefile
@@ -0,0 +1,31 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_pmd_vhost_dma.a
+
+LDLIBS += -lpthread
+LDLIBS += -lrte_eal -lrte_mbuf -lrte_mempool -lrte_ring
+LDLIBS += -lrte_ethdev -lrte_net -lrte_kvargs -lrte_vhost
+LDLIBS += -lrte_bus_vdev
+LDLIBS += -lrte_rawdev -lrte_rawdev_ioat
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -fno-strict-aliasing
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+
+EXPORT_MAP := rte_pmd_vhost_dma_version.map
+
+LIBABIVER := 1
+
+#
+# all source are stored in SRCS-y
+#
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_DMA) += eth_vhost.c virtio_net.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/net/vhost_dma/eth_vhost.c b/drivers/net/vhost_dma/eth_vhost.c
new file mode 100644
index 0000000..72e199e
--- /dev/null
+++ b/drivers/net/vhost_dma/eth_vhost.c
@@ -0,0 +1,1495 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#include <unistd.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+#include <rte_mbuf.h>
+#include <rte_ethdev_driver.h>
+#include <rte_ethdev_vdev.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_bus_vdev.h>
+#include <rte_kvargs.h>
+#include <rte_vhost.h>
+#include <rte_spinlock.h>
+#include <rte_log.h>
+#include <rte_string_fns.h>
+#include <rte_rawdev.h>
+#include <rte_ioat_rawdev.h>
+
+#include "eth_vhost.h"
+
+int vhost_dma_logtype;
+
+enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
+
+#define ETH_VHOST_IFACE_ARG		"iface"
+#define ETH_VHOST_QUEUES_ARG		"queues"
+#define ETH_VHOST_CLIENT_ARG		"client"
+#define ETH_VHOST_DMA_ARG		"dmas"
+
+static const char *valid_arguments[] = {
+	ETH_VHOST_IFACE_ARG,
+	ETH_VHOST_QUEUES_ARG,
+	ETH_VHOST_CLIENT_ARG,
+	ETH_VHOST_DMA_ARG,
+	NULL
+};
+
+static struct rte_ether_addr base_eth_addr = {
+	.addr_bytes = {
+		0x56 /* V */,
+		0x48 /* H */,
+		0x4F /* O */,
+		0x53 /* S */,
+		0x54 /* T */,
+		0x00
+	}
+};
+
+struct internal_list {
+	TAILQ_ENTRY(internal_list) next;
+	struct rte_eth_dev *eth_dev;
+};
+
+TAILQ_HEAD(internal_list_head, internal_list);
+static struct internal_list_head internal_list =
+	TAILQ_HEAD_INITIALIZER(internal_list);
+
+static pthread_mutex_t internal_list_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static struct rte_eth_link pmd_link = {
+		.link_speed = 10000,
+		.link_duplex = ETH_LINK_FULL_DUPLEX,
+		.link_status = ETH_LINK_DOWN
+};
+
+#define VHOST_XSTATS_NAME_SIZE 64
+
+struct vhost_xstats_name_off {
+	char name[VHOST_XSTATS_NAME_SIZE];
+	uint64_t offset;
+};
+
+/* [rx]_ is prepended to the name string here */
+static const struct vhost_xstats_name_off vhost_rxport_stat_strings[] = {
+	{"good_packets",
+	 offsetof(struct vhost_queue, stats.pkts)},
+	{"total_bytes",
+	 offsetof(struct vhost_queue, stats.bytes)},
+	{"missed_pkts",
+	 offsetof(struct vhost_queue, stats.missed_pkts)},
+	{"broadcast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_BROADCAST_PKT])},
+	{"multicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_MULTICAST_PKT])},
+	{"unicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNICAST_PKT])},
+	{"undersize_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNDERSIZE_PKT])},
+	{"size_64_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_64_PKT])},
+	{"size_65_to_127_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_65_TO_127_PKT])},
+	{"size_128_to_255_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_128_TO_255_PKT])},
+	{"size_256_to_511_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_256_TO_511_PKT])},
+	{"size_512_to_1023_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_512_TO_1023_PKT])},
+	{"size_1024_to_1522_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1024_TO_1522_PKT])},
+	{"size_1523_to_max_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1523_TO_MAX_PKT])},
+	{"errors_with_bad_CRC",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_PKT])},
+	{"fragmented_errors",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_FRAGMENTED])},
+	{"jabber_errors",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_JABBER])},
+	{"unknown_protos_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNKNOWN_PROTOCOL])},
+};
+
+/* [tx]_ is prepended to the name string here */
+static const struct vhost_xstats_name_off vhost_txport_stat_strings[] = {
+	{"good_packets",
+	 offsetof(struct vhost_queue, stats.pkts)},
+	{"total_bytes",
+	 offsetof(struct vhost_queue, stats.bytes)},
+	{"missed_pkts",
+	 offsetof(struct vhost_queue, stats.missed_pkts)},
+	{"broadcast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_BROADCAST_PKT])},
+	{"multicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_MULTICAST_PKT])},
+	{"unicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNICAST_PKT])},
+	{"undersize_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNDERSIZE_PKT])},
+	{"size_64_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_64_PKT])},
+	{"size_65_to_127_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_65_TO_127_PKT])},
+	{"size_128_to_255_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_128_TO_255_PKT])},
+	{"size_256_to_511_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_256_TO_511_PKT])},
+	{"size_512_to_1023_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_512_TO_1023_PKT])},
+	{"size_1024_to_1522_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1024_TO_1522_PKT])},
+	{"size_1523_to_max_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1523_TO_MAX_PKT])},
+	{"errors_with_bad_CRC",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_PKT])},
+};
+
+#define VHOST_NB_XSTATS_RXPORT (sizeof(vhost_rxport_stat_strings) / \
+				sizeof(vhost_rxport_stat_strings[0]))
+
+#define VHOST_NB_XSTATS_TXPORT (sizeof(vhost_txport_stat_strings) / \
+				sizeof(vhost_txport_stat_strings[0]))
+
+static int
+vhost_dev_xstats_reset(struct rte_eth_dev *dev)
+{
+	struct vhost_queue *vq = NULL;
+	unsigned int i = 0;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		memset(&vq->stats, 0, sizeof(vq->stats));
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		memset(&vq->stats, 0, sizeof(vq->stats));
+	}
+
+	return 0;
+}
+
+static int
+vhost_dev_xstats_get_names(struct rte_eth_dev *dev __rte_unused,
+			   struct rte_eth_xstat_name *xstats_names,
+			   unsigned int limit __rte_unused)
+{
+	unsigned int t = 0;
+	int count = 0;
+	int nstats = VHOST_NB_XSTATS_RXPORT + VHOST_NB_XSTATS_TXPORT;
+
+	if (!xstats_names)
+		return nstats;
+	for (t = 0; t < VHOST_NB_XSTATS_RXPORT; t++) {
+		snprintf(xstats_names[count].name,
+			 sizeof(xstats_names[count].name),
+			 "rx_%s", vhost_rxport_stat_strings[t].name);
+		count++;
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_TXPORT; t++) {
+		snprintf(xstats_names[count].name,
+			 sizeof(xstats_names[count].name),
+			 "tx_%s", vhost_txport_stat_strings[t].name);
+		count++;
+	}
+	return count;
+}
+
+static int
+vhost_dev_xstats_get(struct rte_eth_dev *dev, struct rte_eth_xstat *xstats,
+		     unsigned int n)
+{
+	unsigned int i;
+	unsigned int t;
+	unsigned int count = 0;
+	struct vhost_queue *vq = NULL;
+	unsigned int nxstats = VHOST_NB_XSTATS_RXPORT + VHOST_NB_XSTATS_TXPORT;
+
+	if (n < nxstats)
+		return nxstats;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		vq->stats.xstats[VHOST_UNICAST_PKT] = vq->stats.pkts
+				- (vq->stats.xstats[VHOST_BROADCAST_PKT]
+				+ vq->stats.xstats[VHOST_MULTICAST_PKT]);
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		vq->stats.xstats[VHOST_UNICAST_PKT] = vq->stats.pkts
+				+ vq->stats.missed_pkts
+				- (vq->stats.xstats[VHOST_BROADCAST_PKT]
+				+ vq->stats.xstats[VHOST_MULTICAST_PKT]);
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_RXPORT; t++) {
+		xstats[count].value = 0;
+		for (i = 0; i < dev->data->nb_rx_queues; i++) {
+			vq = dev->data->rx_queues[i];
+			if (!vq)
+				continue;
+			xstats[count].value +=
+				*(uint64_t *)(((char *)vq)
+				+ vhost_rxport_stat_strings[t].offset);
+		}
+		xstats[count].id = count;
+		count++;
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_TXPORT; t++) {
+		xstats[count].value = 0;
+		for (i = 0; i < dev->data->nb_tx_queues; i++) {
+			vq = dev->data->tx_queues[i];
+			if (!vq)
+				continue;
+			xstats[count].value +=
+				*(uint64_t *)(((char *)vq)
+				+ vhost_txport_stat_strings[t].offset);
+		}
+		xstats[count].id = count;
+		count++;
+	}
+	return count;
+}
+
+static inline void
+vhost_count_multicast_broadcast(struct vhost_queue *vq,
+				struct rte_mbuf *mbuf)
+{
+	struct rte_ether_addr *ea = NULL;
+	struct vhost_stats *pstats = &vq->stats;
+
+	ea = rte_pktmbuf_mtod(mbuf, struct rte_ether_addr *);
+	if (rte_is_multicast_ether_addr(ea)) {
+		if (rte_is_broadcast_ether_addr(ea))
+			pstats->xstats[VHOST_BROADCAST_PKT]++;
+		else
+			pstats->xstats[VHOST_MULTICAST_PKT]++;
+	}
+}
+
+static void
+vhost_update_packet_xstats(struct vhost_queue *vq,
+			   struct rte_mbuf **bufs,
+			   uint16_t count)
+{
+	uint32_t pkt_len = 0;
+	uint64_t i = 0;
+	uint64_t index;
+	struct vhost_stats *pstats = &vq->stats;
+
+	for (i = 0; i < count ; i++) {
+		pkt_len = bufs[i]->pkt_len;
+		if (pkt_len == 64) {
+			pstats->xstats[VHOST_64_PKT]++;
+		} else if (pkt_len > 64 && pkt_len < 1024) {
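+			/**
+			 * Map pkt_len to its size bucket from the position
+			 * of its most significant bit: 65-127 hits
+			 * VHOST_65_TO_127_PKT, 128-255 hits
+			 * VHOST_128_TO_255_PKT, and so on.
+			 */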
+			index = (sizeof(pkt_len) * 8)
+				- __builtin_clz(pkt_len) - 5;
+			pstats->xstats[index]++;
+		} else {
+			if (pkt_len < 64)
+				pstats->xstats[VHOST_UNDERSIZE_PKT]++;
+			else if (pkt_len <= 1522)
+				pstats->xstats[VHOST_1024_TO_1522_PKT]++;
+			else if (pkt_len > 1522)
+				pstats->xstats[VHOST_1523_TO_MAX_PKT]++;
+		}
+		vhost_count_multicast_broadcast(vq, bufs[i]);
+	}
+}
+
+static uint16_t
+eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhost_queue *queue = q;
+	uint16_t i, nb_rx = 0;
+	uint16_t nb_receive = nb_bufs;
+	struct pmd_internal *dev = queue->internal;
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&queue->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		goto out;
+
+	/* get packets from guest's TX queue */
+	while (nb_receive) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
+						 VHOST_MAX_PKT_BURST);
+
+		nb_pkts = vhost_dma_dequeue_burst(dev, queue->dma_vring,
+						   queue->mb_pool, &bufs[nb_rx],
+						   num);
+
+		nb_rx += nb_pkts;
+		nb_receive -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	queue->stats.pkts += nb_rx;
+
+	for (i = 0; likely(i < nb_rx); i++) {
+		bufs[i]->port = queue->port;
+		bufs[i]->vlan_tci = 0;
+
+		if (queue->internal->vlan_strip)
+			rte_vlan_strip(bufs[i]);
+
+		queue->stats.bytes += bufs[i]->pkt_len;
+	}
+
+	vhost_update_packet_xstats(queue, bufs, nb_rx);
+
+out:
+	rte_atomic32_set(&queue->while_queuing, 0);
+
+	return nb_rx;
+}
+
+static uint16_t
+eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhost_queue *queue = q;
+	struct pmd_internal *dev = queue->internal;
+	uint16_t i, nb_tx = 0;
+	uint16_t nb_send = 0;
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&queue->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		goto out;
+
+	for (i = 0; i < nb_bufs; i++) {
+		struct rte_mbuf *m = bufs[i];
+
+		/* do VLAN tag insertion */
+		if (m->ol_flags & PKT_TX_VLAN_PKT) {
+			int error = rte_vlan_insert(&m);
+
+			if (unlikely(error)) {
+				rte_pktmbuf_free(m);
+				continue;
+			}
+		}
+
+		bufs[nb_send] = m;
+		++nb_send;
+	}
+
+	/* send packets to guest's RX queue */
+	while (nb_send) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_send,
+						 VHOST_MAX_PKT_BURST);
+
+		nb_pkts = vhost_dma_enqueue_burst(dev, queue->dma_vring,
+						   &bufs[nb_tx], num);
+
+		nb_tx += nb_pkts;
+		nb_send -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	queue->stats.pkts += nb_tx;
+	queue->stats.missed_pkts += nb_bufs - nb_tx;
+
+	for (i = 0; likely(i < nb_tx); i++)
+		queue->stats.bytes += bufs[i]->pkt_len;
+
+	vhost_update_packet_xstats(queue, bufs, nb_tx);
+
+	/**
+	 * According to RFC 2863 (sections ifHCOutMulticastPkts and
+	 * ifHCOutBroadcastPkts), the "multicast" and "broadcast" counters
+	 * are also increased for packets that were not transmitted
+	 * successfully.
+	 */
+	for (i = nb_tx; i < nb_bufs; i++)
+		vhost_count_multicast_broadcast(queue, bufs[i]);
+out:
+	rte_atomic32_set(&queue->while_queuing, 0);
+
+	return nb_tx;
+}
+
+static int
+eth_dev_configure(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+	const struct rte_eth_rxmode *rxmode = &dev->data->dev_conf.rxmode;
+
+	internal->vlan_strip = !!(rxmode->offloads & DEV_RX_OFFLOAD_VLAN_STRIP);
+
+	return 0;
+}
+
+static inline struct internal_list *
+find_internal_resource(char *ifname)
+{
+	int found = 0;
+	struct internal_list *list;
+	struct pmd_internal *internal;
+
+	if (!ifname)
+		return NULL;
+
+	pthread_mutex_lock(&internal_list_lock);
+
+	TAILQ_FOREACH(list, &internal_list, next) {
+		internal = list->eth_dev->data->dev_private;
+		if (!strcmp(internal->iface_name, ifname)) {
+			found = 1;
+			break;
+		}
+	}
+
+	pthread_mutex_unlock(&internal_list_lock);
+
+	if (!found)
+		return NULL;
+
+	return list;
+}
+
+static int
+eth_rxq_intr_enable(struct rte_eth_dev *dev, uint16_t qid)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[qid];
+	if (!vq) {
+		VHOST_LOG(ERR, "rxq%d is not setup yet\n", qid);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "Enable interrupt for rxq%d\n", qid);
+	vhost_dma_enable_guest_notification(dev->data->dev_private,
+					     vq->dma_vring, 1);
+	rte_wmb();
+
+	return 0;
+}
+
+static int
+eth_rxq_intr_disable(struct rte_eth_dev *dev, uint16_t qid)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[qid];
+	if (!vq) {
+		VHOST_LOG(ERR, "rxq%d is not setup yet\n", qid);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "Disable interrupt for rxq%d\n", qid);
+	vhost_dma_enable_guest_notification(dev->data->dev_private,
+					     vq->dma_vring, 0);
+	rte_wmb();
+
+	return 0;
+}
+
+static void
+eth_vhost_uninstall_intr(struct rte_eth_dev *dev)
+{
+	struct rte_intr_handle *intr_handle = dev->intr_handle;
+
+	if (intr_handle) {
+		if (intr_handle->intr_vec)
+			free(intr_handle->intr_vec);
+		free(intr_handle);
+	}
+
+	dev->intr_handle = NULL;
+}
+
+static int
+eth_vhost_install_intr(struct rte_eth_dev *dev)
+{
+	struct rte_vhost_vring *vv;
+	struct vhost_queue *vq;
+	int count = 0;
+	int nb_rxq = dev->data->nb_rx_queues;
+	int i;
+
+	/* uninstall first if we are reconnecting */
+	if (dev->intr_handle)
+		eth_vhost_uninstall_intr(dev);
+
+	dev->intr_handle = malloc(sizeof(*dev->intr_handle));
+	if (!dev->intr_handle) {
+		VHOST_LOG(ERR, "Fail to allocate intr_handle\n");
+		return -ENOMEM;
+	}
+	memset(dev->intr_handle, 0, sizeof(*dev->intr_handle));
+
+	dev->intr_handle->efd_counter_size = sizeof(uint64_t);
+
+	dev->intr_handle->intr_vec =
+		malloc(nb_rxq * sizeof(dev->intr_handle->intr_vec[0]));
+
+	if (!dev->intr_handle->intr_vec) {
+		VHOST_LOG(ERR,
+			  "Failed to allocate memory for interrupt vector\n");
+		free(dev->intr_handle);
+		return -ENOMEM;
+	}
+
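+	/**
+	 * Each RX queue's interrupt source is the kick eventfd of the
+	 * corresponding guest TX vring.
+	 */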
+	VHOST_LOG(INFO, "Prepare intr vec\n");
+	for (i = 0; i < nb_rxq; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq) {
+			VHOST_LOG(INFO, "rxq-%d not setup yet, skip!\n", i);
+			continue;
+		}
+
+		vv = &vq->dma_vring->vr;
+		if (vv->kickfd < 0) {
+			VHOST_LOG(INFO,
+				  "rxq-%d's kickfd is invalid, skip!\n", i);
+			continue;
+		}
+		dev->intr_handle->intr_vec[i] = RTE_INTR_VEC_RXTX_OFFSET + i;
+		dev->intr_handle->efds[i] = vv->kickfd;
+		count++;
+		VHOST_LOG(INFO, "Installed intr vec for rxq-%d\n", i);
+	}
+
+	dev->intr_handle->nb_efd = count;
+	dev->intr_handle->max_intr = count + 1;
+	dev->intr_handle->type = RTE_INTR_HANDLE_VDEV;
+
+	return 0;
+}
+
+static void
+update_queuing_status(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+	struct vhost_queue *vq;
+	unsigned int i;
+	int allow_queuing = 1;
+
+	if (!dev->data->rx_queues || !dev->data->tx_queues)
+		return;
+
+	if (rte_atomic32_read(&internal->started) == 0 ||
+	    rte_atomic32_read(&internal->dev_attached) == 0)
+		allow_queuing = 0;
+
+	/* wait until rx/tx_pkt_burst stops accessing vhost device */
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (vq == NULL)
+			continue;
+		rte_atomic32_set(&vq->allow_queuing, allow_queuing);
+		while (rte_atomic32_read(&vq->while_queuing))
+			rte_pause();
+	}
+
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (vq == NULL)
+			continue;
+		rte_atomic32_set(&vq->allow_queuing, allow_queuing);
+		while (rte_atomic32_read(&vq->while_queuing))
+			rte_pause();
+	}
+}
+
+static void
+queue_setup(struct rte_eth_dev *eth_dev, struct pmd_internal *internal)
+{
+	struct vhost_queue *vq;
+	int i;
+
+	for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
+		vq = eth_dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		vq->vid = internal->vid;
+		vq->internal = internal;
+		vq->port = eth_dev->data->port_id;
+		vq->dma_vring = &internal->dma_vrings[vq->virtqueue_id];
+	}
+	for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
+		vq = eth_dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		vq->vid = internal->vid;
+		vq->internal = internal;
+		vq->port = eth_dev->data->port_id;
+		vq->dma_vring = &internal->dma_vrings[vq->virtqueue_id];
+	}
+}
+
+static int
+new_device(int vid)
+{
+	struct rte_eth_dev *eth_dev;
+	struct internal_list *list;
+	struct pmd_internal *internal;
+	struct rte_eth_conf *dev_conf;
+	unsigned i;
+	char ifname[PATH_MAX];
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(INFO, "Invalid device name: %s\n", ifname);
+		return -1;
+	}
+
+	eth_dev = list->eth_dev;
+	internal = eth_dev->data->dev_private;
+	dev_conf = &eth_dev->data->dev_conf;
+
+	internal->vid = vid;
+
+	if (vhost_dma_setup(internal) < 0) {
+		VHOST_LOG(ERR, "Failed to set up vring operations\n");
+		return -1;
+	}
+
+	if (rte_atomic32_read(&internal->started) == 1) {
+		queue_setup(eth_dev, internal);
+
+		if (dev_conf->intr_conf.rxq) {
+			if (eth_vhost_install_intr(eth_dev) < 0) {
+				VHOST_LOG(INFO, "Failed to install "
+					  "interrupt handler.");
+				return -1;
+			}
+		}
+	} else {
+		VHOST_LOG(INFO, "RX/TX queues not exist yet\n");
+	}
+
+	for (i = 0; i < rte_vhost_get_vring_num(vid); i++) {
+		vhost_dma_enable_guest_notification(internal,
+						    &internal->dma_vrings[i],
+						    0);
+	}
+
+	rte_vhost_get_mtu(vid, &eth_dev->data->mtu);
+
+	eth_dev->data->dev_link.link_status = ETH_LINK_UP;
+
+	rte_atomic32_set(&internal->dev_attached, 1);
+	update_queuing_status(eth_dev);
+
+	VHOST_LOG(INFO, "vHost DMA device %d created\n", vid);
+
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_INTR_LSC, NULL);
+
+	return 0;
+}
+
+static void
+destroy_device(int vid)
+{
+	struct rte_eth_dev *eth_dev;
+	struct pmd_internal *internal;
+	struct vhost_queue *vq;
+	struct internal_list *list;
+	char ifname[PATH_MAX];
+	unsigned i;
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(ERR, "Invalid interface name: %s\n", ifname);
+		return;
+	}
+	eth_dev = list->eth_dev;
+	internal = eth_dev->data->dev_private;
+
+	rte_atomic32_set(&internal->dev_attached, 0);
+	update_queuing_status(eth_dev);
+
+	eth_dev->data->dev_link.link_status = ETH_LINK_DOWN;
+
+	/**
+	 * Before destroying the front end's information, we must guarantee
+	 * that the RX/TX threads have stopped accessing the queues.
+	 */
+	vhost_dma_remove(internal);
+
+	if (eth_dev->data->rx_queues && eth_dev->data->tx_queues) {
+		for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
+			vq = eth_dev->data->rx_queues[i];
+			if (!vq)
+				continue;
+			vq->vid = -1;
+		}
+		for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
+			vq = eth_dev->data->tx_queues[i];
+			if (!vq)
+				continue;
+			vq->vid = -1;
+		}
+	}
+
+	VHOST_LOG(INFO, "vHost DMA device %d destroyed\n", vid);
+	eth_vhost_uninstall_intr(eth_dev);
+
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_INTR_LSC, NULL);
+}
+
+#define IOAT_RING_SIZE 1024
+
+static int
+vring_state_changed(int vid, uint16_t vring, int enable)
+{
+	struct rte_eth_dev *eth_dev;
+	struct internal_list *list;
+	char ifname[PATH_MAX];
+	struct pmd_internal *dev;
+	struct dma_vring *dma_vr;
+	struct rte_ioat_rawdev_config config;
+	struct rte_rawdev_info info = { .dev_private = &config };
+	char name[32];
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(ERR, "Invalid interface name: %s\n", ifname);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "vring %u is %s\n", vring,
+		  enable ? "enabled" : "disabled");
+
+	eth_dev = list->eth_dev;
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_QUEUE_STATE, NULL);
+
+	if (!enable)
+		return 0;
+
+	dev = eth_dev->data->dev_private;
+
+	/**
+	 * A vring can only use one DMA device. If it has already been
+	 * assigned one, return immediately.
+	 */
+	dma_vr = &dev->dma_vrings[vring];
+	if (dma_vr->dma_enabled)
+		return 0;
+
+	if (!dev->dmas[vring].is_valid)
+		return 0;
+
+	/**
+	 * Attach the given DMA device to the vring and configure it.
+	 * Currently, only I/OAT is supported.
+	 */
+	if (dev->dmas[vring].type != IOAT) {
+		VHOST_LOG(DEBUG, "Cannot enable DMA for queue %d, since "
+			  "it is not an I/OAT device\n",
+			  dev->dmas[vring].dev_id);
+		return 0;
+	}
+
+	rte_pci_device_name(&dev->dmas[vring].addr, name, sizeof(name));
+	rte_rawdev_info_get(dev->dmas[vring].dev_id, &info);
+	config.ring_size = IOAT_RING_SIZE;
+	if (rte_rawdev_configure(dev->dmas[vring].dev_id, &info) < 0) {
+		VHOST_LOG(ERR, "Config the DMA device %s failed\n", name);
+		return -1;
+	}
+
+	rte_rawdev_start(dev->dmas[vring].dev_id);
+	memcpy(&dma_vr->dma_addr, &dev->dmas[vring].addr,
+	       sizeof(struct rte_pci_addr));
+	dma_vr->dev_id = dev->dmas[vring].dev_id;
+	dma_vr->dma_enabled = true;
+	dma_vr->nr_inflight = 0;
+	dma_vr->nr_batching = 0;
+
+	VHOST_LOG(INFO, "Attach DMA %s to the TX queue %u of port %u\n",
+		  name, vring / VIRTIO_QNUM, eth_dev->data->port_id);
+	return 0;
+}
+
+static struct vhost_device_ops vhost_ops = {
+	.new_device          = new_device,
+	.destroy_device      = destroy_device,
+	.vring_state_changed = vring_state_changed,
+};
+
+static int
+eth_dev_start(struct rte_eth_dev *eth_dev)
+{
+	struct pmd_internal *internal = eth_dev->data->dev_private;
+	struct rte_eth_conf *dev_conf = &eth_dev->data->dev_conf;
+
+	queue_setup(eth_dev, internal);
+
+	if (rte_atomic32_read(&internal->dev_attached) == 1) {
+		if (dev_conf->intr_conf.rxq) {
+			if (eth_vhost_install_intr(eth_dev) < 0) {
+				VHOST_LOG(INFO, "Failed to install "
+					  "interrupt handler.");
+				return -1;
+			}
+		}
+	}
+
+	rte_atomic32_set(&internal->started, 1);
+	update_queuing_status(eth_dev);
+
+	return 0;
+}
+
+static void
+eth_dev_stop(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+
+	rte_atomic32_set(&internal->started, 0);
+	update_queuing_status(dev);
+}
+
+static void
+eth_dev_close(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal;
+	struct internal_list *list;
+	unsigned int i;
+
+	internal = dev->data->dev_private;
+	if (!internal)
+		return;
+
+	eth_dev_stop(dev);
+
+	rte_vhost_driver_unregister(internal->iface_name);
+
+	list = find_internal_resource(internal->iface_name);
+	if (!list)
+		return;
+
+	pthread_mutex_lock(&internal_list_lock);
+	TAILQ_REMOVE(&internal_list, list, next);
+	pthread_mutex_unlock(&internal_list_lock);
+	rte_free(list);
+
+	if (dev->data->rx_queues)
+		for (i = 0; i < dev->data->nb_rx_queues; i++)
+			rte_free(dev->data->rx_queues[i]);
+
+	if (dev->data->tx_queues)
+		for (i = 0; i < dev->data->nb_tx_queues; i++)
+			rte_free(dev->data->tx_queues[i]);
+
+	free(internal->dev_name);
+	free(internal->iface_name);
+	rte_free(internal);
+
+	dev->data->dev_private = NULL;
+}
+
+static int
+eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
+		   uint16_t nb_rx_desc __rte_unused,
+		   unsigned int socket_id,
+		   const struct rte_eth_rxconf *rx_conf __rte_unused,
+		   struct rte_mempool *mb_pool)
+{
+	struct vhost_queue *vq;
+
+	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (vq == NULL) {
+		VHOST_LOG(ERR, "Failed to allocate memory for rx queue\n");
+		return -ENOMEM;
+	}
+
+	vq->mb_pool = mb_pool;
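+	/* an ethdev RX queue maps to the guest's TX virtqueue */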
+	vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+	dev->data->rx_queues[rx_queue_id] = vq;
+
+	return 0;
+}
+
+static int
+eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
+		   uint16_t nb_tx_desc __rte_unused,
+		   unsigned int socket_id,
+		   const struct rte_eth_txconf *tx_conf __rte_unused)
+{
+	struct vhost_queue *vq;
+
+	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (vq == NULL) {
+		VHOST_LOG(ERR, "Failed to allocate memory for tx queue\n");
+		return -ENOMEM;
+	}
+
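+	/* an ethdev TX queue maps to the guest's RX virtqueue */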
+	vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
+	dev->data->tx_queues[tx_queue_id] = vq;
+
+	return 0;
+}
+
+static int
+eth_dev_info(struct rte_eth_dev *dev,
+	     struct rte_eth_dev_info *dev_info)
+{
+	struct pmd_internal *internal;
+
+	internal = dev->data->dev_private;
+	if (internal == NULL) {
+		VHOST_LOG(ERR, "Invalid device specified\n");
+		return -ENODEV;
+	}
+
+	dev_info->max_mac_addrs = 1;
+	dev_info->max_rx_pktlen = (uint32_t)-1;
+	dev_info->max_rx_queues = internal->max_queues;
+	dev_info->max_tx_queues = internal->max_queues;
+	dev_info->min_rx_bufsize = 0;
+
+	dev_info->tx_offload_capa = DEV_TX_OFFLOAD_MULTI_SEGS |
+				DEV_TX_OFFLOAD_VLAN_INSERT;
+	dev_info->rx_offload_capa = DEV_RX_OFFLOAD_VLAN_STRIP;
+
+	return 0;
+}
+
+static int
+eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
+{
+	unsigned i;
+	unsigned long rx_total = 0, tx_total = 0;
+	unsigned long rx_total_bytes = 0, tx_total_bytes = 0;
+	struct vhost_queue *vq;
+
+	for (i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS &&
+	     i < dev->data->nb_rx_queues; i++) {
+		if (dev->data->rx_queues[i] == NULL)
+			continue;
+		vq = dev->data->rx_queues[i];
+		stats->q_ipackets[i] = vq->stats.pkts;
+		rx_total += stats->q_ipackets[i];
+
+		stats->q_ibytes[i] = vq->stats.bytes;
+		rx_total_bytes += stats->q_ibytes[i];
+	}
+
+	for (i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS &&
+	     i < dev->data->nb_tx_queues; i++) {
+		if (dev->data->tx_queues[i] == NULL)
+			continue;
+		vq = dev->data->tx_queues[i];
+		stats->q_opackets[i] = vq->stats.pkts;
+		tx_total += stats->q_opackets[i];
+
+		stats->q_obytes[i] = vq->stats.bytes;
+		tx_total_bytes += stats->q_obytes[i];
+	}
+
+	stats->ipackets = rx_total;
+	stats->opackets = tx_total;
+	stats->ibytes = rx_total_bytes;
+	stats->obytes = tx_total_bytes;
+
+	return 0;
+}
+
+static int
+eth_stats_reset(struct rte_eth_dev *dev)
+{
+	struct vhost_queue *vq;
+	unsigned i;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		if (dev->data->rx_queues[i] == NULL)
+			continue;
+		vq = dev->data->rx_queues[i];
+		vq->stats.pkts = 0;
+		vq->stats.bytes = 0;
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		if (dev->data->tx_queues[i] == NULL)
+			continue;
+		vq = dev->data->tx_queues[i];
+		vq->stats.pkts = 0;
+		vq->stats.bytes = 0;
+		vq->stats.missed_pkts = 0;
+	}
+
+	return 0;
+}
+
+static void
+eth_queue_release(void *q)
+{
+	rte_free(q);
+}
+
+static int
+eth_tx_done_cleanup(void *txq __rte_unused, uint32_t free_cnt __rte_unused)
+{
+	/**
+	 * The vhost backend does not hang onto mbufs: eth_vhost_tx()
+	 * copies the packet data and releases the mbufs, so there is
+	 * nothing to clean up.
+	 */
+	return 0;
+}
+
+static int
+eth_link_update(struct rte_eth_dev *dev __rte_unused,
+		int wait_to_complete __rte_unused)
+{
+	return 0;
+}
+
+static uint32_t
+eth_rx_queue_count(struct rte_eth_dev *dev, uint16_t rx_queue_id)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[rx_queue_id];
+	if (unlikely(vq == NULL))
+		return 0;
+
+	return rte_vhost_rx_queue_count(vq->vid, vq->virtqueue_id);
+}
+
+static const struct eth_dev_ops ops = {
+	.dev_start = eth_dev_start,
+	.dev_stop = eth_dev_stop,
+	.dev_close = eth_dev_close,
+	.dev_configure = eth_dev_configure,
+	.dev_infos_get = eth_dev_info,
+	.rx_queue_setup = eth_rx_queue_setup,
+	.tx_queue_setup = eth_tx_queue_setup,
+	.rx_queue_release = eth_queue_release,
+	.tx_queue_release = eth_queue_release,
+	.tx_done_cleanup = eth_tx_done_cleanup,
+	.rx_queue_count = eth_rx_queue_count,
+	.link_update = eth_link_update,
+	.stats_get = eth_stats_get,
+	.stats_reset = eth_stats_reset,
+	.xstats_reset = vhost_dev_xstats_reset,
+	.xstats_get = vhost_dev_xstats_get,
+	.xstats_get_names = vhost_dev_xstats_get_names,
+	.rx_queue_intr_enable = eth_rxq_intr_enable,
+	.rx_queue_intr_disable = eth_rxq_intr_disable,
+};
+
+static int
+eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
+		     int16_t queues, const unsigned int numa_node,
+		     uint64_t flags, struct dma_info *dmas)
+{
+	const char *name = rte_vdev_device_name(dev);
+	struct rte_eth_dev_data *data;
+	struct pmd_internal *internal = NULL;
+	struct rte_eth_dev *eth_dev = NULL;
+	struct rte_ether_addr *eth_addr = NULL;
+	struct internal_list *list = NULL;
+
+	VHOST_LOG(INFO, "Creating vHost-DMA backend on numa socket %u\n",
+		  numa_node);
+
+	list = rte_zmalloc_socket(name, sizeof(*list), 0, numa_node);
+	if (list == NULL)
+		goto error;
+
+	/* reserve an ethdev entry */
+	eth_dev = rte_eth_vdev_allocate(dev, sizeof(*internal));
+	if (eth_dev == NULL)
+		goto error;
+	data = eth_dev->data;
+
+	eth_addr = rte_zmalloc_socket(name, sizeof(*eth_addr), 0, numa_node);
+	if (eth_addr == NULL)
+		goto error;
+	data->mac_addrs = eth_addr;
+	*eth_addr = base_eth_addr;
+	eth_addr->addr_bytes[5] = eth_dev->data->port_id;
+
+	/**
+	 * Now put it all together:
+	 * - store queue data in internal,
+	 * - point eth_dev_data to internal,
+	 * - point the eth_dev structure to the new eth_dev_data structure.
+	 */
+	internal = eth_dev->data->dev_private;
+	internal->dev_name = strdup(name);
+	if (internal->dev_name == NULL)
+		goto error;
+	internal->iface_name = strdup(iface_name);
+	if (internal->iface_name == NULL)
+		goto error;
+
+	list->eth_dev = eth_dev;
+	pthread_mutex_lock(&internal_list_lock);
+	TAILQ_INSERT_TAIL(&internal_list, list, next);
+	pthread_mutex_unlock(&internal_list_lock);
+
+	data->nb_rx_queues = queues;
+	data->nb_tx_queues = queues;
+	internal->max_queues = queues;
+	internal->vid = -1;
+
+	memcpy(internal->dmas, dmas, sizeof(struct dma_info) * 2 *
+	       RTE_MAX_QUEUES_PER_PORT);
+
+	data->dev_link = pmd_link;
+	data->dev_flags = RTE_ETH_DEV_INTR_LSC | RTE_ETH_DEV_CLOSE_REMOVE;
+
+	eth_dev->dev_ops = &ops;
+
+	/* assign rx and tx ops */
+	eth_dev->rx_pkt_burst = eth_vhost_rx;
+	eth_dev->tx_pkt_burst = eth_vhost_tx;
+
+	if (rte_vhost_driver_register(iface_name, flags))
+		goto error;
+
+	if (rte_vhost_driver_disable_features(iface_name,
+					      VHOST_DMA_UNSUPPORTED_FEATURES) <
+	    0)
+		goto error;
+
+	if (rte_vhost_driver_callback_register(iface_name, &vhost_ops) < 0) {
+		VHOST_LOG(ERR, "Can't register callbacks\n");
+		goto error;
+	}
+
+	if (rte_vhost_driver_start(iface_name) < 0) {
+		VHOST_LOG(ERR, "Failed to start driver for %s\n", iface_name);
+		goto error;
+	}
+
+	rte_eth_dev_probing_finish(eth_dev);
+	return data->port_id;
+
+error:
+	if (internal) {
+		free(internal->iface_name);
+		free(internal->dev_name);
+	}
+	rte_eth_dev_release_port(eth_dev);
+	rte_free(list);
+
+	return -1;
+}
+
+static inline int
+open_iface(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	const char **iface_name = extra_args;
+
+	if (value == NULL)
+		return -1;
+
+	*iface_name = value;
+
+	return 0;
+}
+
+struct dma_info_input {
+	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint16_t nr;
+};
+
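+/**
+ * Parse the 'dmas' devargs value. It accepts either a single
+ * "txq<N>@<PCI address>" pair or several pairs within brackets,
+ * e.g. dmas=txq0@00:04.0 or dmas=(txq0@00:04.0;txq1@00:04.1)
+ * (the PCI addresses here are only illustrative).
+ */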
+static inline int
+open_dma(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	struct dma_info_input *dma_info = extra_args;
+	char *input = strndup(value, strlen(value) + 1);
+	char *addrs = input;
+	char *ptrs[2];
+	char *start, *end, *substr;
+	int64_t qid, vring_id;
+	struct rte_ioat_rawdev_config config;
+	struct rte_rawdev_info info = { .dev_private = &config };
+	char name[32];
+	int dev_id;
+	int ret = 0;
+
+	while (isblank(*addrs))
+		addrs++;
+	if (*addrs == '\0') {
+		VHOST_LOG(ERR, "No input DMA addresses\n");
+		ret = -1;
+		goto out;
+	}
+
+	/* process single DMA device */
+	if (*addrs != '(') {
+		rte_strsplit(addrs, strlen(addrs), ptrs, 2, '@');
+
+		start = strstr(ptrs[0], "txq");
+		if (start == NULL) {
+			VHOST_LOG(ERR, "We only support DMA for TX "
+				  "queues currently\n");
+			ret = -1;
+			goto out;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		vring_id = qid * 2 + VIRTIO_RXQ;
+		/* parse PCI address and check if the input DMA is supported */
+		if (rte_pci_addr_parse(ptrs[1],
+		    &dma_info->dmas[vring_id].addr) < 0) {
+			VHOST_LOG(ERR, "Invalid DMA address %s\n", ptrs[1]);
+			ret = -1;
+			goto out;
+		}
+
+		rte_pci_device_name(&dma_info->dmas[vring_id].addr,
+				    name, sizeof(name));
+		dev_id = rte_rawdev_get_dev_id(name);
+		if (dev_id == (uint16_t)(-ENODEV) ||
+		    dev_id == (uint16_t)(-EINVAL)) {
+			VHOST_LOG(ERR, "Cannot find device %s.\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		if (rte_rawdev_info_get(dev_id, &info) < 0 ||
+		    strstr(info.driver_name, "ioat") == NULL) {
+			VHOST_LOG(ERR, "The input device %s is invalid or "
+				  "it is not an I/OAT device\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		dma_info->dmas[vring_id].dev_id = dev_id;
+		dma_info->dmas[vring_id].type = IOAT;
+		dma_info->dmas[vring_id].is_valid = true;
+		dma_info->nr++;
+		goto out;
+	}
+
+	/* process multiple DMA devices within bracket. */
+	addrs++;
+	substr = strtok(addrs, ";");
+	if (!substr) {
+		VHOST_LOG(ERR, "No input DMA addresse\n");
+		ret = -1;
+		goto out;
+	}
+
+	do {
+		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
+
+		start = strstr(ptrs[0], "txq");
+		if (start == NULL) {
+			VHOST_LOG(ERR, "We only support DMA acceleration for "
+				  "TX queues currently\n");
+			ret = -1;
+			goto out;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		vring_id = qid * 2 + VIRTIO_RXQ;
+		/* parse PCI address and check if the input DMA is supported */
+		if (rte_pci_addr_parse(ptrs[1],
+		    &dma_info->dmas[vring_id].addr) < 0) {
+			VHOST_LOG(ERR, "Invalid DMA address %s\n", ptrs[1]);
+			ret = -1;
+			goto out;
+		}
+		rte_pci_device_name(&dma_info->dmas[vring_id].addr,
+				    name, sizeof(name));
+		dev_id = rte_rawdev_get_dev_id(name);
+		if (dev_id == (uint16_t)(-ENODEV) ||
+		    dev_id == (uint16_t)(-EINVAL)) {
+			VHOST_LOG(ERR, "Cannot find device %s.\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		if (rte_rawdev_info_get(dev_id, &info) < 0 ||
+		    strstr(info.driver_name, "ioat") == NULL) {
+			VHOST_LOG(ERR, "The input device %s is invalid or "
+				  "it is not an I/OAT device\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		dma_info->dmas[vring_id].dev_id = dev_id;
+		dma_info->dmas[vring_id].type = IOAT;
+		dma_info->dmas[vring_id].is_valid = true;
+		dma_info->nr++;
+
+		substr = strtok(NULL, ";)");
+	} while (substr);
+
+out:
+	free(input);
+	return ret;
+}
+
+static inline int
+open_int(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	uint16_t *n = extra_args;
+
+	if (value == NULL || extra_args == NULL)
+		return -EINVAL;
+
+	*n = (uint16_t)strtoul(value, NULL, 0);
+	if (*n == USHRT_MAX && errno == ERANGE)
+		return -1;
+
+	return 0;
+}
+
+static int
+rte_pmd_vhost_dma_probe(struct rte_vdev_device *dev)
+{
+	struct rte_kvargs *kvlist = NULL;
+	int ret = 0;
+	char *iface_name;
+	uint16_t queues;
+	uint64_t flags = 0;
+	int client_mode = 0;
+	struct rte_eth_dev *eth_dev;
+	const char *name = rte_vdev_device_name(dev);
+	struct dma_info_input dma_info = { 0 };
+
+	VHOST_LOG(INFO, "Initializing vHost DMA for %s\n", name);
+
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+		eth_dev = rte_eth_dev_attach_secondary(name);
+		if (!eth_dev) {
+			VHOST_LOG(ERR, "Failed to probe %s\n", name);
+			return -1;
+		}
+		/* TODO: request info from primary to set up Rx and Tx */
+		eth_dev->dev_ops = &ops;
+		eth_dev->device = &dev->device;
+		rte_eth_dev_probing_finish(eth_dev);
+		return 0;
+	}
+
+	kvlist = rte_kvargs_parse(rte_vdev_device_args(dev), valid_arguments);
+	if (kvlist == NULL)
+		return -1;
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_IFACE_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_IFACE_ARG,
+					 &open_iface, &iface_name);
+		if (ret < 0)
+			goto out_free;
+	} else {
+		ret = -1;
+		goto out_free;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_QUEUES_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_QUEUES_ARG,
+					 &open_int, &queues);
+		if (ret < 0 || queues > RTE_MAX_QUEUES_PER_PORT) {
+			ret = -1;
+			goto out_free;
+		}
+
+	} else {
+		queues = 1;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_CLIENT_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_CLIENT_ARG,
+					 &open_int, &client_mode);
+		if (ret < 0)
+			goto out_free;
+
+		if (client_mode)
+			flags |= RTE_VHOST_USER_CLIENT;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
+					 &open_dma, &dma_info);
+		if (ret < 0)
+			goto out_free;
+
+		if (dma_info.nr > 0)
+			flags |= RTE_VHOST_USER_DMA_COPY;
+	}
+
+	/* if unspecified, place the vhost-DMA device on the caller's NUMA node */
+	if (dev->device.numa_node == SOCKET_ID_ANY)
+		dev->device.numa_node = rte_socket_id();
+
+	eth_dev_vhost_create(dev, iface_name, queues, dev->device.numa_node,
+			     flags, dma_info.dmas);
+
+out_free:
+	rte_kvargs_free(kvlist);
+	return ret;
+}
+
+static int
+rte_pmd_vhost_dma_remove(struct rte_vdev_device *dev)
+{
+	const char *name;
+	struct rte_eth_dev *eth_dev = NULL;
+
+	name = rte_vdev_device_name(dev);
+	VHOST_LOG(INFO, "Un-Initializing pmd vhost-dma for %s\n", name);
+
+	/* find an ethdev entry */
+	eth_dev = rte_eth_dev_allocated(name);
+	if (eth_dev == NULL)
+		return 0;
+
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return rte_eth_dev_release_port(eth_dev);
+
+	eth_dev_close(eth_dev);
+
+	rte_eth_dev_release_port(eth_dev);
+
+	return 0;
+}
+
+static struct rte_vdev_driver pmd_vhost_dma_drv = {
+	.probe = rte_pmd_vhost_dma_probe,
+	.remove = rte_pmd_vhost_dma_remove,
+};
+
+RTE_PMD_REGISTER_VDEV(net_dma_vhost, pmd_vhost_dma_drv);
+RTE_PMD_REGISTER_ALIAS(net_dma_vhost, dma_vhost);
+RTE_PMD_REGISTER_PARAM_STRING(net_dma_vhost,
+	"iface=<ifc> "
+	"queues=<int> "
+	"client=<0|1> "
+	"dmas=(txq0@addr0;txq1@addr1...)");
+
+RTE_INIT(vhost_dma_init_log)
+{
+	vhost_dma_logtype = rte_log_register("vhost_dma");
+	if (vhost_dma_logtype >= 0)
+		rte_log_set_level(vhost_dma_logtype, RTE_LOG_NOTICE);
+}
diff --git a/drivers/net/vhost_dma/eth_vhost.h b/drivers/net/vhost_dma/eth_vhost.h
new file mode 100644
index 0000000..9c37c4d
--- /dev/null
+++ b/drivers/net/vhost_dma/eth_vhost.h
@@ -0,0 +1,264 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#ifndef _ETH_VHOST_DMA_H_
+#define _ETH_VHOST_DMA_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <rte_pci.h>
+#include <rte_vhost.h>
+#include <rte_log.h>
+
+#ifndef VIRTIO_F_IOMMU_PLATFORM
+#define VIRTIO_F_IOMMU_PLATFORM 33
+#endif
+
+#ifndef VIRTIO_F_RING_PACKED
+#define VIRTIO_F_RING_PACKED 34
+#endif
+
+#define VHOST_DMA_UNSUPPORTED_FEATURES ((1ULL << VHOST_F_LOG_ALL)	| \
+		(1ULL << VIRTIO_F_IOMMU_PLATFORM)	| \
+		(1ULL << VIRTIO_F_RING_PACKED))
+
+#define VHOST_MAX_PKT_BURST 32
+
+/* batching size before a DMA kick */
+#define DMA_BATCHING_SIZE 8
+/**
+ * copy length threshold for the DMA engine. We offload copy jobs whose
+ * lengths are greater than DMA_COPY_LENGTH_THRESHOLD to the DMA; for
+ * small copies, we still use the CPU to perform copies, due to startup
+ * overheads associated with the DMA.
+ */
+#define DMA_COPY_LENGTH_THRESHOLD 1024
+
+extern int vhost_dma_logtype;
+
+#define VHOST_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level,	\
+		vhost_dma_logtype, "VHOST_DMA: " fmt, ## args)
+
+#define vhost_avail_event(vr) \
+	(*(volatile uint16_t*)&(vr)->used->ring[(vr)->size])
+#define vhost_used_event(vr) \
+	(*(volatile uint16_t*)&(vr)->avail->ring[(vr)->size])
+
+enum vhost_xstats_pkts {
+	VHOST_UNDERSIZE_PKT = 0,
+	VHOST_64_PKT,
+	VHOST_65_TO_127_PKT,
+	VHOST_128_TO_255_PKT,
+	VHOST_256_TO_511_PKT,
+	VHOST_512_TO_1023_PKT,
+	VHOST_1024_TO_1522_PKT,
+	VHOST_1523_TO_MAX_PKT,
+	VHOST_BROADCAST_PKT,
+	VHOST_MULTICAST_PKT,
+	VHOST_UNICAST_PKT,
+	VHOST_ERRORS_PKT,
+	VHOST_ERRORS_FRAGMENTED,
+	VHOST_ERRORS_JABBER,
+	VHOST_UNKNOWN_PROTOCOL,
+	VHOST_XSTATS_MAX,
+};
+
+struct vhost_stats {
+	uint64_t pkts;
+	uint64_t bytes;
+	uint64_t missed_pkts;
+	uint64_t xstats[VHOST_XSTATS_MAX];
+};
+
+struct batch_copy_elem {
+	void *dst;
+	void *src;
+	uint32_t len;
+};
+
+struct guest_page {
+	uint64_t guest_phys_addr;
+	uint64_t host_phys_addr;
+	uint64_t size;
+};
+
+enum dma_type {
+	IOAT = 1
+};
+
+struct dma_vring {
+	struct rte_vhost_vring  vr;
+
+	uint16_t last_avail_idx;
+	uint16_t last_used_idx;
+
+	/* the last used index that front end can consume */
+	uint16_t copy_done_used;
+
+	uint16_t signalled_used;
+	bool signalled_used_valid;
+
+	struct vring_used_elem *shadow_used_split;
+	uint16_t shadow_used_idx;
+
+	struct batch_copy_elem  *batch_copy_elems;
+	uint16_t batch_copy_nb_elems;
+
+	bool dma_enabled;
+	/**
+	 * DMA ID. Currently, we only support I/OAT,
+	 * so it's I/OAT rawdev ID.
+	 */
+	uint16_t dev_id;
+	/* DMA address */
+	struct rte_pci_addr dma_addr;
+	/**
+	 * the number of copy jobs submitted to the DMA engine that may
+	 * not have completed yet
+	 */
+	uint64_t nr_inflight;
+	int nr_batching;
+
+	/* host physical address of the index of used ring */
+	phys_addr_t used_idx_hpa;
+
+	struct ring_index *indices;
+	uint16_t max_indices;
+};
+
+struct vhost_queue {
+	int vid;
+	rte_atomic32_t allow_queuing;
+	rte_atomic32_t while_queuing;
+	struct pmd_internal *internal;
+	struct rte_mempool *mb_pool;
+	uint16_t port;
+	uint16_t virtqueue_id;
+	struct vhost_stats stats;
+	struct dma_vring *dma_vring;
+};
+
+struct dma_info {
+	struct rte_pci_addr addr;
+	uint16_t dev_id;
+	enum dma_type type;
+	bool is_valid;
+};
+
+struct pmd_internal {
+	rte_atomic32_t dev_attached;
+	rte_atomic32_t started;
+	char *dev_name;
+	char *iface_name;
+	uint16_t max_queues;
+	int vid;
+	uint8_t vlan_strip;
+
+	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+
+	/* guest's memory regions */
+	struct rte_vhost_memory *mem;
+	/* address mapping table of guest and host physical addresses */
+	struct guest_page *guest_pages;
+	uint32_t nr_guest_pages;
+	uint32_t max_guest_pages;
+
+	/* guest's vrings */
+	struct dma_vring dma_vrings[RTE_MAX_QUEUES_PER_PORT * 2];
+	size_t hdr_len;
+	/* the number of vrings */
+	uint16_t nr_vrings;
+	/* negotiated features */
+	uint64_t features;
+};
+
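+/**
+ * With VIRTIO_RING_F_EVENT_IDX negotiated, guest notifications are
+ * controlled through the avail event index; otherwise the
+ * VRING_USED_F_NO_NOTIFY flag in the used ring is toggled.
+ */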
+static inline void
+vhost_enable_notify_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			  int enable)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
+		if (enable)
+			vr->used->flags &= ~VRING_USED_F_NO_NOTIFY;
+		else
+			vr->used->flags |= VRING_USED_F_NO_NOTIFY;
+	} else {
+		if (enable)
+			vhost_avail_event(vr) = dma_vr->last_avail_idx;
+	}
+}
+
+/* Control whether the front end should notify (kick) the back end. */
+static inline void
+vhost_dma_enable_guest_notification(struct pmd_internal *dev,
+				     struct dma_vring *dma_vr, int enable)
+{
+	vhost_enable_notify_split(dev, dma_vr, enable);
+}
+
+/**
+ * This function gets the front end's memory regions and vring
+ * information, and sets up the data structures needed for enqueue
+ * and dequeue operations.
+ */
+int vhost_dma_setup(struct pmd_internal *dev);
+
+/**
+ * This function destroys the front end's information and frees the
+ * data structures used for enqueue and dequeue operations.
+ */
+void vhost_dma_remove(struct pmd_internal *dev);
+
+/**
+ * This function sends packet buffers to the front end's RX vring.
+ * It will free the mbufs of successfully transmitted packets.
+ *
+ * @param dev
+ *  vhost-dma device
+ * @param dma_vr
+ *  a front end's RX vring
+ * @param pkts
+ *  packets to send
+ * @param count
+ *  the number of packets to send
+ *
+ * @return
+ *  the number of packets successfully sent
+ */
+uint16_t vhost_dma_enqueue_burst(struct pmd_internal *dev,
+				  struct dma_vring *dma_vr,
+				  struct rte_mbuf **pkts, uint32_t count);
+
+/**
+ * This function gets packet buffers from the front end's TX virtqueue.
+ *
+ * @param dev
+ *  vhost-dma device
+ * @param dma_vr
+ *  a front-end's TX vring
+ * @param mbuf_pool
+ *  mempool for allocating mbufs for received packets
+ * @param pkts
+ *  pointer array used to keep addresses of received packets
+ * @param count
+ *  the element number in 'pkts'
+ *
+ * @return
+ *  the number of packets successfully received
+ */
+uint16_t vhost_dma_dequeue_burst(struct pmd_internal *dev,
+				  struct dma_vring *dma_vr,
+				  struct rte_mempool *mbuf_pool,
+				  struct rte_mbuf **pkts, uint16_t count);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _ETH_VHOST_DMA_H_ */
diff --git a/drivers/net/vhost_dma/internal.h b/drivers/net/vhost_dma/internal.h
new file mode 100644
index 0000000..08591b3
--- /dev/null
+++ b/drivers/net/vhost_dma/internal.h
@@ -0,0 +1,225 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#ifndef _INTERNAL_H_
+#define _INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "eth_vhost.h"
+
+struct buf_vector {
+	uint64_t buf_iova;
+	uint64_t buf_addr;
+	uint32_t buf_len;
+	uint32_t desc_idx;
+};
+
+#define BUF_VECTOR_MAX 256
+
+struct ring_index {
+	/* physical address of 'data' */
+	uintptr_t pa;
+	uintptr_t idx;
+	uint16_t data;
+	bool in_use;
+} __rte_cache_aligned;
+
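+/**
+ * Allocate 'num' ring_index slots. Each slot holds a 16-bit 'data'
+ * field together with its IOVA; the data path stores a new used-ring
+ * index in a free slot and lets the DMA engine copy it into the
+ * vring's used->idx (see flush_shadow_used_ring_split()).
+ */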
+static __rte_always_inline int
+setup_indices(struct ring_index **indices, uint16_t num)
+{
+	struct ring_index *array;
+	uint16_t i;
+
+	array = rte_zmalloc(NULL, sizeof(struct ring_index) * num, 0);
+	if (!array) {
+		printf("Init indices failed\n");
+		*indices = NULL;
+		return -1;
+	}
+
+	for (i = 0; i < num; i++) {
+		array[i].pa = rte_mem_virt2iova(&array[i].data);
+		array[i].idx = i;
+	}
+
+	*indices = array;
+	return 0;
+}
+
+static __rte_always_inline void
+destroy_indices(struct ring_index **indices)
+{
+	if (!indices)
+		return;
+	rte_free(*indices);
+	*indices = NULL;
+}
+
+static __rte_always_inline struct ring_index *
+get_empty_indices(struct ring_index *indices, uint16_t num)
+{
+	uint16_t i;
+
+	for (i = 0; i < num; i++) {
+		if (!indices[i].in_use)
+			break;
+	}
+
+	if (unlikely(i == num))
+		return NULL;
+
+	indices[i].in_use = true;
+	return &indices[i];
+}
+
+static __rte_always_inline void
+put_used_indices(struct ring_index *indices, uint16_t idx)
+{
+	indices[idx].in_use = false;
+}
+
+static uint64_t
+get_blk_size(int fd)
+{
+	struct stat stat;
+	int ret;
+
+	ret = fstat(fd, &stat);
+	return ret == -1 ? (uint64_t)-1 : (uint64_t)stat.st_blksize;
+}
+
+static __rte_always_inline int
+add_one_guest_page(struct pmd_internal *dev, uint64_t guest_phys_addr,
+		   uint64_t host_phys_addr, uint64_t size)
+{
+	struct guest_page *page, *last_page;
+	struct guest_page *old_pages;
+
+	if (dev->nr_guest_pages == dev->max_guest_pages) {
+		dev->max_guest_pages *= 2;
+		old_pages = dev->guest_pages;
+		dev->guest_pages = realloc(dev->guest_pages,
+					   dev->max_guest_pages *
+					   sizeof(*page));
+		if (!dev->guest_pages) {
+			VHOST_LOG(ERR, "Cannot realloc guest_pages\n");
+			free(old_pages);
+			return -1;
+		}
+	}
+
+	if (dev->nr_guest_pages > 0) {
+		last_page = &dev->guest_pages[dev->nr_guest_pages - 1];
+		/* merge if the two pages are contiguous */
+		if (host_phys_addr == last_page->host_phys_addr +
+		    last_page->size) {
+			last_page->size += size;
+			return 0;
+		}
+	}
+
+	page = &dev->guest_pages[dev->nr_guest_pages++];
+	page->guest_phys_addr = guest_phys_addr;
+	page->host_phys_addr  = host_phys_addr;
+	page->size = size;
+
+	return 0;
+}
+
+static __rte_always_inline int
+add_guest_page(struct pmd_internal *dev, struct rte_vhost_mem_region *reg)
+{
+	uint64_t reg_size = reg->size;
+	uint64_t host_user_addr  = reg->host_user_addr;
+	uint64_t guest_phys_addr = reg->guest_phys_addr;
+	uint64_t host_phys_addr;
+	uint64_t size, page_size;
+
+	page_size = get_blk_size(reg->fd);
+	if (page_size == (uint64_t)-1) {
+		VHOST_LOG(ERR, "Cannot get hugepage size through fstat\n");
+		return -1;
+	}
+
+	host_phys_addr = rte_mem_virt2iova((void *)(uintptr_t)host_user_addr);
+	size = page_size - (guest_phys_addr & (page_size - 1));
+	size = RTE_MIN(size, reg_size);
+
+	if (add_one_guest_page(dev, guest_phys_addr, host_phys_addr, size) < 0)
+		return -1;
+
+	host_user_addr  += size;
+	guest_phys_addr += size;
+	reg_size -= size;
+
+	while (reg_size > 0) {
+		size = RTE_MIN(reg_size, page_size);
+		host_phys_addr = rte_mem_virt2iova((void *)(uintptr_t)
+						   host_user_addr);
+		if (add_one_guest_page(dev, guest_phys_addr, host_phys_addr,
+				       size) < 0)
+			return -1;
+
+		host_user_addr  += size;
+		guest_phys_addr += size;
+		reg_size -= size;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline int
+setup_guest_pages(struct pmd_internal *dev, struct rte_vhost_memory *mem)
+{
+	uint32_t nr_regions = mem->nregions;
+	uint32_t i;
+
+	dev->nr_guest_pages = 0;
+	dev->max_guest_pages = 8;
+
+	dev->guest_pages = malloc(dev->max_guest_pages *
+			sizeof(struct guest_page));
+	if (dev->guest_pages == NULL) {
+		VHOST_LOG(ERR, "(%d) failed to allocate memory "
+			  "for dev->guest_pages\n", dev->vid);
+		return -1;
+	}
+
+	for (i = 0; i < nr_regions; i++) {
+		if (add_guest_page(dev, &mem->regions[i]) < 0)
+			return -1;
+	}
+	return 0;
+}
+
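+/**
+ * Translate a guest physical address range to the corresponding host
+ * physical address, using the page array built by setup_guest_pages().
+ * Return 0 when the range is not fully covered by a single guest page.
+ */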
+static __rte_always_inline rte_iova_t
+gpa_to_hpa(struct pmd_internal *dev, uint64_t gpa, uint64_t size)
+{
+	uint32_t i;
+	struct guest_page *page;
+
+	for (i = 0; i < dev->nr_guest_pages; i++) {
+		page = &dev->guest_pages[i];
+
+		if (gpa >= page->guest_phys_addr &&
+		    gpa + size < page->guest_phys_addr + page->size) {
+			return gpa - page->guest_phys_addr +
+			       page->host_phys_addr;
+		}
+	}
+
+	return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _INTERNAL_H_ */
diff --git a/drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map b/drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
new file mode 100644
index 0000000..ff357af
--- /dev/null
+++ b/drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
@@ -0,0 +1,4 @@
+DPDK_20.02 {
+
+	local: *;
+};
diff --git a/drivers/net/vhost_dma/virtio_net.c b/drivers/net/vhost_dma/virtio_net.c
new file mode 100644
index 0000000..7e59450
--- /dev/null
+++ b/drivers/net/vhost_dma/virtio_net.c
@@ -0,0 +1,1234 @@
+#include <stdint.h>
+#include <stdbool.h>
+#include <linux/virtio_net.h>
+
+#include <rte_mbuf.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_ethdev.h>
+#include <rte_vhost.h>
+#include <rte_rawdev.h>
+#include <rte_ioat_rawdev.h>
+#include <rte_log.h>
+
+#include "internal.h"
+
+#define MAX_BATCH_LEN 256
+
+static __rte_always_inline bool
+vq_is_packed(struct pmd_internal *dev)
+{
+	return dev->features & (1ull << VIRTIO_F_RING_PACKED);
+}
+
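+/**
+ * Split-ring event-index check: return true when new_idx has passed
+ * event_idx since the last signalled index 'old', using wrap-safe
+ * 16-bit arithmetic.
+ */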
+static __rte_always_inline int
+vhost_need_event(uint16_t event_idx, uint16_t new_idx, uint16_t old)
+{
+	return (uint16_t)(new_idx - event_idx - 1) < (uint16_t)(new_idx - old);
+}
+
+static __rte_always_inline void
+vhost_vring_call_split(struct pmd_internal *dev, struct dma_vring *dma_vr)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	/* flush used->idx update before we read avail->flags. */
+	rte_smp_mb();
+
+	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
+		uint16_t old = dma_vr->signalled_used;
+		uint16_t new = dma_vr->copy_done_used;
+		bool signalled_used_valid = dma_vr->signalled_used_valid;
+
+		dma_vr->signalled_used = new;
+		dma_vr->signalled_used_valid = true;
+
+		VHOST_LOG(DEBUG, "%s: used_event_idx=%d, old=%d, new=%d\n",
+			  __func__, vhost_used_event(vr), old, new);
+
+		if ((vhost_need_event(vhost_used_event(vr), new, old) &&
+		     (vr->callfd >= 0)) || unlikely(!signalled_used_valid))
+			eventfd_write(vr->callfd, (eventfd_t)1);
+	} else {
+		if (!(vr->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) &&
+		    (vr->callfd >= 0))
+			eventfd_write(vr->callfd, (eventfd_t)1);
+	}
+}
+
+/* notify front-end of enqueued packets */
+static __rte_always_inline void
+vhost_dma_vring_call(struct pmd_internal *dev, struct dma_vring *dma_vr)
+{
+	vhost_vring_call_split(dev, dma_vr);
+}
+
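+/**
+ * Reap completed I/OAT copy jobs. A completion handle that is greater
+ * than or equal to max_indices is an mbuf pointer for a finished packet
+ * copy; smaller handles are ring_index slots that mark a used-ring
+ * index update as done and visible to the front end.
+ */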
+static int
+process_dma_completed(struct pmd_internal *dev, struct dma_vring *dma_vr)
+{
+	uintptr_t flags[255], tmps[255];
+	int dma_done, i;
+	uint16_t used_idx;
+
+	dma_done = rte_ioat_completed_copies(dma_vr->dev_id, 255, flags,
+					     tmps);
+	if (unlikely(dma_done <= 0))
+		return dma_done;
+
+	dma_vr->nr_inflight -= dma_done;
+	for (i = 0; i < dma_done; i++) {
+		if ((uint64_t)flags[i] >= dma_vr->max_indices) {
+			/* the DMA finishes a packet copy job. */
+			struct rte_mbuf *pkt = (struct rte_mbuf *)flags[i];
+
+			rte_mbuf_refcnt_update(pkt, -1);
+			if (rte_mbuf_refcnt_read(pkt) == 1)
+				rte_pktmbuf_free(pkt);
+		} else {
+			/**
+			 * the DMA finishes updating index of the
+			 * used ring.
+			 */
+			uint16_t id = flags[i];
+
+			used_idx = dma_vr->indices[id].data;
+			VHOST_LOG(DEBUG, "The DMA finishes updating index %u "
+				  "for the used ring.\n", used_idx);
+
+			dma_vr->copy_done_used = used_idx;
+			vhost_dma_vring_call(dev, dma_vr);
+			put_used_indices(dma_vr->indices, id);
+		}
+	}
+	return dma_done;
+}
+
+static  __rte_always_inline bool
+rxvq_is_mergeable(struct pmd_internal *dev)
+{
+	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
+}
+
+static __rte_always_inline void
+do_flush_shadow_used_ring_split(struct dma_vring *dma_vr, uint16_t to,
+				uint16_t from, uint16_t size)
+{
+	rte_memcpy(&dma_vr->vr.used->ring[to],
+		   &dma_vr->shadow_used_split[from],
+		   size * sizeof(struct vring_used_elem));
+}
+
+static __rte_always_inline void
+flush_shadow_used_ring_split(struct pmd_internal *dev,
+			     struct dma_vring *dma_vr)
+{
+	uint16_t used_idx = dma_vr->last_used_idx & (dma_vr->vr.size - 1);
+
+	if (used_idx + dma_vr->shadow_used_idx <= dma_vr->vr.size) {
+		do_flush_shadow_used_ring_split(dma_vr, used_idx, 0,
+						dma_vr->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update the used ring interval [used_idx, vr->size) */
+		size = dma_vr->vr.size - used_idx;
+		do_flush_shadow_used_ring_split(dma_vr, used_idx, 0, size);
+
+		/* update the wrapped-around interval [0, shadow_used_idx - size) */
+		do_flush_shadow_used_ring_split(dma_vr, 0, size,
+						dma_vr->shadow_used_idx -
+						size);
+	}
+	dma_vr->last_used_idx += dma_vr->shadow_used_idx;
+
+	rte_smp_wmb();
+
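+	/**
+	 * If DMA copies are still in flight, enqueue the used->idx update
+	 * as one more DMA copy so that it is written only after the
+	 * preceding data copies have completed.
+	 */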
+	if (dma_vr->dma_enabled && dma_vr->nr_inflight > 0) {
+		struct ring_index *index;
+
+		index = get_empty_indices(dma_vr->indices,
+					  dma_vr->max_indices);
+		index->data = dma_vr->last_used_idx;
+		while (unlikely(rte_ioat_enqueue_copy(dma_vr->dev_id,
+						      index->pa,
+						      dma_vr->used_idx_hpa,
+						      sizeof(uint16_t),
+						      index->idx, 0, 0) ==
+				0)) {
+			int ret;
+
+			do {
+				ret = process_dma_completed(dev, dma_vr);
+			} while (ret <= 0);
+		}
+		dma_vr->nr_batching++;
+		dma_vr->nr_inflight++;
+	} else {
+		/**
+		 * We update the used ring's index here only when all
+		 * previous copy jobs have completed.
+		 *
+		 * Without DMA copy enabled, the CPU performs all memory
+		 * copies, so the CPU is in charge of updating the used
+		 * ring's index.
+		 *
+		 * With DMA copy enabled, if the DMA still has outstanding
+		 * copy jobs, the DMA is in charge of updating the index,
+		 * so that an in-flight DMA write cannot overwrite the
+		 * CPU's update.
+		 */
+		*(volatile uint16_t *)&dma_vr->vr.used->idx +=
+			dma_vr->shadow_used_idx;
+		dma_vr->copy_done_used += dma_vr->shadow_used_idx;
+	}
+
+	dma_vr->shadow_used_idx = 0;
+}
+
+static __rte_always_inline void
+update_shadow_used_ring_split(struct dma_vring *dma_vr,
+			      uint16_t desc_idx, uint32_t len)
+{
+	uint16_t i = dma_vr->shadow_used_idx++;
+
+	dma_vr->shadow_used_split[i].id  = desc_idx;
+	dma_vr->shadow_used_split[i].len = len;
+}
+
+static inline void
+do_data_copy(struct dma_vring *dma_vr)
+{
+	struct batch_copy_elem *elem = dma_vr->batch_copy_elems;
+	uint16_t count = dma_vr->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++)
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+
+	dma_vr->batch_copy_nb_elems = 0;
+}
+
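+/* Skip the store when the value is already equal, to avoid needless writes. */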
+#define ASSIGN_UNLESS_EQUAL(var, val) do {	\
+	if ((var) != (val))			\
+		(var) = (val);			\
+} while (0)
+
+static __rte_always_inline void
+virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
+{
+	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
+
+	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
+		csum_l4 |= PKT_TX_TCP_CKSUM;
+
+	if (csum_l4) {
+		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
+
+		switch (csum_l4) {
+		case PKT_TX_TCP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
+						cksum));
+			break;
+		case PKT_TX_UDP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
+						dgram_cksum));
+			break;
+		case PKT_TX_SCTP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
+						cksum));
+			break;
+		}
+	} else {
+		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
+	}
+
+	/* IP checksum cannot be bypassed, so calculate it here */
+	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
+		struct rte_ipv4_hdr *ipv4_hdr;
+
+		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
+						   m_buf->l2_len);
+		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
+	}
+
+	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
+		if (m_buf->ol_flags & PKT_TX_IPV4)
+			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
+		else
+			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
+		net_hdr->gso_size = m_buf->tso_segsz;
+		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
+					+ m_buf->l4_len;
+	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
+		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
+		net_hdr->gso_size = m_buf->tso_segsz;
+		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
+			m_buf->l4_len;
+	} else {
+		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
+	}
+}
+
+static __rte_always_inline void *
+vhost_alloc_copy_ind_table(struct pmd_internal *dev, uint64_t desc_addr,
+			   uint64_t desc_len)
+{
+	void *idesc;
+	uint64_t src, dst;
+	uint64_t len, remain = desc_len;
+
+	idesc = rte_malloc(NULL, desc_len, 0);
+	if (unlikely(!idesc))
+		return NULL;
+
+	dst = (uint64_t)(uintptr_t)idesc;
+
+	while (remain) {
+		len = remain;
+		src = rte_vhost_va_from_guest_pa(dev->mem, desc_addr, &len);
+		if (unlikely(!src || !len)) {
+			rte_free(idesc);
+			return NULL;
+		}
+
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		dst += len;
+		desc_addr += len;
+	}
+
+	return idesc;
+}
+
+static __rte_always_inline void
+free_ind_table(void *idesc)
+{
+	rte_free(idesc);
+}
+
+static __rte_always_inline int
+map_one_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
+	     uint16_t *vec_idx, uint64_t desc_iova, uint64_t desc_len)
+{
+	uint16_t vec_id = *vec_idx;
+
+	while (desc_len) {
+		uint64_t desc_addr;
+		uint64_t desc_chunck_len = desc_len;
+
+		if (unlikely(vec_id >= BUF_VECTOR_MAX))
+			return -1;
+
+		desc_addr = rte_vhost_va_from_guest_pa(dev->mem, desc_iova,
+						       &desc_chunck_len);
+		if (unlikely(!desc_addr))
+			return -1;
+
+		rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+		buf_vec[vec_id].buf_iova = desc_iova;
+		buf_vec[vec_id].buf_addr = desc_addr;
+		buf_vec[vec_id].buf_len  = desc_chunck_len;
+
+		desc_len -= desc_chunck_len;
+		desc_iova += desc_chunck_len;
+		vec_id++;
+	}
+	*vec_idx = vec_id;
+
+	return 0;
+}
+
+static __rte_always_inline int
+fill_vec_buf_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+		   uint32_t avail_idx, uint16_t *vec_idx,
+		   struct buf_vector *buf_vec, uint16_t *desc_chain_head,
+		   uint32_t *desc_chain_len)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+	uint16_t idx = vr->avail->ring[avail_idx & (vr->size - 1)];
+	uint16_t vec_id = *vec_idx;
+	uint32_t len    = 0;
+	uint64_t dlen;
+	uint32_t nr_descs = vr->size;
+	uint32_t cnt    = 0;
+	struct vring_desc *descs = vr->desc;
+	struct vring_desc *idesc = NULL;
+
+	if (unlikely(idx >= vr->size))
+		return -1;
+
+	*desc_chain_head = idx;
+
+	if (vr->desc[idx].flags & VRING_DESC_F_INDIRECT) {
+		dlen = vr->desc[idx].len;
+		nr_descs = dlen / sizeof(struct vring_desc);
+		if (unlikely(nr_descs > vr->size))
+			return -1;
+
+		descs = (struct vring_desc *)(uintptr_t)
+			rte_vhost_va_from_guest_pa(dev->mem,
+						   vr->desc[idx].addr, &dlen);
+		if (unlikely(!descs))
+			return -1;
+
+		if (unlikely(dlen < vr->desc[idx].len)) {
+			/**
+			 * The indirect descriptor table is not contiguous
+			 * in the process's VA space, so we have to copy it.
+			 */
+			idesc = vhost_alloc_copy_ind_table(dev,
+							   vr->desc[idx].addr,
+							   vr->desc[idx].len);
+			if (unlikely(!idesc))
+				return -1;
+
+			descs = idesc;
+		}
+
+		idx = 0;
+	}
+
+	while (1) {
+		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
+			free_ind_table(idesc);
+			return -1;
+		}
+
+		len += descs[idx].len;
+
+		if (unlikely(map_one_desc(dev, buf_vec, &vec_id,
+					  descs[idx].addr, descs[idx].len))) {
+			free_ind_table(idesc);
+			return -1;
+		}
+
+		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
+			break;
+
+		idx = descs[idx].next;
+	}
+
+	*desc_chain_len = len;
+	*vec_idx = vec_id;
+
+	if (unlikely(!!idesc))
+		free_ind_table(idesc);
+
+	return 0;
+}
+
+static inline int
+reserve_avail_buf_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			uint32_t size, struct buf_vector *buf_vec,
+			uint16_t *num_buffers, uint16_t avail_head,
+			uint16_t *nr_vec)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	uint16_t cur_idx;
+	uint16_t vec_idx = 0;
+	uint16_t max_tries, tries = 0;
+	uint16_t head_idx = 0;
+	uint32_t len = 0;
+
+	*num_buffers = 0;
+	cur_idx  = dma_vr->last_avail_idx;
+
+	if (rxvq_is_mergeable(dev))
+		max_tries = vr->size - 1;
+	else
+		max_tries = 1;
+
+	while (size > 0) {
+		if (unlikely(cur_idx == avail_head))
+			return -1;
+		/**
+		 * if we tried all available ring items, and still
+		 * can't get enough buf, it means something abnormal
+		 * happened.
+		 */
+		if (unlikely(++tries > max_tries))
+			return -1;
+
+		if (unlikely(fill_vec_buf_split(dev, dma_vr, cur_idx,
+						&vec_idx, buf_vec,
+						&head_idx, &len) < 0))
+			return -1;
+		len = RTE_MIN(len, size);
+		update_shadow_used_ring_split(dma_vr, head_idx, len);
+		size -= len;
+
+		cur_idx++;
+		*num_buffers += 1;
+	}
+
+	*nr_vec = vec_idx;
+
+	return 0;
+}
+
+static __rte_noinline void
+copy_vnet_hdr_to_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
+		      struct virtio_net_hdr_mrg_rxbuf *hdr)
+{
+	uint64_t len;
+	uint64_t remain = dev->hdr_len;
+	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
+	uint64_t iova = buf_vec->buf_iova;
+
+	while (remain) {
+		len = RTE_MIN(remain,
+			      buf_vec->buf_len);
+		dst = buf_vec->buf_addr;
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		iova += len;
+		src += len;
+		buf_vec++;
+	}
+}
+
+static __rte_always_inline int
+copy_mbuf_to_desc(struct pmd_internal *dev, struct dma_vring *dma_vr,
+		  struct rte_mbuf *m, struct buf_vector *buf_vec,
+		  uint16_t nr_vec, uint16_t num_buffers, bool *copy_done)
+{
+	uint32_t vec_idx = 0;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t buf_offset, buf_avail;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t cpy_len;
+	uint64_t hdr_addr;
+	struct rte_mbuf *hdr_mbuf;
+	struct batch_copy_elem *batch_copy = dma_vr->batch_copy_elems;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
+	uint64_t dst, src;
+	int error = 0;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	*copy_done = true;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_mbuf = m;
+	hdr_addr = buf_addr;
+	if (unlikely(buf_len < dev->hdr_len))
+		hdr = &tmp_hdr;
+	else
+		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
+
+	VHOST_LOG(DEBUG, "(%d) RX: num merge buffers %d\n", dev->vid,
+		  num_buffers);
+
+	if (unlikely(buf_len < dev->hdr_len)) {
+		buf_offset = dev->hdr_len - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail = buf_len - buf_offset;
+	} else {
+		buf_offset = dev->hdr_len;
+		buf_avail = buf_len - dev->hdr_len;
+	}
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+	while (mbuf_avail != 0 || m->next != NULL) {
+		bool dma_copy = false;
+
+		/* done with current buf, get the next one */
+		if (buf_avail == 0) {
+			vec_idx++;
+			if (unlikely(vec_idx >= nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
+			if (rxvq_is_mergeable(dev))
+				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
+						    num_buffers);
+
+			if (unlikely(hdr == &tmp_hdr))
+				copy_vnet_hdr_to_desc(dev, buf_vec, hdr);
+			hdr_addr = 0;
+		}
+
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (dma_vr->dma_enabled && cpy_len >=
+		    DMA_COPY_LENGTH_THRESHOLD) {
+			dst = gpa_to_hpa(dev, buf_iova + buf_offset, cpy_len);
+			dma_copy = (dst != 0);
+		}
+
+		if (dma_copy) {
+			src = rte_pktmbuf_iova_offset(m, mbuf_offset);
+
+			/**
+			 * if DMA enqueue fails, we wait until there are
+			 * available DMA descriptors.
+			 */
+			while (unlikely(rte_ioat_enqueue_copy(dma_vr->dev_id,
+							      src, dst, cpy_len,
+							      (uintptr_t)
+							      hdr_mbuf, 0, 0) ==
+					0)) {
+				int ret;
+
+				do {
+					ret = process_dma_completed(dev,
+								    dma_vr);
+				} while (ret <= 0);
+			}
+
+			dma_vr->nr_batching++;
+			dma_vr->nr_inflight++;
+			rte_mbuf_refcnt_update(hdr_mbuf, 1);
+			*copy_done = false;
+		} else if (likely(cpy_len > MAX_BATCH_LEN ||
+				  dma_vr->batch_copy_nb_elems >=
+				  dma_vr->vr.size)) {
+			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
+				   rte_pktmbuf_mtod_offset(m, void *,
+							   mbuf_offset),
+				   cpy_len);
+		} else {
+			batch_copy[dma_vr->batch_copy_nb_elems].dst =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+			batch_copy[dma_vr->batch_copy_nb_elems].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+			batch_copy[dma_vr->batch_copy_nb_elems].len = cpy_len;
+			dma_vr->batch_copy_nb_elems++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail  -= cpy_len;
+		buf_offset += cpy_len;
+	}
+
+out:
+	return error;
+}
+
+static __rte_always_inline uint16_t
+vhost_dma_enqueue_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mbuf **pkts, uint32_t count)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	uint32_t pkt_idx = 0;
+	uint16_t num_buffers;
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	uint16_t avail_head;
+
+	struct rte_mbuf *done_pkts[VHOST_MAX_PKT_BURST];
+	uint32_t i, nr_done = 0;
+	bool copy_done;
+
+	if (dma_vr->dma_enabled && dma_vr->nr_inflight > 0)
+		process_dma_completed(dev, dma_vr);
+
+	avail_head = *((volatile uint16_t *)&vr->avail->idx);
+
+	/**
+	 * the ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vr->avail->ring[dma_vr->last_avail_idx &
+			(vr->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->hdr_len;
+		uint16_t nr_vec = 0;
+
+		if (unlikely(reserve_avail_buf_split(dev, dma_vr, pkt_len,
+						     buf_vec, &num_buffers,
+						     avail_head, &nr_vec) <
+			     0)) {
+			VHOST_LOG(INFO,
+				  "(%d) failed to get enough desc from vring\n",
+				  dev->vid);
+			dma_vr->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		VHOST_LOG(DEBUG, "(%d) current index %d | end index %d\n",
+			  dev->vid, dma_vr->last_avail_idx,
+			  dma_vr->last_avail_idx + num_buffers);
+
+		if (copy_mbuf_to_desc(dev, dma_vr, pkts[pkt_idx],
+				      buf_vec, nr_vec, num_buffers,
+				      &copy_done) < 0) {
+			dma_vr->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		if (copy_done)
+			done_pkts[nr_done++] = pkts[pkt_idx];
+
+		if (dma_vr->dma_enabled &&
+		    unlikely(dma_vr->nr_batching >= DMA_BATCHING_SIZE)) {
+			/**
+			 * kick the DMA to do copy once the number of
+			 * batching jobs reaches the batching threshold.
+			 */
+			rte_ioat_do_copies(dma_vr->dev_id);
+			dma_vr->nr_batching = 0;
+		}
+
+		dma_vr->last_avail_idx += num_buffers;
+	}
+
+	do_data_copy(dma_vr);
+
+	if (dma_vr->shadow_used_idx) {
+		flush_shadow_used_ring_split(dev, dma_vr);
+		vhost_dma_vring_call(dev, dma_vr);
+	}
+
+	if (dma_vr->dma_enabled && dma_vr->nr_batching > 0) {
+		rte_ioat_do_copies(dma_vr->dev_id);
+		dma_vr->nr_batching = 0;
+	}
+
+	/* free copy-done packets */
+	for (i = 0; i < nr_done; i++)
+		rte_pktmbuf_free(done_pkts[i]);
+
+	return pkt_idx;
+}
+
+uint16_t
+vhost_dma_enqueue_burst(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mbuf **pkts, uint32_t count)
+{
+	return vhost_dma_enqueue_split(dev, dma_vr, pkts, count);
+}
+
+static inline bool
+virtio_net_with_host_offload(struct pmd_internal *dev)
+{
+	if (dev->features &
+			((1ULL << VIRTIO_NET_F_CSUM) |
+			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
+			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
+			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
+			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
+		return true;
+
+	return false;
+}
+
+static void
+parse_ethernet(struct rte_mbuf *m, uint16_t *l4_proto, void **l4_hdr)
+{
+	struct rte_ipv4_hdr *ipv4_hdr;
+	struct rte_ipv6_hdr *ipv6_hdr;
+	void *l3_hdr = NULL;
+	struct rte_ether_hdr *eth_hdr;
+	uint16_t ethertype;
+
+	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
+
+	m->l2_len = sizeof(struct rte_ether_hdr);
+	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
+
+	if (ethertype == RTE_ETHER_TYPE_VLAN) {
+		struct rte_vlan_hdr *vlan_hdr =
+			(struct rte_vlan_hdr *)(eth_hdr + 1);
+
+		m->l2_len += sizeof(struct rte_vlan_hdr);
+		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
+	}
+
+	l3_hdr = (char *)eth_hdr + m->l2_len;
+
+	switch (ethertype) {
+	case RTE_ETHER_TYPE_IPV4:
+		ipv4_hdr = l3_hdr;
+		*l4_proto = ipv4_hdr->next_proto_id;
+		m->l3_len = (ipv4_hdr->version_ihl & 0x0f) * 4;
+		*l4_hdr = (char *)l3_hdr + m->l3_len;
+		m->ol_flags |= PKT_TX_IPV4;
+		break;
+	case RTE_ETHER_TYPE_IPV6:
+		ipv6_hdr = l3_hdr;
+		*l4_proto = ipv6_hdr->proto;
+		m->l3_len = sizeof(struct rte_ipv6_hdr);
+		*l4_hdr = (char *)l3_hdr + m->l3_len;
+		m->ol_flags |= PKT_TX_IPV6;
+		break;
+	default:
+		m->l3_len = 0;
+		*l4_proto = 0;
+		*l4_hdr = NULL;
+		break;
+	}
+}
+
+static __rte_always_inline void
+vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
+{
+	uint16_t l4_proto = 0;
+	void *l4_hdr = NULL;
+	struct rte_tcp_hdr *tcp_hdr = NULL;
+
+	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
+		return;
+
+	parse_ethernet(m, &l4_proto, &l4_hdr);
+	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
+		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
+			switch (hdr->csum_offset) {
+			case (offsetof(struct rte_tcp_hdr, cksum)):
+				if (l4_proto == IPPROTO_TCP)
+					m->ol_flags |= PKT_TX_TCP_CKSUM;
+				break;
+			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
+				if (l4_proto == IPPROTO_UDP)
+					m->ol_flags |= PKT_TX_UDP_CKSUM;
+				break;
+			case (offsetof(struct rte_sctp_hdr, cksum)):
+				if (l4_proto == IPPROTO_SCTP)
+					m->ol_flags |= PKT_TX_SCTP_CKSUM;
+				break;
+			default:
+				break;
+			}
+		}
+	}
+
+	if (l4_hdr && hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
+		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
+		case VIRTIO_NET_HDR_GSO_TCPV4:
+		case VIRTIO_NET_HDR_GSO_TCPV6:
+			tcp_hdr = l4_hdr;
+			m->ol_flags |= PKT_TX_TCP_SEG;
+			m->tso_segsz = hdr->gso_size;
+			m->l4_len = (tcp_hdr->data_off & 0xf0) >> 2;
+			break;
+		case VIRTIO_NET_HDR_GSO_UDP:
+			m->ol_flags |= PKT_TX_UDP_SEG;
+			m->tso_segsz = hdr->gso_size;
+			m->l4_len = sizeof(struct rte_udp_hdr);
+			break;
+		default:
+			VHOST_LOG(WARNING,
+				  "unsupported gso type %u.\n", hdr->gso_type);
+			break;
+		}
+	}
+}
+
+static __rte_noinline void
+copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr, struct buf_vector *buf_vec)
+{
+	uint64_t len;
+	uint64_t remain = sizeof(struct virtio_net_hdr);
+	uint64_t src;
+	uint64_t dst = (uint64_t)(uintptr_t)hdr;
+
+	while (remain) {
+		len = RTE_MIN(remain, buf_vec->buf_len);
+		src = buf_vec->buf_addr;
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		dst += len;
+		buf_vec++;
+	}
+}
+
+static __rte_always_inline int
+copy_desc_to_mbuf(struct pmd_internal *dev, struct dma_vring *dma_vr,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool)
+{
+	uint32_t buf_avail, buf_offset;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+	/* a counter to avoid desc dead loop chain */
+	uint16_t vec_idx = 0;
+	struct batch_copy_elem *batch_copy = dma_vr->batch_copy_elems;
+	int error = 0;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/**
+			 * no luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/**
+	 * a virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->hdr_len)) {
+		buf_offset = dev->hdr_len - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->hdr_len) {
+		if (unlikely(++vec_idx >= nr_vec))
+			goto out;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->hdr_len;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->hdr_len;
+	}
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+		(void)buf_iova;
+
+		if (cpy_len > MAX_BATCH_LEN || dma_vr->batch_copy_nb_elems >=
+		    dma_vr->vr.size || (hdr && cur == m)) {
+			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+							   mbuf_offset),
+				   (void *)((uintptr_t)(buf_addr + buf_offset)),
+				   cpy_len);
+		} else {
+			batch_copy[dma_vr->batch_copy_nb_elems].dst =
+				rte_pktmbuf_mtod_offset(cur, void *,
+							mbuf_offset);
+			batch_copy[dma_vr->batch_copy_nb_elems].src =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+			batch_copy[dma_vr->batch_copy_nb_elems].len = cpy_len;
+			dma_vr->batch_copy_nb_elems++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail -= cpy_len;
+		buf_offset += cpy_len;
+
+		/* this buf reaches its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/**
+		 * this mbuf reaches its end, get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG(INFO, "Failed to allocate mbuf.\n");
+				error = -1;
+				goto out;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	if (hdr)
+		vhost_dequeue_offload(hdr, m);
+
+out:
+
+	return error;
+}
+
+static __rte_always_inline uint16_t
+vhost_dma_dequeue_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+			 uint16_t count)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+	uint16_t free_entries, i;
+
+	free_entries = *((volatile uint16_t *)&vr->avail->idx) -
+		dma_vr->last_avail_idx;
+	if (free_entries == 0)
+		return 0;
+
+	/**
+	 * the ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vr->avail->ring[dma_vr->last_avail_idx &
+		      (vr->size - 1)]);
+
+	count = RTE_MIN(count, VHOST_MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG(DEBUG, "(%d) about to dequeue %u buffers\n",
+		  dev->vid, count);
+
+	for (i = 0; i < count; i++) {
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		uint16_t head_idx;
+		uint32_t dummy_len;
+		uint16_t nr_vec = 0;
+		int err;
+
+		if (unlikely(fill_vec_buf_split(dev, dma_vr,
+						dma_vr->last_avail_idx + i,
+						&nr_vec, buf_vec,
+						&head_idx, &dummy_len) < 0))
+			break;
+
+		update_shadow_used_ring_split(dma_vr, head_idx, 0);
+
+		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
+		if (unlikely(pkts[i] == NULL)) {
+			VHOST_LOG(INFO, "Failed to allocate mbuf.\n");
+			break;
+		}
+
+		err = copy_desc_to_mbuf(dev, dma_vr, buf_vec, nr_vec, pkts[i],
+					mbuf_pool);
+		if (unlikely(err)) {
+			rte_pktmbuf_free(pkts[i]);
+			break;
+		}
+	}
+	dma_vr->last_avail_idx += i;
+
+	do_data_copy(dma_vr);
+
+	if (unlikely(i < count))
+		dma_vr->shadow_used_idx = i;
+	if (dma_vr->shadow_used_idx) {
+		flush_shadow_used_ring_split(dev, dma_vr);
+		vhost_dma_vring_call(dev, dma_vr);
+	}
+
+	return i;
+}
+
+uint16_t
+vhost_dma_dequeue_burst(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+			 uint16_t count)
+{
+	return vhost_dma_dequeue_split(dev, dma_vr, mbuf_pool, pkts, count);
+}
+
+int
+vhost_dma_setup(struct pmd_internal *dev)
+{
+	struct dma_vring *dma_vr;
+	int vid = dev->vid;
+	int ret;
+	uint16_t i, j, size;
+
+	rte_vhost_get_negotiated_features(vid, &dev->features);
+	if (vq_is_packed(dev)) {
+		VHOST_LOG(ERR, "vHost DMA doesn't support packed ring\n");
+		return -1;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF))
+		dev->hdr_len = sizeof(struct virtio_net_hdr_mrg_rxbuf);
+	else
+		dev->hdr_len = sizeof(struct virtio_net_hdr);
+
+	dev->nr_vrings = rte_vhost_get_vring_num(vid);
+
+	if (rte_vhost_get_mem_table(vid, &dev->mem) < 0) {
+		VHOST_LOG(ERR, "Failed to get guest memory regions\n");
+		return -1;
+	}
+
+	/* set up gpa and hpa mappings */
+	if (setup_guest_pages(dev, dev->mem) < 0) {
+		VHOST_LOG(ERR, "Failed to get hpa and gpa mappings\n");
+		free(dev->mem);
+		return -1;
+	}
+
+	for (i = 0; i < dev->nr_vrings; i++) {
+		dma_vr = &dev->dma_vrings[i];
+
+		ret = rte_vhost_get_vring_base(vid, i, &dma_vr->last_avail_idx,
+					       &dma_vr->last_used_idx);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to get vring index.\n");
+			goto err;
+		}
+
+		ret = rte_vhost_get_vhost_vring(vid, i, &dma_vr->vr);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to get vring address.\n");
+			goto err;
+		}
+
+		size = dma_vr->vr.size;
+		dma_vr->shadow_used_split =
+			rte_malloc(NULL, size * sizeof(struct vring_used_elem),
+				   RTE_CACHE_LINE_SIZE);
+		if (dma_vr->shadow_used_split == NULL)
+			goto err;
+
+		dma_vr->batch_copy_elems =
+			rte_malloc(NULL, size * sizeof(struct batch_copy_elem),
+				   RTE_CACHE_LINE_SIZE);
+		if (dma_vr->batch_copy_elems == NULL)
+			goto err;
+
+		/* set up used index array for DMA copy */
+		dma_vr->used_idx_hpa =
+			rte_mem_virt2iova(&dma_vr->vr.used->idx);
+		dma_vr->max_indices = dma_vr->vr.size;
+		setup_indices(&dma_vr->indices, dma_vr->max_indices);
+
+		dma_vr->copy_done_used = dma_vr->last_used_idx;
+		dma_vr->signalled_used = dma_vr->last_used_idx;
+		dma_vr->signalled_used_valid = false;
+
+		dma_vr->shadow_used_idx = 0;
+		dma_vr->batch_copy_nb_elems = 0;
+	}
+
+	return 0;
+
+err:
+	for (j = 0; j <= i; j++) {
+		dma_vr = &dev->dma_vrings[j];
+		rte_free(dma_vr->shadow_used_split);
+		rte_free(dma_vr->batch_copy_elems);
+		destroy_indices(&dma_vr->indices);
+		dma_vr->batch_copy_elems = NULL;
+		dma_vr->shadow_used_split = NULL;
+	}
+
+	free(dev->mem);
+	dev->mem = NULL;
+	free(dev->guest_pages);
+	dev->guest_pages = NULL;
+
+	return -1;
+}
+
+void
+vhost_dma_remove(struct pmd_internal *dev)
+{
+	struct dma_vring *dma_vr;
+	uint16_t i;
+
+	for (i = 0; i < dev->nr_vrings; i++) {
+		dma_vr = &dev->dma_vrings[i];
+
+		if (dma_vr->dma_enabled) {
+			VHOST_LOG(INFO, "Wait for outstanding DMA jobs "
+				  "of vring %u to complete\n", i);
+
+			while (dma_vr->nr_inflight > 0)
+				process_dma_completed(dev, dma_vr);
+
+			rte_rawdev_stop(dma_vr->dev_id);
+			dma_vr->dma_enabled = false;
+			dma_vr->nr_batching = 0;
+			dma_vr->dev_id = -1;
+		}
+
+		rte_free(dma_vr->shadow_used_split);
+		dma_vr->shadow_used_split = NULL;
+		rte_free(dma_vr->batch_copy_elems);
+		dma_vr->batch_copy_elems = NULL;
+		dma_vr->signalled_used_valid = false;
+		dma_vr->used_idx_hpa = 0;
+		destroy_indices(&dma_vr->indices);
+		dma_vr->max_indices = 0;
+	}
+
+	free(dev->mem);
+	dev->mem = NULL;
+	free(dev->guest_pages);
+	dev->guest_pages = NULL;
+}
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 1f5c748..1c2e1ce 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -220,6 +220,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_VDEV_NETVSC_PMD) += -lrte_pmd_vdev_netvsc
 _LDLIBS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD)     += -lrte_pmd_virtio
 ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST)      += -lrte_pmd_vhost
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_DMA)      += -lrte_pmd_vhost_dma
 ifeq ($(CONFIG_RTE_EAL_VFIO),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_IFC_PMD) += -lrte_pmd_ifc
 endif # $(CONFIG_RTE_EAL_VFIO)
-- 
2.7.4


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user
  2019-09-26 14:26 ` [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user Jiayu Hu
@ 2019-12-17  7:18   ` Maxime Coquelin
  0 siblings, 0 replies; 11+ messages in thread
From: Maxime Coquelin @ 2019-12-17  7:18 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: tiwei.bie, zhihong.wang



On 9/26/19 4:26 PM, Jiayu Hu wrote:
> DMA engines, like I/OAT, are efficient in moving large data
> within memory. Offloading large copies in vhost side to DMA
> engines can save precious CPU cycles and improve vhost
> performance.
> 
> However, using the DMA engine requires to populate guest's
> memory. This patch is to enable DMA-accelerated vhost-user
> to populate guest's memory.
> 
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> ---
>  lib/librte_vhost/rte_vhost.h  |  1 +
>  lib/librte_vhost/socket.c     | 10 ++++++++++
>  lib/librte_vhost/vhost.h      |  2 ++
>  lib/librte_vhost/vhost_user.c |  3 ++-
>  4 files changed, 15 insertions(+), 1 deletion(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
> index 7fb1729..61f8bef 100644
> --- a/lib/librte_vhost/rte_vhost.h
> +++ b/lib/librte_vhost/rte_vhost.h
> @@ -30,6 +30,7 @@ extern "C" {
>  #define RTE_VHOST_USER_DEQUEUE_ZERO_COPY	(1ULL << 2)
>  #define RTE_VHOST_USER_IOMMU_SUPPORT	(1ULL << 3)
>  #define RTE_VHOST_USER_POSTCOPY_SUPPORT		(1ULL << 4)
> +#define RTE_VHOST_USER_DMA_COPY		(1ULL << 5)
>  
>  /** Protocol features. */
>  #ifndef VHOST_USER_PROTOCOL_F_MQ
> diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
> index 274988c..55498c4 100644
> --- a/lib/librte_vhost/socket.c
> +++ b/lib/librte_vhost/socket.c
> @@ -60,6 +60,8 @@ struct vhost_user_socket {
>  	 */
>  	int vdpa_dev_id;
>  
> +	bool dma_enabled;
> +
>  	struct vhost_device_ops const *notify_ops;
>  };
>  
> @@ -232,6 +234,13 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
>  	if (vsocket->dequeue_zero_copy)
>  		vhost_enable_dequeue_zero_copy(vid);
>  
> +	if (vsocket->dma_enabled) {
> +		struct virtio_net *dev;
> +
> +		dev = get_device(vid);
> +		dev->dma_enabled = true;
> +	}
> +
>  	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
>  
>  	if (vsocket->notify_ops->new_connection) {
> @@ -870,6 +879,7 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
>  		goto out_free;
>  	}
>  	vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;
> +	vsocket->dma_enabled = flags & RTE_VHOST_USER_DMA_COPY;
>  
>  	/*
>  	 * Set the supported features correctly for the builtin vhost-user

I think you also need to remove VHOST_USER_PROTOCOL_F_PAGEFAULT from
the supported protocol features when DMA is enabled, since the DMA path
requires MAP_POPULATE, which is not compatible with postcopy.
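
For illustration, a minimal sketch of how that masking could look; the
helper name and call site are hypothetical, and only RTE_VHOST_USER_DMA_COPY
(added by this patch) and VHOST_USER_PROTOCOL_F_PAGEFAULT come from the
vhost headers:

#include <stdint.h>
#include <rte_vhost.h>

/*
 * Hypothetical helper: when the DMA copy flag is requested at
 * registration time, drop the postcopy (PAGEFAULT) protocol feature
 * from the advertised set, because the DMA path mmap()s guest memory
 * with MAP_POPULATE, which postcopy cannot work with.
 */
static inline uint64_t
mask_pagefault_if_dma(uint64_t reg_flags, uint64_t protocol_features)
{
	if (reg_flags & RTE_VHOST_USER_DMA_COPY)
		protocol_features &=
			~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
	return protocol_features;
}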

> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 884befa..8f564f1 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -329,6 +329,8 @@ struct virtio_net {
>  	 */
>  	int			vdpa_dev_id;
>  
> +	bool			dma_enabled;
> +
>  	/* context data for the external message handlers */
>  	void			*extern_data;
>  	/* pre and post vhost user message handlers for the device */
> diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> index 0b72648..6a4969b 100644
> --- a/lib/librte_vhost/vhost_user.c
> +++ b/lib/librte_vhost/vhost_user.c
> @@ -990,7 +990,8 @@ vhost_user_set_mem_table(struct virtio_net **pdev, struct VhostUserMsg *msg,
>  		}
>  		mmap_size = RTE_ALIGN_CEIL(mmap_size, alignment);
>  
> -		populate = (dev->dequeue_zero_copy) ? MAP_POPULATE : 0;
> +		populate = (dev->dequeue_zero_copy || dev->dma_enabled) ?
> +			MAP_POPULATE : 0;
>  		mmap_addr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
>  				 MAP_SHARED | populate, fd, 0);
>  
> 


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
  2019-11-01  8:54   ` [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver Jiayu Hu
@ 2019-12-17  8:27     ` Maxime Coquelin
  2019-12-17 10:20       ` Maxime Coquelin
  2019-12-18  3:11       ` Hu, Jiayu
  0 siblings, 2 replies; 11+ messages in thread
From: Maxime Coquelin @ 2019-12-17  8:27 UTC (permalink / raw)
  To: Jiayu Hu, dev; +Cc: tiwei.bie, zhihong.wang, bruce.richardson

Hi Jiayu,

On 11/1/19 9:54 AM, Jiayu Hu wrote:
> This patch introduces a new PMD for DMA accelerated vhost-user, which
> provides basic functionality of packet reception and transmission. This
> PMD leverages librte_vhost to handle vhost messages, but it implements
> own vring's enqueue and dequeue operations.
> 
> The PMD leverages DMA engines (e.g., I/OAT, a DMA engine in Intel's
> processor), to accelerate large data movements in enqueue and dequeue
> operations. Large copies are offloaded to the DMA in an asynchronous mode.
> That is, the CPU just submits copy jobs to the DMA without waiting
> for their completion; there is no CPU intervention during data transfer.
> Thus, we can save precious CPU cycles and improve the overall performance
> for vhost-user based applications, like OVS. The PMD still uses the CPU to
> perform small copies, due to the startup overheads associated with the DMA.
> 
> Note that the PMD is able to support various DMA engines to accelerate
> data movements in enqueue and dequeue operations; currently the supported
> DMA engine is I/OAT. The PMD just supports I/OAT acceleration in the
> enqueue operation, and it still uses the CPU to perform all copies in
> the dequeue operation. In addition, the PMD only supports the split ring.
> 
> The DMA device used by a queue is assigned by users; for the queue
> without assigning a DMA device, the PMD will use the CPU to perform
> all copies for both enqueue and dequeue operations. Currently, the PMD
> just supports I/OAT, and a queue can only use one I/OAT device, and an
> I/OAT device can only be used by one queue at a time.
> 
> The PMD has 4 parameters.
>  - iface: The parameter is used to specify a path to connect to a
>  	front end device.
>  - queues: The parameter is used to specify the number of the queues
>  	front end device has (Default is 1).
>  - client: The parameter is used to specify the vhost port working as
>  	client mode or server mode (Default is server mode).
>  - dmas: This parameter is used to specify the assigned DMA device
>  	of a queue.
> 
> Here is an example.
> $ ./testpmd -c f -n 4 \
> --vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'

dma_vhost0 is not a good name; you have to mention that it is net-specific.

Is there a tool to list available DMA engines?

> 
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> ---
>  config/common_base                                 |    2 +
>  config/common_linux                                |    1 +
>  drivers/Makefile                                   |    2 +-
>  drivers/net/Makefile                               |    1 +
>  drivers/net/vhost_dma/Makefile                     |   31 +
>  drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
>  drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
>  drivers/net/vhost_dma/internal.h                   |  225 +++
>  .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
>  drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
>  mk/rte.app.mk                                      |    1 +
>  11 files changed, 3259 insertions(+), 1 deletion(-)
>  create mode 100644 drivers/net/vhost_dma/Makefile
>  create mode 100644 drivers/net/vhost_dma/eth_vhost.c
>  create mode 100644 drivers/net/vhost_dma/eth_vhost.h
>  create mode 100644 drivers/net/vhost_dma/internal.h
>  create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
>  create mode 100644 drivers/net/vhost_dma/virtio_net.c

You need to add Meson support.


More generally, I have been through the series and I'm not sure having a
dedicated PMD driver for this is a good idea due to all the code
duplication it implies.

I understand it has been done this way to avoid impacting the pure SW
datapath implementation, but I'm sure the series could be reduced to a
few hundred lines if it were integrated into the vhost-user library.
Moreover, your series does not support the packed ring, which means even
more code would need to be duplicated in the end.

What do you think?

Thanks,
Maxime


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
  2019-12-17  8:27     ` Maxime Coquelin
@ 2019-12-17 10:20       ` Maxime Coquelin
  2019-12-18  2:51         ` Hu, Jiayu
  2019-12-18  3:11       ` Hu, Jiayu
  1 sibling, 1 reply; 11+ messages in thread
From: Maxime Coquelin @ 2019-12-17 10:20 UTC (permalink / raw)
  To: Jiayu Hu, dev
  Cc: tiwei.bie, zhihong.wang, bruce.richardson, Honnappa Nagarahalli,
	Hemant Agrawal, jerinj



On 12/17/19 9:27 AM, Maxime Coquelin wrote:
> Hi Jiayu,
> 
> On 11/1/19 9:54 AM, Jiayu Hu wrote:
>> This patch introduces a new PMD for DMA accelerated vhost-user, which
>> provides basic functionality of packet reception and transmission. This
>> PMD leverages librte_vhost to handle vhost messages, but it implements
>> own vring's enqueue and dequeue operations.
>>
>> The PMD leverages DMA engines (e.g., I/OAT, a DMA engine in Intel's
>> processor), to accelerate large data movements in enqueue and dequeue
>> operations. Large copies are offloaded to the DMA in an asynchronous mode.
>> That is, the CPU just submits copy jobs to the DMA without waiting
>> for their completion; there is no CPU intervention during data transfer.
>> Thus, we can save precious CPU cycles and improve the overall performance
>> for vhost-user based applications, like OVS. The PMD still uses the CPU to
>> perform small copies, due to the startup overheads associated with the DMA.
>>
>> Note that the PMD is able to support various DMA engines to accelerate
>> data movements in enqueue and dequeue operations; currently the supported
>> DMA engine is I/OAT. The PMD just supports I/OAT acceleration in the
>> enqueue operation, and it still uses the CPU to perform all copies in
>> the dequeue operation. In addition, the PMD only supports the split ring.
>>
>> The DMA device used by a queue is assigned by users; for the queue
>> without assigning a DMA device, the PMD will use the CPU to perform
>> all copies for both enqueue and dequeue operations. Currently, the PMD
>> just supports I/OAT, and a queue can only use one I/OAT device, and an
>> I/OAT device can only be used by one queue at a time.
>>
>> The PMD has 4 parameters.
>>  - iface: The parameter is used to specify a path to connect to a
>>  	front end device.
>>  - queues: The parameter is used to specify the number of the queues
>>  	front end device has (Default is 1).
>>  - client: The parameter is used to specify the vhost port working as
>>  	client mode or server mode (Default is server mode).
>>  - dmas: This parameter is used to specify the assigned DMA device
>>  	of a queue.
>>
>> Here is an example.
>> $ ./testpmd -c f -n 4 \
>> --vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'
> 
> dma_vhost0 is not a good name, you have to mention it is net specific.
> 
> Is there a tool to list available DMA engines?

Thinking about it again, wouldn't it be possible for the user not to
specify a specific DMA device ID, but instead to have one device
allocated at init time by specifying all the capabilities the DMA
device needs to match?

If no DMA device with matching capabilities is available, then fall
back to SW mode.
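
To make the idea concrete, here is a rough sketch of what such a
capability-based allocation could look like; the structures and the
helper below are made up for illustration and are not an existing DPDK
API:

#include <stdbool.h>
#include <stdint.h>

/* purely illustrative capability description */
struct dma_caps {
	int		socket_id;	/* NUMA node the engine sits on */
	uint32_t	max_copy_len;	/* largest single copy supported */
};

struct dma_engine {
	uint16_t	dev_id;		/* device id of the engine */
	struct dma_caps	caps;
	bool		in_use;
};

/*
 * Walk the known engines and hand out the first free one matching the
 * requested capabilities; -1 tells the caller to fall back to the SW
 * (CPU copy) datapath.
 */
static int
dma_engine_alloc(struct dma_engine *engines, unsigned int nb_engines,
		 const struct dma_caps *req)
{
	unsigned int i;

	for (i = 0; i < nb_engines; i++) {
		if (engines[i].in_use)
			continue;
		if (req->socket_id >= 0 &&
		    engines[i].caps.socket_id != req->socket_id)
			continue;
		if (engines[i].caps.max_copy_len < req->max_copy_len)
			continue;
		engines[i].in_use = true;
		return engines[i].dev_id;
	}
	return -1;	/* no match: use the SW datapath */
}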

Also, I think we don't want to call the IOAT API directly here, but
instead introduce a DMA library so that the vhost DMA support isn't
vendor-specific.

Adding a few ARM people in Cc to know whether they have plans/interest
in supporting DMA acceleration for vhost.

Regards,
Maxime

>>
>> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
>> ---
>>  config/common_base                                 |    2 +
>>  config/common_linux                                |    1 +
>>  drivers/Makefile                                   |    2 +-
>>  drivers/net/Makefile                               |    1 +
>>  drivers/net/vhost_dma/Makefile                     |   31 +
>>  drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
>>  drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
>>  drivers/net/vhost_dma/internal.h                   |  225 +++
>>  .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
>>  drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
>>  mk/rte.app.mk                                      |    1 +
>>  11 files changed, 3259 insertions(+), 1 deletion(-)
>>  create mode 100644 drivers/net/vhost_dma/Makefile
>>  create mode 100644 drivers/net/vhost_dma/eth_vhost.c
>>  create mode 100644 drivers/net/vhost_dma/eth_vhost.h
>>  create mode 100644 drivers/net/vhost_dma/internal.h
>>  create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
>>  create mode 100644 drivers/net/vhost_dma/virtio_net.c
> 
> You need to add Meson support.
> 
> 
> More generally, I have been through the series and I'm not sure having a
> dedicated PMD driver for this is a good idea due to all the code
> duplication it implies.
> 
> I understand it has been done this way to avoid impacting the pure SW
> datapath implementation, but I'm sure the series could be reduced to a
> few hundred lines if it were integrated into the vhost-user library.
> Moreover, your series does not support the packed ring, which means even
> more code would need to be duplicated in the end.
> 
> What do you think?
> 
> Thanks,
> Maxime
> 


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
  2019-12-17 10:20       ` Maxime Coquelin
@ 2019-12-18  2:51         ` Hu, Jiayu
  0 siblings, 0 replies; 11+ messages in thread
From: Hu, Jiayu @ 2019-12-18  2:51 UTC (permalink / raw)
  To: Maxime Coquelin, dev
  Cc: Bie, Tiwei, Wang, Zhihong, Richardson, Bruce,
	Honnappa Nagarahalli, Hemant Agrawal, jerinj, Liang, Cunming

Hi Maxime,

Thanks for your suggestions. Replies are inline.

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, December 17, 2019 6:20 PM
> To: Hu, Jiayu <jiayu.hu@intel.com>; dev@dpdk.org
> Cc: Bie, Tiwei <tiwei.bie@intel.com>; Wang, Zhihong
> <zhihong.wang@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; Hemant Agrawal
> <hemant.agrawal@nxp.com>; jerinj@marvell.com
> Subject: Re: [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
> 
> 
> >> The PMD has 4 parameters.
> >>  - iface: The parameter is used to specify a path to connect to a
> >>  	front end device.
> >>  - queues: The parameter is used to specify the number of the queues
> >>  	front end device has (Default is 1).
> >>  - client: The parameter is used to specify the vhost port working as
> >>  	client mode or server mode (Default is server mode).
> >>  - dmas: This parameter is used to specify the assigned DMA device
> >>  	of a queue.
> >>
> >> Here is an example.
> >> $ ./testpmd -c f -n 4 \
> >> --vdev
> 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'
> >
> > dma_vhost0 is not a good name, you have to mention it is net specific.
> >
> > Is there a tool to list available DMA engines?
> 
> Thinking at it again, wouldn't it be possible that the user doesn't
> specify a specific DMA device ID, but instead allocate one device
> at init time by specifying all the capabilities the DMA device need
> to match?
> 
> If no DMA device available with matching capabilities, then fallback to
> SW mode.

I think allocating DMAs by capabilities, like the NUMA node, is a good
approach. But the problem is how to express the relationship between
queues and DMAs. As RX/TX is handled on a per-queue basis, we need to
know whether a queue needs DMA acceleration or not. The current vdev
parameters explicitly specify which queue uses which DMA; if we use DMA
capabilities instead, will the parameters look too complex? For example:
dmas={txq0@(numa0, other capabilities);txq1@(numa0, ...)}

Any suggestions?
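
Just to illustrate what parsing such a parameter could involve, here is
a small sketch; the token format and the structure below are hypothetical
and only meant to show the shape of a per-queue capability request:

#include <stdio.h>
#include <stdint.h>

/* one possible per-queue DMA request derived from a devarg such as
 * dmas={txq0@(numa0);txq1@(numa0)} */
struct dma_request {
	uint16_t qid;		/* tx queue index */
	int	 socket_id;	/* requested NUMA node */
};

/* parse a single "txq<N>@(numa<M>)" token into a request */
static int
parse_dma_request(const char *token, struct dma_request *req)
{
	unsigned int qid, node;

	if (sscanf(token, "txq%u@(numa%u)", &qid, &node) != 2)
		return -1;

	req->qid = qid;
	req->socket_id = node;
	return 0;
}

Even with only a NUMA constraint, the string already reads heavier than
the current txq0@00:04.0 form, so more capabilities would make it harder
for users to write.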

> 
> Also, I think we don't want to call directly IOAT API directly here, but
> instead introduce a DMA library so that the Vhost DMA stuff isn't vendor
> specific.

As far as I know, all DMA engines in DPDK are implemented as rawdevs and
there is no unified API for them. If necessary, we can provide a DMA
library for them in the future, so that more scenarios can benefit from
the DMA engines.

IMO, vhost is only one of the consumers of the DMA engines, so the DMA
library doesn't have to live inside vhost; in addition, we only know that
I/OAT can accelerate vhost, but we don't know about other DMA engines.
Therefore, at this point, I think it's OK to call the IOAT API directly.
Furthermore, we can replace the IOAT-specific API in the vhost PMD if a
DMA library becomes available in the future; as long as the current vdev
parameters for DMA are generic enough, the internal API replacement will
not be visible to users.
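
As a sketch of what that replacement boundary could look like (the ops
table below is illustrative only and not an existing DPDK interface),
the data path would only call generic hooks, with I/OAT being just one
backend behind them:

#include <stdint.h>

/* hypothetical internal ops table used by the enqueue/dequeue paths */
struct vhost_dma_ops {
	/* queue one copy job; returns 0 on success */
	int (*enqueue_copy)(int dev_id, uint64_t src, uint64_t dst,
			    uint32_t len, uintptr_t cb_arg);
	/* kick the engine to start the batched jobs */
	void (*kick)(int dev_id);
	/* reap finished jobs; returns the number completed */
	int (*completed)(int dev_id, uintptr_t *cb_args, int max);
};

/*
 * An I/OAT backend would wrap rte_ioat_enqueue_copy() and
 * rte_ioat_do_copies() behind these hooks; a future generic DMA
 * library backend could then plug in without touching the vring
 * enqueue/dequeue code or the vdev parameters.
 */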

What do you think?

Regards,
Jiayu

> 
> Adding a few ARM people in cc, to know whether they have plan/interrest
> in supporting DMA acceleration for Vhost.
> 
> Regards,
> Maxime
> 

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
  2019-12-17  8:27     ` Maxime Coquelin
  2019-12-17 10:20       ` Maxime Coquelin
@ 2019-12-18  3:11       ` Hu, Jiayu
  1 sibling, 0 replies; 11+ messages in thread
From: Hu, Jiayu @ 2019-12-18  3:11 UTC (permalink / raw)
  To: Maxime Coquelin, dev; +Cc: Bie, Tiwei, Wang, Zhihong, Richardson, Bruce

Hi Maxime,

Replies are inline.

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, December 17, 2019 4:27 PM
> To: Hu, Jiayu <jiayu.hu@intel.com>; dev@dpdk.org
> Cc: Bie, Tiwei <tiwei.bie@intel.com>; Wang, Zhihong
> <zhihong.wang@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>
> Subject: Re: [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
> 
> Hi Jiayu,
> > Here is an example.
> > $ ./testpmd -c f -n 4 \
> > --vdev
> 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'
> 
> dma_vhost0 is not a good name, you have to mention it is net specific.
> 
> Is there a tool to list available DMA engines?

Yes, you can use dpdk-devbind.py to list available DMA engines.

> 
> >
> > Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> > ---
> >  config/common_base                                 |    2 +
> >  config/common_linux                                |    1 +
> >  drivers/Makefile                                   |    2 +-
> >  drivers/net/Makefile                               |    1 +
> >  drivers/net/vhost_dma/Makefile                     |   31 +
> >  drivers/net/vhost_dma/eth_vhost.c                  | 1495
> ++++++++++++++++++++
> >  drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
> >  drivers/net/vhost_dma/internal.h                   |  225 +++
> >  .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
> >  drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
> >  mk/rte.app.mk                                      |    1 +
> >  11 files changed, 3259 insertions(+), 1 deletion(-)
> >  create mode 100644 drivers/net/vhost_dma/Makefile
> >  create mode 100644 drivers/net/vhost_dma/eth_vhost.c
> >  create mode 100644 drivers/net/vhost_dma/eth_vhost.h
> >  create mode 100644 drivers/net/vhost_dma/internal.h
> >  create mode 100644
> drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
> >  create mode 100644 drivers/net/vhost_dma/virtio_net.c
> 
> You need to add Meson support.

I will add Meson build support later.

> 
> 
> More generally, I have been through the series and I'm not sure having a
> dedicated PMD driver for this is a good idea due to all the code
> duplication it implies.
> 
> I understand it has been done this way to avoid impacting the pure SW
> datapath implementation, but I'm sure the series could be reduced to a
> few hundred lines if it were integrated into the vhost-user library.
> Moreover, your series does not support the packed ring, which means even
> more code would need to be duplicated in the end.

Yes, the purpose of providing a new PMD is to avoid impacting the vhost
library. To avoid too much duplicated code, we could instead provide a
separate DMA-accelerated data path in the existing vhost-user PMD, rather
than introducing a new PMD. What do you think?

Thanks,
Jiayu
> 
> What do you think?
> 
> Thanks,
> Maxime


^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread

Thread overview: 11+ messages
2019-09-26 14:26 [dpdk-dev] [RFC 0/2] Add a PMD for I/OAT accelerated vhost-user Jiayu Hu
2019-09-26 14:26 ` [dpdk-dev] [RFC 1/2] vhost: populate guest memory for DMA-accelerated vhost-user Jiayu Hu
2019-12-17  7:18   ` Maxime Coquelin
2019-09-26 14:26 ` [dpdk-dev] [RFC 2/2] net/vhost_ioat: add vhost I/OAT driver Jiayu Hu
2019-11-01  8:54 ` [dpdk-dev] [RFC v2 0/2] Add a PMD for DMA-accelerated vhost-user Jiayu Hu
2019-11-01  8:54   ` [dpdk-dev] [RFC v2 1/2] vhost: populate guest memory " Jiayu Hu
2019-11-01  8:54   ` [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver Jiayu Hu
2019-12-17  8:27     ` Maxime Coquelin
2019-12-17 10:20       ` Maxime Coquelin
2019-12-18  2:51         ` Hu, Jiayu
2019-12-18  3:11       ` Hu, Jiayu
