From: Cheng Jiang
To: maxime.coquelin@redhat.com, chenbo.xia@intel.com
Cc: dev@dpdk.org, jiayu.hu@intel.com, yvonnex.yang@intel.com, yinan.wang@intel.com, yong.liu@intel.com, Cheng Jiang
Date: Wed, 14 Apr 2021 06:13:41 +0000
Message-Id: <20210414061343.54919-3-Cheng1.jiang@intel.com>
In-Reply-To: <20210414061343.54919-1-Cheng1.jiang@intel.com>
References: <20210317085426.10119-1-Cheng1.jiang@intel.com> <20210414061343.54919-1-Cheng1.jiang@intel.com>
Subject: [dpdk-dev] [PATCH v7 2/4] vhost: add support for packed ring in async vhost
For now, the async vhost data path supports only the split ring. This patch enables the packed ring in the async vhost data path, making async vhost compatible with the virtio 1.1 spec. Signed-off-by: Cheng Jiang --- lib/librte_vhost/rte_vhost_async.h | 1 + lib/librte_vhost/vhost.c | 49 ++-- lib/librte_vhost/vhost.h | 15 +- lib/librte_vhost/virtio_net.c | 432 +++++++++++++++++++++++++++-- 4 files changed, 456 insertions(+), 41 deletions(-) diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h index c855ff875..6faa31f5a 100644 --- a/lib/librte_vhost/rte_vhost_async.h +++ b/lib/librte_vhost/rte_vhost_async.h @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops { struct async_inflight_info { struct rte_mbuf *mbuf; uint16_t descs; /* num of descs inflight */ + uint16_t nr_buffers; /* num of buffers inflight for packed ring */ }; /** diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c index a70fe01d8..f509186c6 100644 --- a/lib/librte_vhost/vhost.c +++ b/lib/librte_vhost/vhost.c @@ -338,19 +338,22 @@ cleanup_device(struct virtio_net *dev, int destroy) } static void -vhost_free_async_mem(struct vhost_virtqueue *vq) +vhost_free_async_mem(struct virtio_net *dev, struct vhost_virtqueue *vq) { - if (vq->async_pkts_info) - rte_free(vq->async_pkts_info); - if (vq->async_descs_split) + rte_free(vq->async_pkts_info); + + if (vq_is_packed(dev)) { + rte_free(vq->async_buffers_packed); + vq->async_buffers_packed = NULL; + } else { rte_free(vq->async_descs_split); - if (vq->it_pool) - rte_free(vq->it_pool); - if (vq->vec_pool) - rte_free(vq->vec_pool); + vq->async_descs_split = NULL; + } + + rte_free(vq->it_pool); + rte_free(vq->vec_pool); vq->async_pkts_info = NULL; - vq->async_descs_split = NULL; vq->it_pool = NULL; vq->vec_pool = NULL; } @@ -360,10 +363,10 @@ free_vq(struct virtio_net *dev, struct vhost_virtqueue *vq) { if (vq_is_packed(dev)) 
rte_free(vq->shadow_used_packed); - else { + else rte_free(vq->shadow_used_split); - vhost_free_async_mem(vq); - } + + vhost_free_async_mem(dev, vq); rte_free(vq->batch_copy_elems); if (vq->iotlb_pool) rte_mempool_free(vq->iotlb_pool); @@ -1626,10 +1629,9 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id, if (unlikely(vq == NULL || !dev->async_copy)) return -1; - /* packed queue is not supported */ - if (unlikely(vq_is_packed(dev) || !f.async_inorder)) { + if (unlikely(!f.async_inorder)) { VHOST_LOG_CONFIG(ERR, - "async copy is not supported on packed queue or non-inorder mode " + "async copy is not supported on non-inorder mode " "(vid %d, qid: %d)\n", vid, queue_id); return -1; } @@ -1667,12 +1669,19 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id, vq->vec_pool = rte_malloc_socket(NULL, VHOST_MAX_ASYNC_VEC * sizeof(struct iovec), RTE_CACHE_LINE_SIZE, node); - vq->async_descs_split = rte_malloc_socket(NULL, + if (vq_is_packed(dev)) { + vq->async_buffers_packed = rte_malloc_socket(NULL, + vq->size * sizeof(struct vring_used_elem_packed), + RTE_CACHE_LINE_SIZE, node); + } else { + vq->async_descs_split = rte_malloc_socket(NULL, vq->size * sizeof(struct vring_used_elem), RTE_CACHE_LINE_SIZE, node); - if (!vq->async_descs_split || !vq->async_pkts_info || - !vq->it_pool || !vq->vec_pool) { - vhost_free_async_mem(vq); + } + + if (!vq->async_buffers_packed || !vq->async_descs_split || + !vq->async_pkts_info || !vq->it_pool || !vq->vec_pool) { + vhost_free_async_mem(dev, vq); VHOST_LOG_CONFIG(ERR, "async register failed: cannot allocate memory for vq data " "(vid %d, qid: %d)\n", vid, queue_id); @@ -1728,7 +1737,7 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id) goto out; } - vhost_free_async_mem(vq); + vhost_free_async_mem(dev, vq); vq->async_ops.transfer_data = NULL; vq->async_ops.check_completed_copies = NULL; diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h index f628714c2..673335217 100644 --- 
a/lib/librte_vhost/vhost.h +++ b/lib/librte_vhost/vhost.h @@ -201,9 +201,18 @@ struct vhost_virtqueue { uint16_t async_pkts_idx; uint16_t async_pkts_inflight_n; uint16_t async_last_pkts_n; - struct vring_used_elem *async_descs_split; - uint16_t async_desc_idx; - uint16_t last_async_desc_idx; + union { + struct vring_used_elem *async_descs_split; + struct vring_used_elem_packed *async_buffers_packed; + }; + union { + uint16_t async_desc_idx; + uint16_t async_packed_buffer_idx; + }; + union { + uint16_t last_async_desc_idx; + uint16_t last_async_buffer_idx; + }; /* vq async features */ bool async_inorder; diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c index 438bdafd1..54e11e3a5 100644 --- a/lib/librte_vhost/virtio_net.c +++ b/lib/librte_vhost/virtio_net.c @@ -363,14 +363,14 @@ vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq, } static __rte_always_inline void -vhost_shadow_enqueue_single_packed(struct virtio_net *dev, - struct vhost_virtqueue *vq, - uint32_t len[], - uint16_t id[], - uint16_t count[], +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq, + uint32_t *len, + uint16_t *id, + uint16_t *count, uint16_t num_buffers) { uint16_t i; + for (i = 0; i < num_buffers; i++) { /* enqueue shadow flush action aligned with batch num */ if (!vq->shadow_used_idx) @@ -382,6 +382,17 @@ vhost_shadow_enqueue_single_packed(struct virtio_net *dev, vq->shadow_aligned_idx += count[i]; vq->shadow_used_idx++; } +} + +static __rte_always_inline void +vhost_shadow_enqueue_single_packed(struct virtio_net *dev, + struct vhost_virtqueue *vq, + uint32_t *len, + uint16_t *id, + uint16_t *count, + uint16_t num_buffers) +{ + vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers); if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) { do_data_copy_enqueue(dev, vq); @@ -1474,6 +1485,23 @@ store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem } } +static __rte_always_inline void +store_dma_desc_info_packed(struct 
vring_used_elem_packed *s_ring, + struct vring_used_elem_packed *d_ring, + uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count) +{ + uint16_t elem_size = sizeof(struct vring_used_elem_packed); + + if (d_idx + count <= ring_size) { + rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size); + } else { + uint16_t size = ring_size - d_idx; + + rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size); + rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size); + } +} + static __rte_noinline uint32_t virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq, uint16_t queue_id, @@ -1641,6 +1669,330 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev, return pkt_idx; } +static __rte_always_inline void +vhost_update_used_packed(struct vhost_virtqueue *vq, + struct vring_used_elem_packed *shadow_ring, + uint16_t count) +{ + int i; + uint16_t used_idx = vq->last_used_idx; + uint16_t head_idx = vq->last_used_idx; + uint16_t head_flags = 0; + + if (count == 0) + return; + + /* Split loop in two to save memory barriers */ + for (i = 0; i < count; i++) { + vq->desc_packed[used_idx].id = shadow_ring[i].id; + vq->desc_packed[used_idx].len = shadow_ring[i].len; + + used_idx += shadow_ring[i].count; + if (used_idx >= vq->size) + used_idx -= vq->size; + } + + /* The ordering for storing desc flags needs to be enforced. 
*/ + rte_atomic_thread_fence(__ATOMIC_RELEASE); + + for (i = 0; i < count; i++) { + uint16_t flags; + + if (shadow_ring[i].len) + flags = VRING_DESC_F_WRITE; + else + flags = 0; + + if (vq->used_wrap_counter) { + flags |= VRING_DESC_F_USED; + flags |= VRING_DESC_F_AVAIL; + } else { + flags &= ~VRING_DESC_F_USED; + flags &= ~VRING_DESC_F_AVAIL; + } + + if (i > 0) { + vq->desc_packed[vq->last_used_idx].flags = flags; + + } else { + head_idx = vq->last_used_idx; + head_flags = flags; + } + + vq_inc_last_used_packed(vq, shadow_ring[i].count); + } + + vq->desc_packed[head_idx].flags = head_flags; +} + +static __rte_always_inline int +vhost_enqueue_async_single_packed(struct virtio_net *dev, + struct vhost_virtqueue *vq, + struct rte_mbuf *pkt, + struct buf_vector *buf_vec, + uint16_t *nr_descs, + uint16_t *nr_buffers, + struct vring_packed_desc *async_descs, + struct iovec *src_iovec, struct iovec *dst_iovec, + struct rte_vhost_iov_iter *src_it, + struct rte_vhost_iov_iter *dst_it) +{ + uint16_t nr_vec = 0; + uint16_t avail_idx = vq->last_avail_idx; + uint16_t max_tries, tries = 0; + uint16_t buf_id = 0; + uint32_t len = 0; + uint16_t desc_count = 0; + uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf); + uint32_t buffer_len[vq->size]; + uint16_t buffer_buf_id[vq->size]; + uint16_t buffer_desc_count[vq->size]; + *nr_buffers = 0; + + if (rxvq_is_mergeable(dev)) + max_tries = vq->size - 1; + else + max_tries = 1; + + while (size > 0) { + /* + * if we tried all available ring items, and still + * can't get enough buf, it means something abnormal + * happened. 
+ */ + if (unlikely(++tries > max_tries)) + return -1; + + if (unlikely(fill_vec_buf_packed(dev, vq, avail_idx, &desc_count, buf_vec, &nr_vec, + &buf_id, &len, VHOST_ACCESS_RW) < 0)) + return -1; + + len = RTE_MIN(len, size); + size -= len; + + buffer_len[*nr_buffers] = len; + buffer_buf_id[*nr_buffers] = buf_id; + buffer_desc_count[*nr_buffers] = desc_count; + *nr_buffers += 1; + + *nr_descs += desc_count; + avail_idx += desc_count; + if (avail_idx >= vq->size) + avail_idx -= vq->size; + } + + if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, src_iovec, dst_iovec, + src_it, dst_it) < 0) + return -1; + /* store descriptors for DMA */ + if (avail_idx >= *nr_descs) { + rte_memcpy(async_descs, &vq->desc_packed[vq->last_avail_idx], + *nr_descs * sizeof(struct vring_packed_desc)); + } else { + uint16_t nr_copy = vq->size - vq->last_avail_idx; + rte_memcpy(async_descs, &vq->desc_packed[vq->last_avail_idx], + nr_copy * sizeof(struct vring_packed_desc)); + rte_memcpy(async_descs + nr_copy, vq->desc_packed, + (*nr_descs - nr_copy) * sizeof(struct vring_packed_desc)); + } + + vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers); + + return 0; +} + +static __rte_always_inline int16_t +virtio_dev_rx_async_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq, + struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers, + struct vring_packed_desc *async_descs, + struct iovec *src_iovec, struct iovec *dst_iovec, + struct rte_vhost_iov_iter *src_it, struct rte_vhost_iov_iter *dst_it) +{ + struct buf_vector buf_vec[BUF_VECTOR_MAX]; + *nr_descs = 0; + *nr_buffers = 0; + + if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt, buf_vec, nr_descs, nr_buffers, + async_descs, src_iovec, dst_iovec, + src_it, dst_it) < 0)) { + VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n", dev->vid); + return -1; + } + + VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n", + dev->vid, 
vq->last_avail_idx, vq->last_avail_idx + *nr_descs); + + return 0; +} + +static __rte_always_inline void +dma_error_handler_packed(struct vhost_virtqueue *vq, struct vring_packed_desc *async_descs, + uint16_t async_descs_idx, uint16_t slot_idx, uint32_t nr_err, + uint32_t *pkt_idx, uint32_t *num_async_pkts, uint32_t *num_done_pkts) +{ + uint16_t descs_err = 0; + uint16_t buffers_err = 0; + struct async_inflight_info *pkts_info = vq->async_pkts_info; + + *num_async_pkts -= nr_err; + *pkt_idx -= nr_err; + /* calculate the sum of buffers and descs of DMA-error packets. */ + while (nr_err-- > 0) { + descs_err += pkts_info[slot_idx % vq->size].descs; + buffers_err += pkts_info[slot_idx % vq->size].nr_buffers; + slot_idx--; + } + + vq->async_packed_buffer_idx -= buffers_err; + + if (vq->last_avail_idx >= descs_err) { + vq->last_avail_idx -= descs_err; + + rte_memcpy(&vq->desc_packed[vq->last_avail_idx], + &async_descs[async_descs_idx - descs_err], + descs_err * sizeof(struct vring_packed_desc)); + } else { + uint16_t nr_copy; + + vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err; + nr_copy = vq->size - vq->last_avail_idx; + rte_memcpy(&vq->desc_packed[vq->last_avail_idx], + &async_descs[async_descs_idx - descs_err], + nr_copy * sizeof(struct vring_packed_desc)); + descs_err -= nr_copy; + rte_memcpy(&vq->desc_packed[0], &async_descs[async_descs_idx - descs_err], + descs_err * sizeof(struct vring_packed_desc)); + vq->avail_wrap_counter ^= 1; + } + + *num_done_pkts = *pkt_idx - *num_async_pkts; +} + +static __rte_noinline uint32_t +virtio_dev_rx_async_submit_packed(struct virtio_net *dev, + struct vhost_virtqueue *vq, uint16_t queue_id, + struct rte_mbuf **pkts, uint32_t count, + struct rte_mbuf **comp_pkts, uint32_t *comp_count) +{ + uint32_t pkt_idx = 0, pkt_burst_idx = 0; + uint16_t async_descs_idx = 0; + uint16_t num_buffers; + uint16_t num_desc; + + struct rte_vhost_iov_iter *it_pool = vq->it_pool; + struct iovec *vec_pool = vq->vec_pool; + struct 
rte_vhost_async_desc tdes[MAX_PKT_BURST]; + struct iovec *src_iovec = vec_pool; + struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1); + uint16_t slot_idx = 0; + uint16_t segs_await = 0; + uint16_t iovec_idx = 0, it_idx = 0; + struct async_inflight_info *pkts_info = vq->async_pkts_info; + uint32_t n_pkts = 0, pkt_err = 0; + uint32_t num_async_pkts = 0, num_done_pkts = 0; + struct vring_packed_desc async_descs[vq->size]; + + rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]); + + for (pkt_idx = 0; pkt_idx < count; pkt_idx++) { + if (unlikely(virtio_dev_rx_async_single_packed(dev, vq, pkts[pkt_idx], + &num_desc, &num_buffers, + &async_descs[async_descs_idx], + &src_iovec[iovec_idx], &dst_iovec[iovec_idx], + &it_pool[it_idx], &it_pool[it_idx + 1]) < 0)) + break; + + VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n", + dev->vid, vq->last_avail_idx, + vq->last_avail_idx + num_desc); + + slot_idx = (vq->async_pkts_idx + num_async_pkts) % vq->size; + if (it_pool[it_idx].count) { + uint16_t from, to; + + async_descs_idx += num_desc; + async_fill_desc(&tdes[pkt_burst_idx++], + &it_pool[it_idx], &it_pool[it_idx + 1]); + pkts_info[slot_idx].descs = num_desc; + pkts_info[slot_idx].nr_buffers = num_buffers; + pkts_info[slot_idx].mbuf = pkts[pkt_idx]; + num_async_pkts++; + iovec_idx += it_pool[it_idx].nr_segs; + it_idx += 2; + + segs_await += it_pool[it_idx].nr_segs; + + /** + * recover shadow used ring and keep DMA-occupied + * descriptors. 
+ */ + from = vq->shadow_used_idx - num_buffers; + to = vq->async_packed_buffer_idx % vq->size; + store_dma_desc_info_packed(vq->shadow_used_packed, + vq->async_buffers_packed, vq->size, from, to, num_buffers); + + vq->async_packed_buffer_idx += num_buffers; + vq->shadow_used_idx -= num_buffers; + } else { + comp_pkts[num_done_pkts++] = pkts[pkt_idx]; + } + + vq_inc_last_avail_packed(vq, num_desc); + + /* + * conditions to trigger async device transfer: + * - buffered packet number reaches transfer threshold + * - unused async iov number is less than max vhost vector + */ + if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD || + ((VHOST_MAX_ASYNC_VEC >> 1) - segs_await < BUF_VECTOR_MAX))) { + n_pkts = vq->async_ops.transfer_data(dev->vid, queue_id, + tdes, 0, pkt_burst_idx); + iovec_idx = 0; + it_idx = 0; + segs_await = 0; + vq->async_pkts_inflight_n += n_pkts; + + if (unlikely(n_pkts < pkt_burst_idx)) { + /* + * log error packets number here and do actual + * error processing when applications poll + * completion + */ + pkt_err = pkt_burst_idx - n_pkts; + pkt_burst_idx = 0; + pkt_idx++; + break; + } + + pkt_burst_idx = 0; + } + } + + if (pkt_burst_idx) { + n_pkts = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx); + vq->async_pkts_inflight_n += n_pkts; + + if (unlikely(n_pkts < pkt_burst_idx)) + pkt_err = pkt_burst_idx - n_pkts; + } + + do_data_copy_enqueue(dev, vq); + + if (unlikely(pkt_err)) + dma_error_handler_packed(vq, async_descs, async_descs_idx, slot_idx, pkt_err, + &pkt_idx, &num_async_pkts, &num_done_pkts); + vq->async_pkts_idx += num_async_pkts; + *comp_count = num_done_pkts; + + if (likely(vq->shadow_used_idx)) { + vhost_flush_enqueue_shadow_packed(dev, vq); + vhost_vring_call_packed(dev, vq); + } + + return pkt_idx; +} + static __rte_always_inline void write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs) { @@ -1671,12 +2023,35 @@ write_back_completed_descs_split(struct vhost_virtqueue *vq, 
uint16_t n_descs) } while (nr_left > 0); } +static __rte_always_inline void +write_back_completed_descs_packed(struct vhost_virtqueue *vq, + uint16_t n_buffers) +{ + uint16_t nr_left = n_buffers; + uint16_t from, to; + + do { + from = vq->last_async_buffer_idx % vq->size; + to = (from + nr_left) % vq->size; + if (to > from) { + vhost_update_used_packed(vq, vq->async_buffers_packed + from, to - from); + vq->last_async_buffer_idx += nr_left; + nr_left = 0; + } else { + vhost_update_used_packed(vq, vq->async_buffers_packed + from, + vq->size - from); + vq->last_async_buffer_idx += vq->size - from; + nr_left -= vq->size - from; + } + } while (nr_left > 0); +} + uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id, struct rte_mbuf **pkts, uint16_t count) { struct virtio_net *dev = get_device(vid); struct vhost_virtqueue *vq; - uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0; + uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0; uint16_t start_idx, pkts_idx, vq_size; struct async_inflight_info *pkts_info; uint16_t from, i; @@ -1701,7 +2076,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id, rte_spinlock_lock(&vq->access_lock); - pkts_idx = vq->async_pkts_idx & (vq->size - 1); + pkts_idx = vq->async_pkts_idx % vq->size; pkts_info = vq->async_pkts_info; vq_size = vq->size; start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx, @@ -1718,21 +2093,41 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id, goto done; } - for (i = 0; i < n_pkts_put; i++) { - from = (start_idx + i) & (vq_size - 1); - n_descs += pkts_info[from].descs; - pkts[i] = pkts_info[from].mbuf; + if (vq_is_packed(dev)) { + for (i = 0; i < n_pkts_put; i++) { + from = (start_idx + i) & (vq_size - 1); + n_buffers += pkts_info[from].nr_buffers; + pkts[i] = pkts_info[from].mbuf; + } + } else { + for (i = 0; i < n_pkts_put; i++) { + from = (start_idx + i) & (vq_size - 1); + n_descs += pkts_info[from].descs; + pkts[i] = 
pkts_info[from].mbuf; + } } + vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put; vq->async_pkts_inflight_n -= n_pkts_put; if (likely(vq->enabled && vq->access_ok)) { - write_back_completed_descs_split(vq, n_descs); + if (vq_is_packed(dev)) { + write_back_completed_descs_packed(vq, n_buffers); - __atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE); - vhost_vring_call_split(dev, vq); - } else - vq->last_async_desc_idx += n_descs; + vhost_vring_call_packed(dev, vq); + } else { + write_back_completed_descs_split(vq, n_descs); + + __atomic_add_fetch(&vq->used->idx, n_descs, + __ATOMIC_RELEASE); + vhost_vring_call_split(dev, vq); + } + } else { + if (vq_is_packed(dev)) + vq->last_async_buffer_idx += n_buffers; + else + vq->last_async_desc_idx += n_descs; + } done: rte_spinlock_unlock(&vq->access_lock); @@ -1773,9 +2168,10 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id, if (count == 0) goto out; - /* TODO: packed queue not implemented */ if (vq_is_packed(dev)) - nb_tx = 0; + nb_tx = virtio_dev_rx_async_submit_packed(dev, + vq, queue_id, pkts, count, comp_pkts, + comp_count); else nb_tx = virtio_dev_rx_async_submit_split(dev, vq, queue_id, pkts, count, comp_pkts, -- 2.29.2
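
Reader's note on the wrap-around copy used by store_dma_desc_info_packed() in the patch above: the sketch below reproduces the same two-memcpy pattern outside DPDK. It is a minimal illustration under stated assumptions, not the patch's code. struct used_elem, ring_copy_wrap() and ring_copy_demo() are hypothetical names, plain memcpy() stands in for rte_memcpy(), and the helper assumes d_idx < ring_size and count <= ring_size.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for struct vring_used_elem_packed. */
struct used_elem {
	uint16_t id;
	uint32_t len;
	uint32_t count;
};

/*
 * Copy `count` elements from src[s_idx..] into the circular buffer dst,
 * starting at slot d_idx and wrapping to slot 0 when the end is reached.
 * Assumes d_idx < ring_size and count <= ring_size (not checked here).
 */
static void
ring_copy_wrap(const struct used_elem *src, struct used_elem *dst,
		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx,
		uint16_t count)
{
	size_t elem_size = sizeof(struct used_elem);

	if (d_idx + count <= ring_size) {
		/* Everything fits before the end of the ring: one copy. */
		memcpy(dst + d_idx, src + s_idx, count * elem_size);
	} else {
		/* Fill up to the end of the ring, then wrap to slot 0. */
		uint16_t first = ring_size - d_idx;

		memcpy(dst + d_idx, src + s_idx, first * elem_size);
		memcpy(dst, src + s_idx + first, (count - first) * elem_size);
	}
}

/*
 * Copy three elements into a 4-slot ring starting at slot 2 and return
 * the id that ends up in `slot`. Untouched slots keep id 0.
 */
static uint16_t
ring_copy_demo(uint16_t slot)
{
	struct used_elem src[3] = { {10, 64, 1}, {11, 64, 1}, {12, 64, 1} };
	struct used_elem dst[4];

	memset(dst, 0, sizeof(dst));
	ring_copy_wrap(src, dst, 4, 0, 2, 3);
	return dst[slot].id;
}
```

In the patch the destination index is vq->async_packed_buffer_idx % vq->size, so it always satisfies the d_idx < ring_size assumption. The sketch leaves the count <= ring_size bound to the caller, as the patch does.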