From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id EF983A0577; Mon, 6 Apr 2020 17:27:22 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id A30061BEE4; Mon, 6 Apr 2020 17:27:18 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by dpdk.org (Postfix) with ESMTP id 02F921BEE1 for ; Mon, 6 Apr 2020 17:27:17 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 7A16F1FB; Mon, 6 Apr 2020 08:27:16 -0700 (PDT) Received: from net-arm-thunderx2-03.shanghai.arm.com (net-arm-thunderx2-03.shanghai.arm.com [10.169.41.185]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 952653F68F; Mon, 6 Apr 2020 08:27:13 -0700 (PDT) From: Joyce Kong To: maxime.coquelin@redhat.com, stephen@networkplumber.org, tiwei.bie@intel.com, zhihong.wang@intel.com, thomas@monjalon.net, jerinj@marvell.com, yinan.wang@intel.com, honnappa.nagarahalli@arm.com, gavin.hu@arm.com Cc: nd@arm.com, dev@dpdk.org Date: Mon, 6 Apr 2020 23:26:33 +0800 Message-Id: <20200406152634.606-2-joyce.kong@arm.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20200406152634.606-1-joyce.kong@arm.com> References: <20200406152634.606-1-joyce.kong@arm.com> In-Reply-To: <20200212092456.29433-1-joyce.kong@arm.com> References: <20200212092456.29433-1-joyce.kong@arm.com> Subject: [dpdk-dev] [PATCH v3 1/2] virtio: one way barrier for split vring used idx X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" In case VIRTIO_F_ORDER_PLATFORM(36) is not negotiated, then the frontend and backend are assumed to be implemented in software, that is they can run on identical CPUs in an SMP configuration. 
Thus a weak form of memory barriers like rte_smp_r/wmb, rather than rte_cio_r/wmb, is sufficient for this case (vq->hw->weak_barriers == 1) and yields better performance. For the above case, this patch helps yield even better performance by replacing the two-way barriers with C11 one-way barriers for the used index in the split ring. Signed-off-by: Joyce Kong Reviewed-by: Gavin Hu --- drivers/net/virtio/virtio_ethdev.c | 9 ++-- drivers/net/virtio/virtio_ring.h | 2 +- drivers/net/virtio/virtio_rxtx.c | 46 +++++++++---------- drivers/net/virtio/virtio_rxtx_simple_neon.c | 5 +- drivers/net/virtio/virtio_rxtx_simple_sse.c | 5 +- .../net/virtio/virtio_user/virtio_user_dev.c | 8 ++-- drivers/net/virtio/virtqueue.c | 2 +- drivers/net/virtio/virtqueue.h | 37 ++++++++++++--- lib/librte_vhost/virtio_net.c | 5 +- 9 files changed, 71 insertions(+), 48 deletions(-) diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c index f9d0ea70d..a4a865bfa 100644 --- a/drivers/net/virtio/virtio_ethdev.c +++ b/drivers/net/virtio/virtio_ethdev.c @@ -285,13 +285,12 @@ virtio_send_command_split(struct virtnet_ctl *cvq, virtqueue_notify(vq); - rte_rmb(); - while (VIRTQUEUE_NUSED(vq) == 0) { - rte_rmb(); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + while (virtqueue_nused(vq) == 0) usleep(100); - } - while (VIRTQUEUE_NUSED(vq)) { + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + while (virtqueue_nused(vq)) { uint32_t idx, desc_idx, used_idx; struct vring_used_elem *uep; diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h index 7ba34662e..0f6574f68 100644 --- a/drivers/net/virtio/virtio_ring.h +++ b/drivers/net/virtio/virtio_ring.h @@ -59,7 +59,7 @@ struct vring_used_elem { struct vring_used { uint16_t flags; - volatile uint16_t idx; + uint16_t idx; struct vring_used_elem ring[0]; }; diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c index 752faa0f6..9ba26fd95 100644 --- 
a/drivers/net/virtio/virtio_rxtx.c +++ b/drivers/net/virtio/virtio_rxtx.c @@ -45,7 +45,7 @@ virtio_dev_rx_queue_done(void *rxq, uint16_t offset) struct virtnet_rx *rxvq = rxq; struct virtqueue *vq = rxvq->vq; - return VIRTQUEUE_NUSED(vq) >= offset; + return virtqueue_nused(vq) >= offset; } void @@ -1243,9 +1243,8 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts) if (unlikely(hw->started == 0)) return nb_rx; - nb_used = VIRTQUEUE_NUSED(vq); - - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); num = likely(nb_used <= nb_pkts) ? nb_used : nb_pkts; if (unlikely(num > VIRTIO_MBUF_BURST_SZ)) @@ -1458,12 +1457,11 @@ virtio_recv_pkts_inorder(void *rx_queue, if (unlikely(hw->started == 0)) return nb_rx; - nb_used = VIRTQUEUE_NUSED(vq); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); nb_used = RTE_MIN(nb_used, nb_pkts); nb_used = RTE_MIN(nb_used, VIRTIO_MBUF_BURST_SZ); - virtio_rmb(hw->weak_barriers); - PMD_RX_LOG(DEBUG, "used:%d", nb_used); nb_enqueued = 0; @@ -1552,8 +1550,8 @@ virtio_recv_pkts_inorder(void *rx_queue, uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res, VIRTIO_MBUF_BURST_SZ); - if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) { - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + if (likely(virtqueue_nused(vq) >= rcv_cnt)) { num = virtqueue_dequeue_rx_inorder(vq, rcv_pkts, len, rcv_cnt); uint16_t extra_idx = 0; @@ -1644,9 +1642,8 @@ virtio_recv_mergeable_pkts(void *rx_queue, if (unlikely(hw->started == 0)) return nb_rx; - nb_used = VIRTQUEUE_NUSED(vq); - - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); PMD_RX_LOG(DEBUG, "used:%d", nb_used); @@ -1734,8 +1731,8 @@ virtio_recv_mergeable_pkts(void *rx_queue, uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res, VIRTIO_MBUF_BURST_SZ); - if 
(likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) { - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + if (likely(virtqueue_nused(vq) >= rcv_cnt)) { num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, rcv_cnt); uint16_t extra_idx = 0; @@ -2108,9 +2105,10 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts) return nb_pkts; PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts); - nb_used = VIRTQUEUE_NUSED(vq); - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); + if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh)) virtio_xmit_cleanup(vq, nb_used); @@ -2142,8 +2140,11 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts) /* Positive value indicates it need free vring descriptors */ if (unlikely(need > 0)) { - nb_used = VIRTQUEUE_NUSED(vq); - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or + * rte_cio_rmb inside + */ + nb_used = virtqueue_nused(vq); + need = RTE_MIN(need, (int)nb_used); virtio_xmit_cleanup(vq, need); @@ -2180,11 +2181,10 @@ static __rte_always_inline int virtio_xmit_try_cleanup_inorder(struct virtqueue *vq, uint16_t need) { uint16_t nb_used, nb_clean, nb_descs; - struct virtio_hw *hw = vq->hw; nb_descs = vq->vq_free_cnt + need; - nb_used = VIRTQUEUE_NUSED(vq); - virtio_rmb(hw->weak_barriers); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); nb_clean = RTE_MIN(need, (int)nb_used); virtio_xmit_cleanup_inorder(vq, nb_clean); @@ -2213,9 +2213,9 @@ virtio_xmit_pkts_inorder(void *tx_queue, VIRTQUEUE_DUMP(vq); PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts); - nb_used = VIRTQUEUE_NUSED(vq); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); - virtio_rmb(hw->weak_barriers); if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh)) virtio_xmit_cleanup_inorder(vq, 
nb_used); diff --git a/drivers/net/virtio/virtio_rxtx_simple_neon.c b/drivers/net/virtio/virtio_rxtx_simple_neon.c index 992e71f01..363e2b330 100644 --- a/drivers/net/virtio/virtio_rxtx_simple_neon.c +++ b/drivers/net/virtio/virtio_rxtx_simple_neon.c @@ -83,9 +83,8 @@ virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, if (unlikely(nb_pkts < RTE_VIRTIO_DESC_PER_LOOP)) return 0; - nb_used = VIRTQUEUE_NUSED(vq); - - rte_rmb(); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); if (unlikely(nb_used == 0)) return 0; diff --git a/drivers/net/virtio/virtio_rxtx_simple_sse.c b/drivers/net/virtio/virtio_rxtx_simple_sse.c index f9ec4ae69..45a45e6f4 100644 --- a/drivers/net/virtio/virtio_rxtx_simple_sse.c +++ b/drivers/net/virtio/virtio_rxtx_simple_sse.c @@ -85,9 +85,8 @@ virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, if (unlikely(nb_pkts < RTE_VIRTIO_DESC_PER_LOOP)) return 0; - nb_used = VIRTQUEUE_NUSED(vq); - - rte_compiler_barrier(); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); if (unlikely(nb_used == 0)) return 0; diff --git a/drivers/net/virtio/virtio_user/virtio_user_dev.c b/drivers/net/virtio/virtio_user/virtio_user_dev.c index 1c6b26f8d..7fb135f49 100644 --- a/drivers/net/virtio/virtio_user/virtio_user_dev.c +++ b/drivers/net/virtio/virtio_user/virtio_user_dev.c @@ -730,8 +730,10 @@ virtio_user_handle_cq(struct virtio_user_dev *dev, uint16_t queue_idx) struct vring *vring = &dev->vrings[queue_idx]; /* Consume avail ring, using used ring idx as first one */ - while (vring->used->idx != vring->avail->idx) { - avail_idx = (vring->used->idx) & (vring->num - 1); + while (__atomic_load_n(&vring->used->idx, __ATOMIC_RELAXED) + != vring->avail->idx) { + avail_idx = __atomic_load_n(&vring->used->idx, __ATOMIC_RELAXED) + & (vring->num - 1); desc_idx = vring->avail->ring[avail_idx]; n_descs = virtio_user_handle_ctrl_msg(dev, vring, desc_idx); @@ 
-741,6 +743,6 @@ virtio_user_handle_cq(struct virtio_user_dev *dev, uint16_t queue_idx) uep->id = desc_idx; uep->len = n_descs; - vring->used->idx++; + __atomic_add_fetch(&vring->used->idx, 1, __ATOMIC_RELAXED); } } diff --git a/drivers/net/virtio/virtqueue.c b/drivers/net/virtio/virtqueue.c index 0b4e3bf3e..b0f61dabc 100644 --- a/drivers/net/virtio/virtqueue.c +++ b/drivers/net/virtio/virtqueue.c @@ -92,7 +92,7 @@ virtqueue_rxvq_flush_split(struct virtqueue *vq) uint16_t used_idx, desc_idx; uint16_t nb_used, i; - nb_used = VIRTQUEUE_NUSED(vq); + nb_used = virtqueue_nused(vq); for (i = 0; i < nb_used; i++) { used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1); diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h index 58ad7309a..54dc63c93 100644 --- a/drivers/net/virtio/virtqueue.h +++ b/drivers/net/virtio/virtqueue.h @@ -464,8 +464,32 @@ virtio_get_queue_type(struct virtio_hw *hw, uint16_t vtpci_queue_idx) return VTNET_TQ; } -#define VIRTQUEUE_NUSED(vq) ((uint16_t)((vq)->vq_split.ring.used->idx - \ - (vq)->vq_used_cons_idx)) +static inline uint16_t +virtqueue_nused(const struct virtqueue *vq) +{ + uint16_t idx; + + if (vq->hw->weak_barriers) { + /** + * x86 prefers to using rte_smp_rmb over __atomic_load_n as it + * reports a slightly better perf, which comes from the saved + * branch by the compiler. + * The if and else branches are identical with the smp and cio + * barriers both defined as compiler barriers on x86. 
+ */ +#ifdef RTE_ARCH_X86_64 + idx = vq->vq_split.ring.used->idx; + rte_smp_rmb(); +#else + idx = __atomic_load_n(&(vq)->vq_split.ring.used->idx, + __ATOMIC_ACQUIRE); +#endif + } else { + idx = vq->vq_split.ring.used->idx; + rte_cio_rmb(); + } + return idx - vq->vq_used_cons_idx; +} void vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx); void vq_ring_free_chain_packed(struct virtqueue *vq, uint16_t used_idx); @@ -534,7 +558,8 @@ virtqueue_notify(struct virtqueue *vq) #ifdef RTE_LIBRTE_VIRTIO_DEBUG_DUMP #define VIRTQUEUE_DUMP(vq) do { \ uint16_t used_idx, nused; \ - used_idx = (vq)->vq_split.ring.used->idx; \ + used_idx = __atomic_load_n(&(vq)->vq_split.ring.used->idx, \ + __ATOMIC_RELAXED); nused = (uint16_t)(used_idx - (vq)->vq_used_cons_idx); \ if (vtpci_packed_queue((vq)->hw)) { \ PMD_INIT_LOG(DEBUG, \ @@ -549,9 +574,9 @@ virtqueue_notify(struct virtqueue *vq) "VQ: - size=%d; free=%d; used=%d; desc_head_idx=%d;" \ " avail.idx=%d; used_cons_idx=%d; used.idx=%d;" \ " avail.flags=0x%x; used.flags=0x%x", \ - (vq)->vq_nentries, (vq)->vq_free_cnt, nused, \ - (vq)->vq_desc_head_idx, (vq)->vq_split.ring.avail->idx, \ - (vq)->vq_used_cons_idx, (vq)->vq_split.ring.used->idx, \ + (vq)->vq_nentries, (vq)->vq_free_cnt, nused, (vq)->vq_desc_head_idx, \ + (vq)->vq_split.ring.avail->idx, (vq)->vq_used_cons_idx, \ + __atomic_load_n(&(vq)->vq_split.ring.used->idx, __ATOMIC_RELAXED), \ (vq)->vq_split.ring.avail->flags, (vq)->vq_split.ring.used->flags); \ } while (0) #else diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c index 37c47c7dc..7f6e7f2c1 100644 --- a/lib/librte_vhost/virtio_net.c +++ b/lib/librte_vhost/virtio_net.c @@ -77,11 +77,10 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq) } vq->last_used_idx += vq->shadow_used_idx; - rte_smp_wmb(); - vhost_log_cache_sync(dev, vq); - *(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx; + __atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx, + 
__ATOMIC_RELEASE); vq->shadow_used_idx = 0; vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx), sizeof(vq->used->idx)); -- 2.17.1