From: Maxime Coquelin
To: Wenwu Ma, dev@dpdk.org
Cc: chenbo.xia@intel.com, cheng1.jiang@intel.com, jiayu.hu@intel.com, Yuan Wang
Subject: Re: [dpdk-dev] [PATCH v5 3/4] vhost: support async dequeue for split ring
Date: Tue, 13 Jul 2021 16:30:11 +0200
In-Reply-To: <20210705181151.141752-4-wenwux.ma@intel.com>
References: <20210602083110.5530-1-yuanx.wang@intel.com> <20210705181151.141752-1-wenwux.ma@intel.com> <20210705181151.141752-4-wenwux.ma@intel.com>

On 7/5/21 8:11 PM, Wenwu Ma wrote:
> From: Yuan Wang
>
> This patch implements asynchronous dequeue data path for split ring.
> A new asynchronous dequeue function is introduced. With this function,
> the application can try to receive packets from the guest with
> offloading large copies to the DMA engine, thus saving precious CPU
> cycles.
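
As background for the discussion below, the application-side usage this
enables amounts to something like the following sketch. It is illustrative
only and not part of the patch: the queue id, the mbuf pool and the
forward_pkts() helper are placeholders, and it assumes an async channel has
already been registered for the queue.

#include <rte_mbuf.h>
#include <rte_vhost_async.h>

#define BURST_SZ 32

/* Placeholder for application forwarding logic. */
static void forward_pkts(struct rte_mbuf **pkts, uint16_t nb);

static void
drain_vhost_async_txq(int vid, uint16_t virtio_txq, struct rte_mempool *pool)
{
	struct rte_mbuf *pkts[BURST_SZ];
	int nr_inflight = 0;
	uint16_t nr_recvd;

	/* Packets whose copies are complete (done by CPU or DMA) are
	 * returned in pkts[]; copies still owned by the DMA engine are
	 * reported through nr_inflight. */
	nr_recvd = rte_vhost_async_try_dequeue_burst(vid, virtio_txq, pool,
			pkts, BURST_SZ, &nr_inflight);
	if (nr_recvd)
		forward_pkts(pkts, nr_recvd);

	/* As long as nr_inflight > 0, keep polling so that in-flight DMA
	 * copies are eventually completed and their mbufs returned. */
}
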
> > Signed-off-by: Yuan Wang > Signed-off-by: Jiayu Hu > Signed-off-by: Wenwu Ma > --- > doc/guides/prog_guide/vhost_lib.rst | 10 + > lib/vhost/rte_vhost_async.h | 44 +- > lib/vhost/version.map | 3 + > lib/vhost/virtio_net.c | 601 ++++++++++++++++++++++++++++ > 4 files changed, 655 insertions(+), 3 deletions(-) > > diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst > index d18fb98910..05c42c9b11 100644 > --- a/doc/guides/prog_guide/vhost_lib.rst > +++ b/doc/guides/prog_guide/vhost_lib.rst > @@ -281,6 +281,16 @@ The following is an overview of some key Vhost API functions: > Poll enqueue completion status from async data path. Completed packets > are returned to applications through ``pkts``. > > +* ``rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool, pkts, count, nr_inflight)`` > + > + Try to receive packets from the guest with offloading large packets > + to the DMA engine. Successfully dequeued packets are transfer > + completed and returned in ``pkts``. But there may be other packets > + that are sent from the guest but being transferred by the DMA engine, > + called in-flight packets. This function will return in-flight packets > + only after the DMA engine finishes transferring. The amount of > + in-flight packets by now is returned in ``nr_inflight``. > + > Vhost-user Implementations > -------------------------- > > diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h > index 6faa31f5ad..58019408f1 100644 > --- a/lib/vhost/rte_vhost_async.h > +++ b/lib/vhost/rte_vhost_async.h > @@ -84,13 +84,21 @@ struct rte_vhost_async_channel_ops { > }; > > /** > - * inflight async packet information > + * in-flight async packet information > */ > +struct async_nethdr { Please prefix with rte_vhost_ > + struct virtio_net_hdr hdr; > + bool valid; > +}; > + > struct async_inflight_info { > struct rte_mbuf *mbuf; > - uint16_t descs; /* num of descs inflight */ > + union { > + uint16_t descs; /* num of descs in-flight */ > + struct async_nethdr nethdr; > + }; > uint16_t nr_buffers; /* num of buffers inflight for packed ring */ > -}; > +} __rte_cache_aligned; Does it really need to be cache aligned? > > /** > * dma channel feature bit definition > @@ -193,4 +201,34 @@ __rte_experimental > uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id, > struct rte_mbuf **pkts, uint16_t count); > > +/** > + * This function tries to receive packets from the guest with offloading > + * large copies to the DMA engine. Successfully dequeued packets are > + * transfer completed, either by the CPU or the DMA engine, and they are > + * returned in "pkts". There may be other packets that are sent from > + * the guest but being transferred by the DMA engine, called in-flight > + * packets. The amount of in-flight packets by now is returned in > + * "nr_inflight". This function will return in-flight packets only after > + * the DMA engine finishes transferring. I am not sure to understand that comment. Is it still "in-flight" if the DMA transfer is completed? Are we ensuring packets are not reordered with this way of working? > + * > + * @param vid > + * id of vhost device to dequeue data > + * @param queue_id > + * queue id to dequeue data > + * @param pkts > + * blank array to keep successfully dequeued packets > + * @param count > + * size of the packet array > + * @param nr_inflight > + * the amount of in-flight packets by now. If error occurred, its > + * value is set to -1. 
> + * @return > + * num of successfully dequeued packets > + */ > +__rte_experimental > +uint16_t > +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id, > + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count, > + int *nr_inflight); > + > #endif /* _RTE_VHOST_ASYNC_H_ */ > diff --git a/lib/vhost/version.map b/lib/vhost/version.map > index 9103a23cd4..a320f889cd 100644 > --- a/lib/vhost/version.map > +++ b/lib/vhost/version.map > @@ -79,4 +79,7 @@ EXPERIMENTAL { > > # added in 21.05 > rte_vhost_get_negotiated_protocol_features; > + > + # added in 21.08 > + rte_vhost_async_try_dequeue_burst; > }; > diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c > index b93482587c..52237e8600 100644 > --- a/lib/vhost/virtio_net.c > +++ b/lib/vhost/virtio_net.c > @@ -2673,6 +2673,32 @@ virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt, > return -1; > } > > +/* > + * Allocate a host supported pktmbuf. > + */ > +static __rte_always_inline struct rte_mbuf * > +virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp, > + uint32_t data_len) > +{ > + struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp); > + > + if (unlikely(pkt == NULL)) { > + VHOST_LOG_DATA(ERR, > + "Failed to allocate memory for mbuf.\n"); > + return NULL; > + } > + > + if (virtio_dev_pktmbuf_prep(dev, pkt, data_len)) { > + /* Data doesn't fit into the buffer and the host supports > + * only linear buffers > + */ > + rte_pktmbuf_free(pkt); > + return NULL; > + } > + > + return pkt; > +} > + I think you should be able to use rte_pktmbuf_alloc_bulk and virtio_dev_pktmbuf_prep instead of re-introducing the function that was removed by Balazs. It should help perf a bit. > __rte_always_inline > static uint16_t > virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq, > @@ -3147,3 +3173,578 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id, > > return count; > } > + > +static __rte_always_inline int > +async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq, > + struct buf_vector *buf_vec, uint16_t nr_vec, > + struct rte_mbuf *m, struct rte_mempool *mbuf_pool, > + struct iovec *src_iovec, struct iovec *dst_iovec, > + struct rte_vhost_iov_iter *src_it, > + struct rte_vhost_iov_iter *dst_it, > + struct async_nethdr *nethdr, > + bool legacy_ol_flags) > +{ > + uint64_t buf_addr, buf_iova; > + uint64_t mapped_len; > + uint32_t tlen = 0; > + uint32_t buf_avail, buf_offset, buf_len; > + uint32_t mbuf_avail, mbuf_offset; > + uint32_t cpy_len, cpy_threshold; > + /* A counter to avoid desc dead loop chain */ > + uint16_t vec_idx = 0; > + int tvec_idx = 0; > + struct rte_mbuf *cur = m, *prev = m; > + struct virtio_net_hdr tmp_hdr; > + struct virtio_net_hdr *hdr = NULL; > + struct batch_copy_elem *batch_copy = vq->batch_copy_elems; > + > + buf_addr = buf_vec[vec_idx].buf_addr; > + buf_len = buf_vec[vec_idx].buf_len; > + buf_iova = buf_vec[vec_idx].buf_iova; > + > + if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) > + return -1; > + > + cpy_threshold = vq->async_threshold; > + > + if (virtio_net_with_host_offload(dev)) { > + if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) { > + /* > + * No luck, the virtio-net header doesn't fit > + * in a contiguous virtual area. 
> + */ > + copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec); > + hdr = &tmp_hdr; > + } else { > + hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr); > + } > + } > + > + /* > + * A virtio driver normally uses at least 2 desc buffers > + * for Tx: the first for storing the header, and others > + * for storing the data. > + */ > + if (unlikely(buf_len < dev->vhost_hlen)) { > + buf_offset = dev->vhost_hlen - buf_len; > + vec_idx++; > + buf_addr = buf_vec[vec_idx].buf_addr; > + buf_len = buf_vec[vec_idx].buf_len; > + buf_avail = buf_len - buf_offset; > + } else if (buf_len == dev->vhost_hlen) { > + if (unlikely(++vec_idx >= nr_vec)) > + return -1; > + buf_addr = buf_vec[vec_idx].buf_addr; > + buf_len = buf_vec[vec_idx].buf_len; > + > + buf_offset = 0; > + buf_avail = buf_len; > + } else { > + buf_offset = dev->vhost_hlen; > + buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen; > + } > + > + PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), > + (uint32_t)buf_avail, 0); > + > + mbuf_offset = 0; > + mbuf_avail = m->buf_len - RTE_PKTMBUF_HEADROOM; > + while (1) { > + cpy_len = RTE_MIN(buf_avail, mbuf_avail); > + > + while (cpy_len && cpy_len >= cpy_threshold) { > + void *hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev, > + buf_iova + buf_offset, cpy_len, > + &mapped_len); > + > + if (unlikely(!hpa || mapped_len < cpy_threshold)) > + break; > + > + async_fill_vec(src_iovec + tvec_idx, hpa, > + (size_t)mapped_len); > + async_fill_vec(dst_iovec + tvec_idx, > + (void *)(uintptr_t)rte_pktmbuf_iova_offset(cur, > + mbuf_offset), > + (size_t)mapped_len); > + > + tvec_idx++; > + tlen += (uint32_t)mapped_len; > + cpy_len -= (uint32_t)mapped_len; > + mbuf_avail -= (uint32_t)mapped_len; > + mbuf_offset += (uint32_t)mapped_len; > + buf_avail -= (uint32_t)mapped_len; > + buf_offset += (uint32_t)mapped_len; > + } > + > + if (cpy_len) { > + if (vq->batch_copy_nb_elems >= vq->size || > + (hdr && cur == m)) { > + rte_memcpy( > + rte_pktmbuf_mtod_offset(cur, void *, > + mbuf_offset), > + (void *)((uintptr_t)(buf_addr + > + buf_offset)), > + cpy_len); > + } else { > + batch_copy[vq->batch_copy_nb_elems].dst = > + rte_pktmbuf_mtod_offset(cur, void *, > + mbuf_offset); > + batch_copy[vq->batch_copy_nb_elems].src = > + (void *)((uintptr_t)(buf_addr + > + buf_offset)); > + batch_copy[vq->batch_copy_nb_elems].len = > + cpy_len; > + vq->batch_copy_nb_elems++; > + } > + > + mbuf_avail -= cpy_len; > + mbuf_offset += cpy_len; > + buf_avail -= cpy_len; > + buf_offset += cpy_len; > + } > + > + /* This buf reaches to its end, get the next one */ > + if (buf_avail == 0) { > + if (++vec_idx >= nr_vec) > + break; > + > + buf_addr = buf_vec[vec_idx].buf_addr; > + buf_len = buf_vec[vec_idx].buf_len; > + > + buf_offset = 0; > + buf_avail = buf_len; > + > + PRINT_PACKET(dev, (uintptr_t)buf_addr, > + (uint32_t)buf_avail, 0); > + } > + > + /* > + * This mbuf reaches to its end, get a new one > + * to hold more data. 
> + */ > + if (mbuf_avail == 0) { > + cur = rte_pktmbuf_alloc(mbuf_pool); > + if (unlikely(cur == NULL)) { > + VHOST_LOG_DATA(ERR, "Failed to " > + "allocate memory for mbuf.\n"); > + return -1; > + } > + > + prev->next = cur; > + prev->data_len = mbuf_offset; > + m->nb_segs += 1; > + m->pkt_len += mbuf_offset; > + prev = cur; > + > + mbuf_offset = 0; > + mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM; > + } > + } > + > + prev->data_len = mbuf_offset; > + m->pkt_len += mbuf_offset; > + > + if (hdr && tlen) { > + nethdr->valid = true; > + nethdr->hdr = *hdr; > + } else if (hdr) > + vhost_dequeue_offload(hdr, m, legacy_ol_flags); > + > + if (tlen) { > + async_fill_iter(src_it, tlen, src_iovec, tvec_idx); > + async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx); > + } else > + src_it->count = 0; > + > + return 0; > +} > + > +static __rte_always_inline uint16_t > +async_poll_dequeue_completed_split(struct virtio_net *dev, > + struct vhost_virtqueue *vq, uint16_t queue_id, > + struct rte_mbuf **pkts, uint16_t count, bool legacy_ol_flags) > +{ > + uint16_t n_pkts_cpl = 0, n_pkts_put = 0; > + uint16_t start_idx, pkt_idx, from; > + struct async_inflight_info *pkts_info; > + > + pkt_idx = vq->async_pkts_idx & (vq->size - 1); > + pkts_info = vq->async_pkts_info; > + start_idx = virtio_dev_rx_async_get_info_idx(pkt_idx, vq->size, > + vq->async_pkts_inflight_n); > + > + if (count > vq->async_last_pkts_n) { > + n_pkts_cpl = vq->async_ops.check_completed_copies(dev->vid, > + queue_id, 0, count - vq->async_last_pkts_n); > + } > + > + n_pkts_cpl += vq->async_last_pkts_n; > + if (unlikely(n_pkts_cpl == 0)) > + return 0; > + > + n_pkts_put = RTE_MIN(count, n_pkts_cpl); > + > + for (pkt_idx = 0; pkt_idx < n_pkts_put; pkt_idx++) { > + from = (start_idx + pkt_idx) & (vq->size - 1); > + pkts[pkt_idx] = pkts_info[from].mbuf; > + > + if (pkts_info[from].nethdr.valid) { > + vhost_dequeue_offload(&pkts_info[from].nethdr.hdr, > + pkts[pkt_idx], legacy_ol_flags); > + } > + } > + vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put; > + > + if (n_pkts_put) { > + /* write back completed descs to used ring */ > + write_back_completed_descs_split(vq, n_pkts_put); > + /* update used ring */ > + __atomic_add_fetch(&vq->used->idx, > + n_pkts_put, __ATOMIC_RELEASE); > + > + vq->async_pkts_inflight_n -= n_pkts_put; > + } > + > + return n_pkts_put; > +} > + > +static __rte_always_inline uint16_t > +virtio_dev_tx_async_split(struct virtio_net *dev, > + struct vhost_virtqueue *vq, uint16_t queue_id, > + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, > + uint16_t count, bool legacy_ol_flags) > +{ > + static bool allocerr_warned; > + uint16_t pkt_idx; > + uint16_t free_entries; > + uint16_t slot_idx = 0; > + uint16_t segs_await = 0; > + uint16_t nr_done_pkts = 0, nr_async_pkts = 0, nr_async_cmpl_pkts = 0; > + uint16_t nr_async_burst = 0; > + uint16_t pkt_err = 0; > + uint16_t iovec_idx = 0, it_idx = 0; > + > + struct rte_vhost_iov_iter *it_pool = vq->it_pool; > + struct iovec *vec_pool = vq->vec_pool; > + struct iovec *src_iovec = vec_pool; > + struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1); > + struct rte_vhost_async_desc tdes[MAX_PKT_BURST]; > + struct async_inflight_info *pkts_info = vq->async_pkts_info; > + > + struct async_pkt_index { > + uint16_t last_avail_idx; > + } async_pkts_log[MAX_PKT_BURST]; > + > + /** > + * The ordering between avail index and > + * desc reads needs to be enforced. 
> + */ > + free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - > + vq->last_avail_idx; > + if (free_entries == 0) > + goto out; > + > + rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]); > + > + count = RTE_MIN(count, MAX_PKT_BURST); > + count = RTE_MIN(count, free_entries); > + VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n", > + dev->vid, count); > + > + for (pkt_idx = 0; pkt_idx < count; pkt_idx++) { > + uint16_t head_idx = 0; > + uint16_t nr_vec = 0; > + uint32_t buf_len; > + int err; > + struct buf_vector buf_vec[BUF_VECTOR_MAX]; > + struct rte_mbuf *pkt; > + > + if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx, > + &nr_vec, buf_vec, > + &head_idx, &buf_len, > + VHOST_ACCESS_RO) < 0)) > + break; > + > + pkt = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len); > + if (unlikely(pkt == NULL)) { > + /** > + * mbuf allocation fails for jumbo packets when external > + * buffer allocation is not allowed and linear buffer > + * is required. Drop this packet. > + */ > + if (!allocerr_warned) { > + VHOST_LOG_DATA(ERR, > + "Failed mbuf alloc of size %d from %s on %s.\n", > + buf_len, mbuf_pool->name, dev->ifname); > + allocerr_warned = true; > + } > + break; > + } > + > + slot_idx = (vq->async_pkts_idx + nr_async_pkts) & > + (vq->size - 1); > + err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, > + mbuf_pool, &src_iovec[iovec_idx], > + &dst_iovec[iovec_idx], &it_pool[it_idx], > + &it_pool[it_idx + 1], > + &pkts_info[slot_idx].nethdr, legacy_ol_flags); > + if (unlikely(err)) { > + rte_pktmbuf_free(pkt); > + if (!allocerr_warned) { > + VHOST_LOG_DATA(ERR, > + "Failed to copy desc to mbuf on %s.\n", > + dev->ifname); > + allocerr_warned = true; > + } > + break; > + } > + > + if (it_pool[it_idx].count) { > + uint16_t to = vq->async_desc_idx_split & (vq->size - 1); > + > + async_fill_desc(&tdes[nr_async_burst], &it_pool[it_idx], > + &it_pool[it_idx + 1]); > + pkts_info[slot_idx].mbuf = pkt; > + async_pkts_log[nr_async_pkts++].last_avail_idx = > + vq->last_avail_idx; > + nr_async_burst++; > + iovec_idx += it_pool[it_idx].nr_segs; > + it_idx += 2; > + segs_await += it_pool[it_idx].nr_segs; > + > + /* keep used desc */ > + vq->async_descs_split[to].id = head_idx; > + vq->async_descs_split[to].len = 0; > + vq->async_desc_idx_split++; > + } else { > + update_shadow_used_ring_split(vq, head_idx, 0); > + pkts[nr_done_pkts++] = pkt; > + } > + > + vq->last_avail_idx++; > + > + if (unlikely((nr_async_burst >= VHOST_ASYNC_BATCH_THRESHOLD) || > + ((VHOST_MAX_ASYNC_VEC >> 1) - > + segs_await < BUF_VECTOR_MAX))) { > + uint16_t nr_pkts; > + > + nr_pkts = vq->async_ops.transfer_data(dev->vid, > + queue_id, tdes, 0, nr_async_burst); > + src_iovec = vec_pool; > + dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1); > + it_idx = 0; > + segs_await = 0; > + vq->async_pkts_inflight_n += nr_pkts; > + > + if (unlikely(nr_pkts < nr_async_burst)) { > + pkt_err = nr_async_burst - nr_pkts; > + nr_async_burst = 0; > + break; > + } > + nr_async_burst = 0; > + } > + } > + > + if (nr_async_burst) { > + uint32_t nr_pkts; > + > + nr_pkts = vq->async_ops.transfer_data(dev->vid, queue_id, > + tdes, 0, nr_async_burst); > + vq->async_pkts_inflight_n += nr_pkts; > + > + if (unlikely(nr_pkts < nr_async_burst)) > + pkt_err = nr_async_burst - nr_pkts; > + } > + > + do_data_copy_dequeue(vq); > + > + if (unlikely(pkt_err)) { > + uint16_t nr_err_dma = pkt_err; > + uint16_t nr_err_sw; > + > + nr_async_pkts -= nr_err_dma; > + > + /** > + * revert shadow used ring and free pktmbufs for > 
+ * CPU-copied pkts after the first DMA-error pkt. > + */ > + nr_err_sw = vq->last_avail_idx - > + async_pkts_log[nr_async_pkts].last_avail_idx - > + nr_err_dma; > + vq->shadow_used_idx -= nr_err_sw; > + while (nr_err_sw-- > 0) > + rte_pktmbuf_free(pkts[--nr_done_pkts]); > + > + /** > + * recover DMA-copy related structures and free pktmbufs > + * for DMA-error pkts. > + */ > + vq->async_desc_idx_split -= nr_err_dma; > + while (nr_err_dma-- > 0) { > + rte_pktmbuf_free( > + pkts_info[slot_idx & (vq->size - 1)].mbuf); > + slot_idx--; > + } > + > + /* recover available ring */ > + vq->last_avail_idx = > + async_pkts_log[nr_async_pkts].last_avail_idx; > + } > + > + vq->async_pkts_idx += nr_async_pkts; > + > + if (likely(vq->shadow_used_idx)) > + flush_shadow_used_ring_split(dev, vq); > + > +out: > + if (nr_done_pkts < count && vq->async_pkts_inflight_n > 0) { > + nr_async_cmpl_pkts = async_poll_dequeue_completed_split(dev, vq, > + queue_id, &pkts[nr_done_pkts], > + count - nr_done_pkts, > + legacy_ol_flags); > + nr_done_pkts += nr_async_cmpl_pkts; > + } > + if (likely(nr_done_pkts)) > + vhost_vring_call_split(dev, vq); > + > + return nr_done_pkts; > +} > + > +__rte_noinline > +static uint16_t > +virtio_dev_tx_async_split_legacy(struct virtio_net *dev, > + struct vhost_virtqueue *vq, uint16_t queue_id, > + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, > + uint16_t count) > +{ > + return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool, > + pkts, count, true); I think we don't need to support legacy offload. It may be better to have the Vhost example to support the compliant way, what do you think? > +} > + > +__rte_noinline > +static uint16_t > +virtio_dev_tx_async_split_compliant(struct virtio_net *dev, > + struct vhost_virtqueue *vq, uint16_t queue_id, > + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, > + uint16_t count) > +{ > + return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool, > + pkts, count, false); > +} > + > +uint16_t > +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id, > + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count, > + int *nr_inflight) > +{ > + struct virtio_net *dev; > + struct rte_mbuf *rarp_mbuf = NULL; > + struct vhost_virtqueue *vq; > + int16_t success = 1; > + > + *nr_inflight = -1; > + > + dev = get_device(vid); > + if (!dev) > + return 0; > + > + if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) { > + VHOST_LOG_DATA(ERR, > + "(%d) %s: built-in vhost net backend is disabled.\n", > + dev->vid, __func__); > + return 0; > + } > + > + if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) { > + VHOST_LOG_DATA(ERR, > + "(%d) %s: invalid virtqueue idx %d.\n", > + dev->vid, __func__, queue_id); > + return 0; > + } > + > + vq = dev->virtqueue[queue_id]; > + > + if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0)) > + return 0; > + > + if (unlikely(vq->enabled == 0)) { > + count = 0; > + goto out_access_unlock; > + } > + > + if (unlikely(!vq->async_registered)) { > + VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n", > + dev->vid, __func__, queue_id); > + count = 0; > + goto out_access_unlock; > + } > + > + if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM)) > + vhost_user_iotlb_rd_lock(vq); > + > + if (unlikely(vq->access_ok == 0)) > + if (unlikely(vring_translate(dev, vq) < 0)) { > + count = 0; > + goto out_access_unlock; > + } > + > + /* > + * Construct a RARP broadcast packet, and inject it to the "pkts" > + * array, to looks like that guest actually 
send such packet. > + * > + * Check user_send_rarp() for more information. > + * > + * broadcast_rarp shares a cacheline in the virtio_net structure > + * with some fields that are accessed during enqueue and > + * __atomic_compare_exchange_n causes a write if performed compare > + * and exchange. This could result in false sharing between enqueue > + * and dequeue. > + * > + * Prevent unnecessary false sharing by reading broadcast_rarp first > + * and only performing compare and exchange if the read indicates it > + * is likely to be set. > + */ > + if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) && > + __atomic_compare_exchange_n(&dev->broadcast_rarp, > + &success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) { > + > + rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac); > + if (rarp_mbuf == NULL) { > + VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n"); > + count = 0; > + goto out; > + } > + count -= 1; > + } > + > + if (unlikely(vq_is_packed(dev))) > + return 0; > + > + if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS) > + count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id, > + mbuf_pool, pkts, count); > + else > + count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id, > + mbuf_pool, pkts, count); > + > +out: > + *nr_inflight = vq->async_pkts_inflight_n; > + > + if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM)) > + vhost_user_iotlb_rd_unlock(vq); > + > +out_access_unlock: > + rte_spinlock_unlock(&vq->access_lock); > + > + if (unlikely(rarp_mbuf != NULL)) { > + /* > + * Inject it to the head of "pkts" array, so that switch's mac > + * learning table will get updated first. > + */ > + memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *)); > + pkts[0] = rarp_mbuf; > + count += 1; > + } > + > + return count; > +} >
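
Coming back to the comment above about reusing rte_pktmbuf_alloc_bulk()
together with virtio_dev_pktmbuf_prep() instead of re-adding
virtio_dev_pktmbuf_alloc(): a rough, untested sketch of that idea is below.
It is a fragment, with dev, mbuf_pool, count and buf_len taken from the
surrounding virtio_dev_tx_async_split() context as in the patch.

	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
	uint16_t pkt_idx;

	/* Allocate the mbufs for the whole burst in one mempool operation. */
	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
		goto out;

	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];

		/* buf_len comes from fill_vec_buf_split() for this chain;
		 * prep only sizes the already-allocated mbuf. */
		if (unlikely(virtio_dev_pktmbuf_prep(dev, pkt, buf_len)))
			break;

		/* ... per-packet async/sync copy path unchanged ... */
	}

	/* Return the mbufs that were allocated but not consumed. */
	if (pkt_idx < count)
		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
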