The first version of this series merged the out-of-order mergeable and non-mergeable receive paths, but the Intel STV team highlighted a performance regression when using multiqueue with two cores enqueueing descriptors on the host while a single core dequeues the two queues. I didn't manage to close the performance gap, so I decided to give up on this refactoring.

While trying to optimize, however, I reworked the mergeable function so that it looks like the in-order one, i.e. descriptors are now dequeued in batches, and so are descriptors refilled. Doing that, I measure a performance gain of 6% in an rxonly microbenchmark with two cores on the host and one in the guest.

Changes since v2:
- Rebase on top of Jens' packed ring series
- Break refill loop on VQ_RING_DESC_CHAIN_END (Tiwei)
- Fix up commit message

Maxime Coquelin (3):
  net/virtio: inline refill and offload helpers
  net/virtio: add non-mergeable support to in-order path
  net/virtio: improve batching in mergeable path

 drivers/net/virtio/virtio_ethdev.c |  14 +-
 drivers/net/virtio/virtio_ethdev.h |   2 +-
 drivers/net/virtio/virtio_rxtx.c   | 257 ++++++++++++++++-------------
 3 files changed, 145 insertions(+), 128 deletions(-)

-- 
2.17.2
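For readers skimming the series, the key trick in patch 3 is how the dequeue burst size is computed: it is capped by the number of used descriptors, the caller's budget, and the driver's burst size, and then clamped so the consumer index lands on a descriptor cache-line boundary after the burst. The sketch below models that computation as a standalone function; `clamp_burst` is an illustrative name (not a driver symbol), and the `DESC_PER_CACHELINE` value of 4 assumes 64-byte cache lines and 16-byte descriptors.

```c
#include <assert.h>
#include <stdint.h>

#define VIRTIO_MBUF_BURST_SZ 64u /* burst cap, as in the driver */
#define DESC_PER_CACHELINE   4u  /* assumed: 64B line / 16B desc */

/* Clamp the dequeue burst so that, after consuming `num` entries,
 * the used-ring consumer index is cache-line aligned. This mirrors
 * the computation patch 3 adds to virtio_recv_mergeable_pkts(). */
static uint16_t
clamp_burst(uint16_t nb_used, uint16_t nb_pkts, uint16_t used_cons_idx)
{
	uint16_t num = nb_used <= nb_pkts ? nb_used : nb_pkts;

	if (num > VIRTIO_MBUF_BURST_SZ)
		num = VIRTIO_MBUF_BURST_SZ;
	if (num > DESC_PER_CACHELINE)
		num = num - ((used_cons_idx + num) % DESC_PER_CACHELINE);
	return num;
}
```

For example, with 10 used descriptors and a consumer index of 3, the burst is trimmed from 10 to 9 so that the index ends at 12, a multiple of `DESC_PER_CACHELINE`; the next burst then touches only whole cache lines of the used ring.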
Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Reviewed-by: Jens Freimann <jfreimann@redhat.com>
---
 drivers/net/virtio/virtio_rxtx.c | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index 8564f18a7..95d74d000 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -980,7 +980,7 @@ virtio_dev_tx_queue_setup_finish(struct rte_eth_dev *dev,
 	return 0;
 }
 
-static void
+static inline void
 virtio_discard_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)
 {
 	int error;
@@ -999,7 +999,7 @@ virtio_discard_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)
 	}
 }
 
-static void
+static inline void
 virtio_discard_rxbuf_inorder(struct virtqueue *vq, struct rte_mbuf *m)
 {
 	int error;
@@ -1011,7 +1011,7 @@ virtio_discard_rxbuf_inorder(struct virtqueue *vq, struct rte_mbuf *m)
 	}
 }
 
-static void
+static inline void
 virtio_update_packet_stats(struct virtnet_stats *stats, struct rte_mbuf *mbuf)
 {
 	uint32_t s = mbuf->pkt_len;
@@ -1054,7 +1054,7 @@ virtio_rx_stats_updated(struct virtnet_rx *rxvq, struct rte_mbuf *m)
 }
 
 /* Optionally fill offload information in structure */
-static int
+static inline int
 virtio_rx_offload(struct rte_mbuf *m, struct virtio_net_hdr *hdr)
 {
 	struct rte_net_hdr_lens hdr_lens;
-- 
2.17.2
This patch adds support for the in-order path when the mergeable buffers feature hasn't been negotiated.

Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Reviewed-by: Tiwei Bie <tiwei.bie@intel.com>
---
 drivers/net/virtio/virtio_ethdev.c | 14 ++++----------
 drivers/net/virtio/virtio_ethdev.h |  2 +-
 drivers/net/virtio/virtio_rxtx.c   | 10 +++++++---
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index d0c65e375..446c338fc 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -1481,10 +1481,9 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
 		eth_dev->rx_pkt_burst = virtio_recv_pkts_vec;
 	} else if (hw->use_inorder_rx) {
 		PMD_INIT_LOG(INFO,
-			"virtio: using inorder mergeable buffer Rx path on port %u",
+			"virtio: using inorder Rx path on port %u",
 			eth_dev->data->port_id);
-		eth_dev->rx_pkt_burst =
-			&virtio_recv_mergeable_pkts_inorder;
+		eth_dev->rx_pkt_burst = &virtio_recv_pkts_inorder;
 	} else if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
 		PMD_INIT_LOG(INFO,
 			"virtio: using mergeable buffer Rx path on port %u",
@@ -2049,13 +2048,8 @@ virtio_dev_configure(struct rte_eth_dev *dev)
 
 	if (vtpci_with_feature(hw, VIRTIO_F_IN_ORDER)) {
 		hw->use_inorder_tx = 1;
-		if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF) &&
-		    !vtpci_packed_queue(hw)) {
-			hw->use_inorder_rx = 1;
-			hw->use_simple_rx = 0;
-		} else {
-			hw->use_inorder_rx = 0;
-		}
+		hw->use_inorder_rx = 1;
+		hw->use_simple_rx = 0;
 	}
 
 	if (vtpci_packed_queue(hw)) {
diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index 364ecbb50..f8d8a56ab 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -83,7 +83,7 @@ uint16_t virtio_recv_mergeable_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 uint16_t virtio_recv_mergeable_pkts_packed(void *rx_queue,
 		struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
 
-uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
+uint16_t virtio_recv_pkts_inorder(void *rx_queue,
 		struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
 
 uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index 95d74d000..58376ced3 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -1330,7 +1330,7 @@ virtio_recv_pkts_packed(void *rx_queue, struct rte_mbuf **rx_pkts,
 
 uint16_t
-virtio_recv_mergeable_pkts_inorder(void *rx_queue,
+virtio_recv_pkts_inorder(void *rx_queue,
 			struct rte_mbuf **rx_pkts,
 			uint16_t nb_pkts)
 {
@@ -1387,10 +1387,14 @@ virtio_recv_pkts_inorder(void *rx_queue,
 		header = (struct virtio_net_hdr_mrg_rxbuf *)
 			((char *)rxm->buf_addr + RTE_PKTMBUF_HEADROOM
 			- hdr_size);
-		seg_num = header->num_buffers;
 
-		if (seg_num == 0)
+		if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
+			seg_num = header->num_buffers;
+			if (seg_num == 0)
+				seg_num = 1;
+		} else {
 			seg_num = 1;
+		}
 
 		rxm->data_off = RTE_PKTMBUF_HEADROOM;
 		rxm->nb_segs = seg_num;
-- 
2.17.2
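The behavioral change in the patch above boils down to how the segment count is chosen: the `num_buffers` field of the virtio-net header is only meaningful when `VIRTIO_NET_F_MRG_RXBUF` was negotiated, so a non-mergeable run must use exactly one buffer per packet. A minimal sketch of that selection, with `rx_seg_num` as an illustrative name rather than a driver symbol:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Decide how many buffers a received packet spans. The num_buffers
 * field of struct virtio_net_hdr_mrg_rxbuf is only valid when the
 * mergeable-rx-buffers feature was negotiated; otherwise each packet
 * occupies exactly one buffer. */
static uint16_t
rx_seg_num(bool mrg_rxbuf_negotiated, uint16_t hdr_num_buffers)
{
	if (mrg_rxbuf_negotiated) {
		/* Guard against a header advertising zero buffers. */
		return hdr_num_buffers == 0 ? 1 : hdr_num_buffers;
	}
	return 1;
}
```

This is why the single in-order receive function can serve both cases: the feature check replaces the need for a separate non-mergeable burst function.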
This patch improves both descriptor dequeue and refill by using the same batching strategy as in the in-order path.

Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Tested-by: Jens Freimann <jfreimann@redhat.com>
Reviewed-by: Jens Freimann <jfreimann@redhat.com>
---
 drivers/net/virtio/virtio_rxtx.c | 239 +++++++++++++++++--------------
 1 file changed, 129 insertions(+), 110 deletions(-)

diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index 58376ced3..1cfa2f0d6 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -353,41 +353,44 @@ virtqueue_enqueue_refill_inorder(struct virtqueue *vq,
 }
 
 static inline int
-virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
+virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf **cookie,
+				uint16_t num)
 {
 	struct vq_desc_extra *dxp;
 	struct virtio_hw *hw = vq->hw;
-	struct vring_desc *start_dp;
-	uint16_t needed = 1;
-	uint16_t head_idx, idx;
+	struct vring_desc *start_dp = vq->vq_ring.desc;
+	uint16_t idx, i;
 
 	if (unlikely(vq->vq_free_cnt == 0))
 		return -ENOSPC;
-	if (unlikely(vq->vq_free_cnt < needed))
+	if (unlikely(vq->vq_free_cnt < num))
 		return -EMSGSIZE;
 
-	head_idx = vq->vq_desc_head_idx;
-	if (unlikely(head_idx >= vq->vq_nentries))
+	if (unlikely(vq->vq_desc_head_idx >= vq->vq_nentries))
 		return -EFAULT;
 
-	idx = head_idx;
-	dxp = &vq->vq_descx[idx];
-	dxp->cookie = (void *)cookie;
-	dxp->ndescs = needed;
+	for (i = 0; i < num; i++) {
+		idx = vq->vq_desc_head_idx;
+		dxp = &vq->vq_descx[idx];
+		dxp->cookie = (void *)cookie[i];
+		dxp->ndescs = 1;
 
-	start_dp = vq->vq_ring.desc;
-	start_dp[idx].addr =
-		VIRTIO_MBUF_ADDR(cookie, vq) +
-		RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
-	start_dp[idx].len =
-		cookie->buf_len - RTE_PKTMBUF_HEADROOM + hw->vtnet_hdr_size;
-	start_dp[idx].flags = VRING_DESC_F_WRITE;
-	idx = start_dp[idx].next;
-	vq->vq_desc_head_idx = idx;
-	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
-		vq->vq_desc_tail_idx = idx;
-	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
-	vq_update_avail_ring(vq, head_idx);
+		start_dp[idx].addr =
+			VIRTIO_MBUF_ADDR(cookie[i], vq) +
+			RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
+		start_dp[idx].len =
+			cookie[i]->buf_len - RTE_PKTMBUF_HEADROOM +
+			hw->vtnet_hdr_size;
+		start_dp[idx].flags = VRING_DESC_F_WRITE;
+		vq->vq_desc_head_idx = start_dp[idx].next;
+		vq_update_avail_ring(vq, idx);
+		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
+			vq->vq_desc_tail_idx = vq->vq_desc_head_idx;
+			break;
+		}
+	}
+
+	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
 
 	return 0;
 }
@@ -892,7 +895,8 @@ virtio_dev_rx_queue_setup_finish(struct rte_eth_dev *dev, uint16_t queue_idx)
 				error = virtqueue_enqueue_recv_refill_packed(vq,
 						&m, 1);
 			else
-				error = virtqueue_enqueue_recv_refill(vq, m);
+				error = virtqueue_enqueue_recv_refill(vq,
+						&m, 1);
 			if (error) {
 				rte_pktmbuf_free(m);
 				break;
@@ -991,7 +995,7 @@ virtio_discard_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)
 	if (vtpci_packed_queue(vq->hw))
 		error = virtqueue_enqueue_recv_refill_packed(vq, &m, 1);
 	else
-		error = virtqueue_enqueue_recv_refill(vq, m);
+		error = virtqueue_enqueue_recv_refill(vq, &m, 1);
 
 	if (unlikely(error)) {
 		RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
@@ -1211,7 +1215,7 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
 			dev->data->rx_mbuf_alloc_failed++;
 			break;
 		}
-		error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
+		error = virtqueue_enqueue_recv_refill(vq, &new_mbuf, 1);
 		if (unlikely(error)) {
 			rte_pktmbuf_free(new_mbuf);
 			break;
@@ -1528,19 +1532,18 @@ virtio_recv_mergeable_pkts(void *rx_queue,
 	struct virtnet_rx *rxvq = rx_queue;
 	struct virtqueue *vq = rxvq->vq;
 	struct virtio_hw *hw = vq->hw;
-	struct rte_mbuf *rxm, *new_mbuf;
-	uint16_t nb_used, num, nb_rx;
+	struct rte_mbuf *rxm;
+	struct rte_mbuf *prev;
+	uint16_t nb_used, num, nb_rx = 0;
 	uint32_t len[VIRTIO_MBUF_BURST_SZ];
 	struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
-	struct rte_mbuf *prev;
 	int error;
-	uint32_t i, nb_enqueued;
-	uint32_t seg_num;
-	uint16_t extra_idx;
-	uint32_t seg_res;
-	uint32_t hdr_size;
+	uint32_t nb_enqueued = 0;
+	uint32_t seg_num = 0;
+	uint32_t seg_res = 0;
+	uint32_t hdr_size = hw->vtnet_hdr_size;
+	int32_t i;
 
-	nb_rx = 0;
 	if (unlikely(hw->started == 0))
 		return nb_rx;
 
@@ -1550,31 +1553,25 @@ virtio_recv_mergeable_pkts(void *rx_queue,
 
 	PMD_RX_LOG(DEBUG, "used:%d", nb_used);
 
-	i = 0;
-	nb_enqueued = 0;
-	seg_num = 0;
-	extra_idx = 0;
-	seg_res = 0;
-	hdr_size = hw->vtnet_hdr_size;
-
-	while (i < nb_used) {
-		struct virtio_net_hdr_mrg_rxbuf *header;
+	num = likely(nb_used <= nb_pkts) ? nb_used : nb_pkts;
+	if (unlikely(num > VIRTIO_MBUF_BURST_SZ))
+		num = VIRTIO_MBUF_BURST_SZ;
+	if (likely(num > DESC_PER_CACHELINE))
+		num = num - ((vq->vq_used_cons_idx + num) %
+				DESC_PER_CACHELINE);
 
-		if (nb_rx == nb_pkts)
-			break;
+	num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, num);
 
-		num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, 1);
-		if (num != 1)
-			continue;
+	for (i = 0; i < num; i++) {
+		struct virtio_net_hdr_mrg_rxbuf *header;
 
-		i++;
 		PMD_RX_LOG(DEBUG, "dequeue:%d", num);
-		PMD_RX_LOG(DEBUG, "packet len:%d", len[0]);
+		PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
 
-		rxm = rcv_pkts[0];
+		rxm = rcv_pkts[i];
 
-		if (unlikely(len[0] < hdr_size + ETHER_HDR_LEN)) {
+		if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
 			PMD_RX_LOG(ERR, "Packet drop");
 			nb_enqueued++;
 			virtio_discard_rxbuf(vq, rxm);
@@ -1582,10 +1579,10 @@ virtio_recv_mergeable_pkts(void *rx_queue,
 			continue;
 		}
 
-		header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm->buf_addr +
-			RTE_PKTMBUF_HEADROOM - hdr_size);
+		header = (struct virtio_net_hdr_mrg_rxbuf *)
+			((char *)rxm->buf_addr + RTE_PKTMBUF_HEADROOM
+			- hdr_size);
 		seg_num = header->num_buffers;
-
 		if (seg_num == 0)
 			seg_num = 1;
 
@@ -1593,10 +1590,11 @@ virtio_recv_mergeable_pkts(void *rx_queue,
 		rxm->nb_segs = seg_num;
 		rxm->ol_flags = 0;
 		rxm->vlan_tci = 0;
-		rxm->pkt_len = (uint32_t)(len[0] - hdr_size);
-		rxm->data_len = (uint16_t)(len[0] - hdr_size);
+		rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
+		rxm->data_len = (uint16_t)(len[i] - hdr_size);
 
 		rxm->port = rxvq->port_id;
+
 		rx_pkts[nb_rx] = rxm;
 		prev = rxm;
 
@@ -1607,75 +1605,96 @@ virtio_recv_mergeable_pkts(void *rx_queue,
 			continue;
 		}
 
+		if (hw->vlan_strip)
+			rte_vlan_strip(rx_pkts[nb_rx]);
+
 		seg_res = seg_num - 1;
 
-		while (seg_res != 0) {
-			/*
-			 * Get extra segments for current uncompleted packet.
-			 */
-			uint16_t rcv_cnt =
-				RTE_MIN(seg_res, RTE_DIM(rcv_pkts));
-			if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
-				uint32_t rx_num =
-					virtqueue_dequeue_burst_rx(vq,
-					rcv_pkts, len, rcv_cnt);
-				i += rx_num;
-				rcv_cnt = rx_num;
-			} else {
-				PMD_RX_LOG(ERR,
-					"No enough segments for packet.");
-				nb_enqueued++;
-				virtio_discard_rxbuf(vq, rxm);
-				rxvq->stats.errors++;
-				break;
-			}
+		/* Merge remaining segments */
+		while (seg_res != 0 && i < (num - 1)) {
+			i++;
+
+			rxm = rcv_pkts[i];
+			rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
+			rxm->pkt_len = (uint32_t)(len[i]);
+			rxm->data_len = (uint16_t)(len[i]);
+
+			rx_pkts[nb_rx]->pkt_len += (uint32_t)(len[i]);
+			rx_pkts[nb_rx]->data_len += (uint16_t)(len[i]);
+
+			if (prev)
+				prev->next = rxm;
+
+			prev = rxm;
+			seg_res -= 1;
+		}
+
+		if (!seg_res) {
+			virtio_rx_stats_updated(rxvq, rx_pkts[nb_rx]);
+			nb_rx++;
+		}
+	}
+
+	/* Last packet still need merge segments */
+	while (seg_res != 0) {
+		uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res,
+					VIRTIO_MBUF_BURST_SZ);
 
-			extra_idx = 0;
+		prev = rcv_pkts[nb_rx];
+		if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
+			num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len,
+							rcv_cnt);
+			uint16_t extra_idx = 0;
 
+			rcv_cnt = num;
 			while (extra_idx < rcv_cnt) {
 				rxm = rcv_pkts[extra_idx];
-
-				rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
+				rxm->data_off =
+					RTE_PKTMBUF_HEADROOM - hdr_size;
 				rxm->pkt_len = (uint32_t)(len[extra_idx]);
 				rxm->data_len = (uint16_t)(len[extra_idx]);
-
-				if (prev)
-					prev->next = rxm;
-
+				prev->next = rxm;
 				prev = rxm;
-				rx_pkts[nb_rx]->pkt_len += rxm->pkt_len;
-				extra_idx++;
+				rx_pkts[nb_rx]->pkt_len += len[extra_idx];
+				rx_pkts[nb_rx]->data_len += len[extra_idx];
+				extra_idx += 1;
 			};
 			seg_res -= rcv_cnt;
-		}
-
-		if (hw->vlan_strip)
-			rte_vlan_strip(rx_pkts[nb_rx]);
-
-		VIRTIO_DUMP_PACKET(rx_pkts[nb_rx],
-			rx_pkts[nb_rx]->data_len);
-
-		virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);
-		nb_rx++;
+			if (!seg_res) {
+				virtio_rx_stats_updated(rxvq, rx_pkts[nb_rx]);
+				nb_rx++;
+			}
+		} else {
+			PMD_RX_LOG(ERR,
+					"No enough segments for packet.");
+			virtio_discard_rxbuf(vq, prev);
+			rxvq->stats.errors++;
+			break;
+		}
 	}
 
 	rxvq->stats.packets += nb_rx;
 
 	/* Allocate new mbuf for the used descriptor */
-	while (likely(!virtqueue_full(vq))) {
-		new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool);
-		if (unlikely(new_mbuf == NULL)) {
-			struct rte_eth_dev *dev
-				= &rte_eth_devices[rxvq->port_id];
-			dev->data->rx_mbuf_alloc_failed++;
-			break;
-		}
-		error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
-		if (unlikely(error)) {
-			rte_pktmbuf_free(new_mbuf);
-			break;
+	if (likely(!virtqueue_full(vq))) {
+		/* free_cnt may include mrg descs */
+		uint16_t free_cnt = vq->vq_free_cnt;
+		struct rte_mbuf *new_pkts[free_cnt];
+
+		if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts, free_cnt)) {
+			error = virtqueue_enqueue_recv_refill(vq, new_pkts,
+					free_cnt);
+			if (unlikely(error)) {
+				for (i = 0; i < free_cnt; i++)
+					rte_pktmbuf_free(new_pkts[i]);
+			}
+			nb_enqueued += free_cnt;
+		} else {
+			struct rte_eth_dev *dev =
+				&rte_eth_devices[rxvq->port_id];
+			dev->data->rx_mbuf_alloc_failed += free_cnt;
 		}
-		nb_enqueued++;
 	}
 
 	if (likely(nb_enqueued)) {
-- 
2.17.2
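The refill change at the end of the patch above replaces per-mbuf allocation with one bulk allocation followed by one batched enqueue, freeing the whole batch if the enqueue is refused. The sketch below is a toy model of that pattern, not driver code: the ring is reduced to a free-slot counter, and all names (`toy_ring`, `toy_refill`, `bulk_refill`) are illustrative. It assumes, as the real `virtqueue_enqueue_recv_refill()` does, that a batch which does not fit is rejected as a whole before anything is posted.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Toy stand-in for a virtqueue: just tracks free descriptors and how
 * many buffers were handed to the "device". */
struct toy_ring {
	uint16_t free_cnt;  /* descriptors still available */
	uint16_t enqueued;  /* buffers posted to the ring */
};

/* Mirrors the all-or-nothing entry checks of the refill function:
 * refuse the whole batch if it does not fit. */
static int
toy_refill(struct toy_ring *r, uint16_t num)
{
	if (r->free_cnt == 0)
		return -1;      /* -ENOSPC in the driver */
	if (r->free_cnt < num)
		return -2;      /* -EMSGSIZE in the driver */
	r->free_cnt -= num;
	r->enqueued += num;
	return 0;
}

/* The bulk-refill pattern: try to post the whole batch, and release
 * every buffer if the ring refused it, so nothing leaks. */
static int
bulk_refill(struct toy_ring *r, void **mbufs, uint16_t n)
{
	int error = toy_refill(r, n);

	if (error) {
		uint16_t i;

		/* Nothing was posted on failure: free the whole batch. */
		for (i = 0; i < n; i++)
			free(mbufs[i]);
		return error;
	}
	return 0;
}
```

Note the review question that follows in the thread concerns exactly this cleanup path: after freeing the batch on error, the function should not continue to account those buffers as enqueued.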
> -----Original Message-----
> From: dev <dev-bounces@dpdk.org> On Behalf Of Maxime Coquelin
> Sent: Friday, December 21, 2018 1:27 AM
> To: dev@dpdk.org; jfreimann@redhat.com; tiwei.bie@intel.com;
> zhihong.wang@intel.com
> Cc: Maxime Coquelin <maxime.coquelin@redhat.com>
> Subject: [dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in
> mergeable path
>
> This patch improves both descriptors dequeue and refill,
> by using the same batching strategy as done in in-order path.
>
> Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
> Tested-by: Jens Freimann <jfreimann@redhat.com>
> Reviewed-by: Jens Freimann <jfreimann@redhat.com>
> ---

[snip]

> 	/* Allocate new mbuf for the used descriptor */
> -	while (likely(!virtqueue_full(vq))) {
> -		new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool);
> -		if (unlikely(new_mbuf == NULL)) {
> -			struct rte_eth_dev *dev
> -				= &rte_eth_devices[rxvq->port_id];
> -			dev->data->rx_mbuf_alloc_failed++;
> -			break;
> -		}
> -		error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
> -		if (unlikely(error)) {
> -			rte_pktmbuf_free(new_mbuf);
> -			break;
> +	if (likely(!virtqueue_full(vq))) {
> +		/* free_cnt may include mrg descs */
> +		uint16_t free_cnt = vq->vq_free_cnt;
> +		struct rte_mbuf *new_pkts[free_cnt];
> +
> +		if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts, free_cnt)) {
> +			error = virtqueue_enqueue_recv_refill(vq, new_pkts,
> +					free_cnt);
> +			if (unlikely(error)) {
> +				for (i = 0; i < free_cnt; i++)
> +					rte_pktmbuf_free(new_pkts[i]);

Missing error handling here? The execution keeps going on without the mbufs?

/Gavin

> +			}
> +			nb_enqueued += free_cnt;
> +		} else {
> +			struct rte_eth_dev *dev =
> +				&rte_eth_devices[rxvq->port_id];
> +			dev->data->rx_mbuf_alloc_failed += free_cnt;
> 		}
> -		nb_enqueued++;
> 	}
>
> 	if (likely(nb_enqueued)) {
> --
> 2.17.2
On 12/21/18 7:27 AM, Gavin Hu (Arm Technology China) wrote:
>
>> -----Original Message-----
>> From: dev <dev-bounces@dpdk.org> On Behalf Of Maxime Coquelin
>> Sent: Friday, December 21, 2018 1:27 AM
>> To: dev@dpdk.org; jfreimann@redhat.com; tiwei.bie@intel.com;
>> zhihong.wang@intel.com
>> Cc: Maxime Coquelin <maxime.coquelin@redhat.com>
>> Subject: [dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in
>> mergeable path
>>
>> This patch improves both descriptors dequeue and refill,
>> by using the same batching strategy as done in in-order path.
>>
>> Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
>> Tested-by: Jens Freimann <jfreimann@redhat.com>
>> Reviewed-by: Jens Freimann <jfreimann@redhat.com>
>> ---

[snip]
>> - } >> - >> - if (hw->vlan_strip) >> - rte_vlan_strip(rx_pkts[nb_rx]); >> - >> - VIRTIO_DUMP_PACKET(rx_pkts[nb_rx], >> - rx_pkts[nb_rx]->data_len); >> >> - virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]); >> - nb_rx++; >> + if (!seg_res) { >> + virtio_rx_stats_updated(rxvq, >> rx_pkts[nb_rx]); >> + nb_rx++; >> + } >> + } else { >> + PMD_RX_LOG(ERR, >> + "No enough segments for packet."); >> + virtio_discard_rxbuf(vq, prev); >> + rxvq->stats.errors++; >> + break; >> + } >> } >> >> rxvq->stats.packets += nb_rx; >> >> /* Allocate new mbuf for the used descriptor */ >> - while (likely(!virtqueue_full(vq))) { >> - new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool); >> - if (unlikely(new_mbuf == NULL)) { >> - struct rte_eth_dev *dev >> - = &rte_eth_devices[rxvq->port_id]; >> - dev->data->rx_mbuf_alloc_failed++; >> - break; >> - } >> - error = virtqueue_enqueue_recv_refill(vq, new_mbuf); >> - if (unlikely(error)) { >> - rte_pktmbuf_free(new_mbuf); >> - break; >> + if (likely(!virtqueue_full(vq))) { >> + /* free_cnt may include mrg descs */ >> + uint16_t free_cnt = vq->vq_free_cnt; >> + struct rte_mbuf *new_pkts[free_cnt]; >> + >> + if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts, >> free_cnt)) { >> + error = virtqueue_enqueue_recv_refill(vq, new_pkts, >> + free_cnt); >> + if (unlikely(error)) { >> + for (i = 0; i < free_cnt; i++) >> + rte_pktmbuf_free(new_pkts[i]); > Missing error handling here? the execution keeps going on without the mbufs? That's fine because if we get an error here, no descriptors were inserted in the avail queue. nb_enqueue is just used to know whether we should notify host descs are available. I can set free_cnt to 0 in the error path though, so that host isn't notified for nothing but that's not a big deal, really. Thanks! 
Maxime > /Gavin > >> + } >> + nb_enqueued += free_cnt; >> + } else { >> + struct rte_eth_dev *dev = >> + &rte_eth_devices[rxvq->port_id]; >> + dev->data->rx_mbuf_alloc_failed += free_cnt; >> } >> - nb_enqueued++; >> } >> >> if (likely(nb_enqueued)) { >> -- >> 2.17.2 >
> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Friday, December 21, 2018 4:36 PM
> To: Gavin Hu (Arm Technology China) <Gavin.Hu@arm.com>;
> dev@dpdk.org; jfreimann@redhat.com; tiwei.bie@intel.com;
> zhihong.wang@intel.com
> Cc: nd <nd@arm.com>
> Subject: Re: [dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in
> mergeable path
>
>
>
> On 12/21/18 7:27 AM, Gavin Hu (Arm Technology China) wrote:
> >
> >
> >> -----Original Message-----
> >> From: dev <dev-bounces@dpdk.org> On Behalf Of Maxime Coquelin
> >> Sent: Friday, December 21, 2018 1:27 AM
> >> To: dev@dpdk.org; jfreimann@redhat.com; tiwei.bie@intel.com;
> >> zhihong.wang@intel.com
> >> Cc: Maxime Coquelin <maxime.coquelin@redhat.com>
> >> Subject: [dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in
> >> mergeable path
> >>
> >> This patch improves both descriptors dequeue and refill,
> >> by using the same batching strategy as done in in-order path.
> >>
> >> Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
> >> Tested-by: Jens Freimann <jfreimann@redhat.com>
> >> Reviewed-by: Jens Freimann <jfreimann@redhat.com>
> >> ---
> >> drivers/net/virtio/virtio_rxtx.c | 239 +++++++++++++++++--------------
> >> 1 file changed, 129 insertions(+), 110 deletions(-)
> >>
> >> diff --git a/drivers/net/virtio/virtio_rxtx.c
> b/drivers/net/virtio/virtio_rxtx.c
> >> index 58376ced3..1cfa2f0d6 100644
> >> --- a/drivers/net/virtio/virtio_rxtx.c
> >> +++ b/drivers/net/virtio/virtio_rxtx.c
> >> @@ -353,41 +353,44 @@ virtqueue_enqueue_refill_inorder(struct
> >> virtqueue *vq,
> >> }
> >>
> >> static inline int
> >> -virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf
> >> *cookie)
> >> +virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf
> >> **cookie,
> >> + uint16_t num)
> >> {
> >> struct vq_desc_extra *dxp;
> >> struct virtio_hw *hw = vq->hw;
> >> - struct vring_desc *start_dp;
> >> - uint16_t needed = 1;
> >> - uint16_t head_idx, idx;
> >> + struct vring_desc *start_dp = vq->vq_ring.desc;
> >> + uint16_t idx, i;
> >>
> >> if (unlikely(vq->vq_free_cnt == 0))
> >> return -ENOSPC;
> >> - if (unlikely(vq->vq_free_cnt < needed))
> >> + if (unlikely(vq->vq_free_cnt < num))
> >> return -EMSGSIZE;
> >>
> >> - head_idx = vq->vq_desc_head_idx;
> >> - if (unlikely(head_idx >= vq->vq_nentries))
> >> + if (unlikely(vq->vq_desc_head_idx >= vq->vq_nentries))
> >> return -EFAULT;
> >>
> >> - idx = head_idx;
> >> - dxp = &vq->vq_descx[idx];
> >> - dxp->cookie = (void *)cookie;
> >> - dxp->ndescs = needed;
> >> + for (i = 0; i < num; i++) {
> >> + idx = vq->vq_desc_head_idx;
> >> + dxp = &vq->vq_descx[idx];
> >> + dxp->cookie = (void *)cookie[i];
> >> + dxp->ndescs = 1;
> >>
> >> - start_dp = vq->vq_ring.desc;
> >> - start_dp[idx].addr =
> >> - VIRTIO_MBUF_ADDR(cookie, vq) +
> >> - RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
> >> - start_dp[idx].len =
> >> - cookie->buf_len - RTE_PKTMBUF_HEADROOM + hw-
> >>> vtnet_hdr_size;
> >> - start_dp[idx].flags = VRING_DESC_F_WRITE;
> >> - idx = start_dp[idx].next;
> >> - vq->vq_desc_head_idx = idx;
> >> - if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
> >> - vq->vq_desc_tail_idx = idx;
> >> - vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
> >> - vq_update_avail_ring(vq, head_idx);
> >> + start_dp[idx].addr =
> >> + VIRTIO_MBUF_ADDR(cookie[i], vq) +
> >> + RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
> >> + start_dp[idx].len =
> >> + cookie[i]->buf_len - RTE_PKTMBUF_HEADROOM +
> >> + hw->vtnet_hdr_size;
> >> + start_dp[idx].flags = VRING_DESC_F_WRITE;
> >> + vq->vq_desc_head_idx = start_dp[idx].next;
> >> + vq_update_avail_ring(vq, idx);
> >> + if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
> >> {
> >> + vq->vq_desc_tail_idx = vq->vq_desc_head_idx;
> >> + break;
> >> + }
> >> + }
> >> +
> >> + vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
> >>
> >> return 0;
> >> }
> >> @@ -892,7 +895,8 @@ virtio_dev_rx_queue_setup_finish(struct
> >> rte_eth_dev *dev, uint16_t queue_idx)
> >> error =
> >> virtqueue_enqueue_recv_refill_packed(vq,
> >> &m, 1);
> >> else
> >> - error = virtqueue_enqueue_recv_refill(vq,
> >> m);
> >> + error = virtqueue_enqueue_recv_refill(vq,
> >> + &m, 1);
> >> if (error) {
> >> rte_pktmbuf_free(m);
> >> break;
> >> @@ -991,7 +995,7 @@ virtio_discard_rxbuf(struct virtqueue *vq, struct
> >> rte_mbuf *m)
> >> if (vtpci_packed_queue(vq->hw))
> >> error = virtqueue_enqueue_recv_refill_packed(vq, &m, 1);
> >> else
> >> - error = virtqueue_enqueue_recv_refill(vq, m);
> >> + error = virtqueue_enqueue_recv_refill(vq, &m, 1);
> >>
> >> if (unlikely(error)) {
> >> RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
> >> @@ -1211,7 +1215,7 @@ virtio_recv_pkts(void *rx_queue, struct
> rte_mbuf
> >> **rx_pkts, uint16_t nb_pkts)
> >> dev->data->rx_mbuf_alloc_failed++;
> >> break;
> >> }
> >> - error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
> >> + error = virtqueue_enqueue_recv_refill(vq, &new_mbuf, 1);
> >> if (unlikely(error)) {
> >> rte_pktmbuf_free(new_mbuf);
> >> break;
> >> @@ -1528,19 +1532,18 @@ virtio_recv_mergeable_pkts(void
> *rx_queue,
> >> struct virtnet_rx *rxvq = rx_queue;
> >> struct virtqueue *vq = rxvq->vq;
> >> struct virtio_hw *hw = vq->hw;
> >> - struct rte_mbuf *rxm, *new_mbuf;
> >> - uint16_t nb_used, num, nb_rx;
> >> + struct rte_mbuf *rxm;
> >> + struct rte_mbuf *prev;
> >> + uint16_t nb_used, num, nb_rx = 0;
> >> uint32_t len[VIRTIO_MBUF_BURST_SZ];
> >> struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
> >> - struct rte_mbuf *prev;
> >> int error;
> >> - uint32_t i, nb_enqueued;
> >> - uint32_t seg_num;
> >> - uint16_t extra_idx;
> >> - uint32_t seg_res;
> >> - uint32_t hdr_size;
> >> + uint32_t nb_enqueued = 0;
> >> + uint32_t seg_num = 0;
> >> + uint32_t seg_res = 0;
> >> + uint32_t hdr_size = hw->vtnet_hdr_size;
> >> + int32_t i;
> >>
> >> - nb_rx = 0;
> >> if (unlikely(hw->started == 0))
> >> return nb_rx;
> >>
> >> @@ -1550,31 +1553,25 @@ virtio_recv_mergeable_pkts(void
> *rx_queue,
> >>
> >> PMD_RX_LOG(DEBUG, "used:%d", nb_used);
> >>
> >> - i = 0;
> >> - nb_enqueued = 0;
> >> - seg_num = 0;
> >> - extra_idx = 0;
> >> - seg_res = 0;
> >> - hdr_size = hw->vtnet_hdr_size;
> >> -
> >> - while (i < nb_used) {
> >> - struct virtio_net_hdr_mrg_rxbuf *header;
> >> + num = likely(nb_used <= nb_pkts) ? nb_used : nb_pkts;
> >> + if (unlikely(num > VIRTIO_MBUF_BURST_SZ))
> >> + num = VIRTIO_MBUF_BURST_SZ;
> >> + if (likely(num > DESC_PER_CACHELINE))
> >> + num = num - ((vq->vq_used_cons_idx + num) %
> >> + DESC_PER_CACHELINE);
> >>
> >> - if (nb_rx == nb_pkts)
> >> - break;
> >>
> >> - num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, 1);
> >> - if (num != 1)
> >> - continue;
> >> + num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, num);
> >>
> >> - i++;
> >> + for (i = 0; i < num; i++) {
> >> + struct virtio_net_hdr_mrg_rxbuf *header;
> >>
> >> PMD_RX_LOG(DEBUG, "dequeue:%d", num);
> >> - PMD_RX_LOG(DEBUG, "packet len:%d", len[0]);
> >> + PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
> >>
> >> - rxm = rcv_pkts[0];
> >> + rxm = rcv_pkts[i];
> >>
> >> - if (unlikely(len[0] < hdr_size + ETHER_HDR_LEN)) {
> >> + if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
> >> PMD_RX_LOG(ERR, "Packet drop");
> >> nb_enqueued++;
> >> virtio_discard_rxbuf(vq, rxm);
> >> @@ -1582,10 +1579,10 @@ virtio_recv_mergeable_pkts(void
> *rx_queue,
> >> continue;
> >> }
> >>
> >> - header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm-
> >>> buf_addr +
> >> - RTE_PKTMBUF_HEADROOM - hdr_size);
> >> + header = (struct virtio_net_hdr_mrg_rxbuf *)
> >> + ((char *)rxm->buf_addr +
> >> RTE_PKTMBUF_HEADROOM
> >> + - hdr_size);
> >> seg_num = header->num_buffers;
> >> -
> >> if (seg_num == 0)
> >> seg_num = 1;
> >>
> >> @@ -1593,10 +1590,11 @@ virtio_recv_mergeable_pkts(void
> *rx_queue,
> >> rxm->nb_segs = seg_num;
> >> rxm->ol_flags = 0;
> >> rxm->vlan_tci = 0;
> >> - rxm->pkt_len = (uint32_t)(len[0] - hdr_size);
> >> - rxm->data_len = (uint16_t)(len[0] - hdr_size);
> >> + rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
> >> + rxm->data_len = (uint16_t)(len[i] - hdr_size);
> >>
> >> rxm->port = rxvq->port_id;
> >> +
> >> rx_pkts[nb_rx] = rxm;
> >> prev = rxm;
> >>
> >> @@ -1607,75 +1605,96 @@ virtio_recv_mergeable_pkts(void
> *rx_queue,
> >> continue;
> >> }
> >>
> >> + if (hw->vlan_strip)
> >> + rte_vlan_strip(rx_pkts[nb_rx]);
> >> +
> >> seg_res = seg_num - 1;
> >>
> >> - while (seg_res != 0) {
> >> - /*
> >> - * Get extra segments for current uncompleted
> >> packet.
> >> - */
> >> - uint16_t rcv_cnt =
> >> - RTE_MIN(seg_res, RTE_DIM(rcv_pkts));
> >> - if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
> >> - uint32_t rx_num =
> >> - virtqueue_dequeue_burst_rx(vq,
> >> - rcv_pkts, len, rcv_cnt);
> >> - i += rx_num;
> >> - rcv_cnt = rx_num;
> >> - } else {
> >> - PMD_RX_LOG(ERR,
> >> - "No enough segments for
> >> packet.");
> >> - nb_enqueued++;
> >> - virtio_discard_rxbuf(vq, rxm);
> >> - rxvq->stats.errors++;
> >> - break;
> >> - }
> >> + /* Merge remaining segments */
> >> + while (seg_res != 0 && i < (num - 1)) {
> >> + i++;
> >> +
> >> + rxm = rcv_pkts[i];
> >> + rxm->data_off = RTE_PKTMBUF_HEADROOM -
> >> hdr_size;
> >> + rxm->pkt_len = (uint32_t)(len[i]);
> >> + rxm->data_len = (uint16_t)(len[i]);
> >> +
> >> + rx_pkts[nb_rx]->pkt_len += (uint32_t)(len[i]);
> >> + rx_pkts[nb_rx]->data_len += (uint16_t)(len[i]);
> >> +
> >> + if (prev)
> >> + prev->next = rxm;
> >> +
> >> + prev = rxm;
> >> + seg_res -= 1;
> >> + }
> >> +
> >> + if (!seg_res) {
> >> + virtio_rx_stats_updated(rxvq, rx_pkts[nb_rx]);
> >> + nb_rx++;
> >> + }
> >> + }
> >> +
> >> + /* Last packet still need merge segments */
> >> + while (seg_res != 0) {
> >> + uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res,
> >> + VIRTIO_MBUF_BURST_SZ);
> >>
> >> - extra_idx = 0;
> >> + prev = rcv_pkts[nb_rx];
> >> + if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
> >> + num = virtqueue_dequeue_burst_rx(vq, rcv_pkts,
> >> len,
> >> + rcv_cnt);
> >> + uint16_t extra_idx = 0;
> >>
> >> + rcv_cnt = num;
> >> while (extra_idx < rcv_cnt) {
> >> rxm = rcv_pkts[extra_idx];
> >> -
> >> - rxm->data_off =
> >> RTE_PKTMBUF_HEADROOM - hdr_size;
> >> + rxm->data_off =
> >> + RTE_PKTMBUF_HEADROOM -
> >> hdr_size;
> >> rxm->pkt_len = (uint32_t)(len[extra_idx]);
> >> rxm->data_len = (uint16_t)(len[extra_idx]);
> >> -
> >> - if (prev)
> >> - prev->next = rxm;
> >> -
> >> + prev->next = rxm;
> >> prev = rxm;
> >> - rx_pkts[nb_rx]->pkt_len += rxm->pkt_len;
> >> - extra_idx++;
> >> + rx_pkts[nb_rx]->pkt_len += len[extra_idx];
> >> + rx_pkts[nb_rx]->data_len += len[extra_idx];
> >> + extra_idx += 1;
> >> };
> >> seg_res -= rcv_cnt;
> >> - }
> >> -
> >> - if (hw->vlan_strip)
> >> - rte_vlan_strip(rx_pkts[nb_rx]);
> >> -
> >> - VIRTIO_DUMP_PACKET(rx_pkts[nb_rx],
> >> - rx_pkts[nb_rx]->data_len);
> >>
> >> - virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);
> >> - nb_rx++;
> >> + if (!seg_res) {
> >> + virtio_rx_stats_updated(rxvq,
> >> rx_pkts[nb_rx]);
> >> + nb_rx++;
> >> + }
> >> + } else {
> >> + PMD_RX_LOG(ERR,
> >> + "No enough segments for packet.");
> >> + virtio_discard_rxbuf(vq, prev);
> >> + rxvq->stats.errors++;
> >> + break;
> >> + }
> >> }
> >>
> >> rxvq->stats.packets += nb_rx;
> >>
> >> /* Allocate new mbuf for the used descriptor */
> >> - while (likely(!virtqueue_full(vq))) {
> >> - new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool);
> >> - if (unlikely(new_mbuf == NULL)) {
> >> - struct rte_eth_dev *dev
> >> - = &rte_eth_devices[rxvq->port_id];
> >> - dev->data->rx_mbuf_alloc_failed++;
> >> - break;
> >> - }
> >> - error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
> >> - if (unlikely(error)) {
> >> - rte_pktmbuf_free(new_mbuf);
> >> - break;
> >> + if (likely(!virtqueue_full(vq))) {
> >> + /* free_cnt may include mrg descs */
> >> + uint16_t free_cnt = vq->vq_free_cnt;
> >> + struct rte_mbuf *new_pkts[free_cnt];
> >> +
> >> + if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts,
> >> free_cnt)) {
> >> + error = virtqueue_enqueue_recv_refill(vq, new_pkts,
> >> + free_cnt);
> >> + if (unlikely(error)) {
> >> + for (i = 0; i < free_cnt; i++)
> >> + rte_pktmbuf_free(new_pkts[i]);
> > Missing error handling here? The execution keeps going on without the
> > mbufs?
>
> That's fine because if we get an error here, no descriptors were
> inserted in the avail queue.
>
> nb_enqueued is just used to know whether we should notify the host that
> descs are available. I can set free_cnt to 0 in the error path though,
> so that the host isn't notified for nothing, but that's not a big deal,
> really.
>
> Thanks!
> Maxime
>
> > /Gavin
> >
> >> + }
> >> + nb_enqueued += free_cnt;
> >> + } else {
> >> + struct rte_eth_dev *dev =
> >> + &rte_eth_devices[rxvq->port_id];
> >> + dev->data->rx_mbuf_alloc_failed += free_cnt;
> >> }
> >> - nb_enqueued++;
> >> }
> >>
> >> if (likely(nb_enqueued)) {
> >> --
> >> 2.17.2
> >
Thanks for the explanation, then
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
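For context, the behavior Maxime describes can be sketched with a toy model (the names below are simplified stand-ins, not the driver's actual API): when the bulk refill fails, no descriptors were inserted in the avail ring, so zeroing free_cnt keeps nb_enqueued unchanged and the host-notification branch guarded by it is skipped.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-in for virtqueue_enqueue_recv_refill(): fails when the
 * simulated ring has no room, in which case nothing was enqueued. */
static bool ring_full;

static int enqueue_recv_refill(uint16_t num)
{
    (void)num;
    return ring_full ? -1 : 0;
}

/* Refill tail with the tweak Maxime mentions: on error, drop
 * free_cnt to 0 so nb_enqueued is not bumped and the host is not
 * notified for descriptors that were never made available.
 * (The real code also frees the mbufs allocated for the refill.) */
static uint32_t refill_tail(uint16_t free_cnt, uint32_t nb_enqueued)
{
    if (enqueue_recv_refill(free_cnt) != 0)
        free_cnt = 0;
    return nb_enqueued + free_cnt;
}
```

With ring_full set, refill_tail(8, 0) returns 0, so no needless notification is sent; on success nb_enqueued grows by the full refill count.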
On 12/20/18 6:27 PM, Maxime Coquelin wrote:
> First version of this series did merge out-of-order mergeable
> and non-mergeable receive paths, but Intel STV team highlighted
> some performance regression when using multiqueue with two cores
> enqueueing descs on host, while a single core dequeues the
> two queues.
>
> I didn't manage to close the performance gap, so I decided to
> give up this refactoring. But while trying to optimize, I reworked
> the mergeable function so that it looks like the in-order one.
> I.e. descriptors are now dequeued in batches, and so are descriptor
> refills. Doing that, I measured a performance gain of 6% when doing
> an rxonly microbenchmark with two cores on the host, one in the guest.
>
>
> Changes since v2:
> - Rebase after Jens' packed ring series
> - Break refill loop if VQ_RING_DESC_CHAIN_END (Tiwei)
> - Fixup commit message
>
> Maxime Coquelin (3):
> net/virtio: inline refill and offload helpers
> net/virtio: add non-mergeable support to in-order path
> net/virtio: improve batching in mergeable path
>
> drivers/net/virtio/virtio_ethdev.c | 14 +-
> drivers/net/virtio/virtio_ethdev.h | 2 +-
> drivers/net/virtio/virtio_rxtx.c | 257 ++++++++++++++++-------------
> 3 files changed, 145 insertions(+), 128 deletions(-)
>
Applied to dpdk-next-virtio
Maxime
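
As a closing illustration of the batching strategy in patch 3, the burst sizing at the top of virtio_recv_mergeable_pkts can be modeled as below. This is a sketch under assumptions: the constant values stand in for the driver's own macros (VIRTIO_MBUF_BURST_SZ, and DESC_PER_CACHELINE as derived from cacheline and descriptor sizes), and are chosen here only for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative values; the driver derives these from its own macros. */
#define BURST_SZ           64
#define DESC_PER_CACHELINE 4

/* Mirror of the burst sizing in the patch: clamp to the caller's
 * budget and the burst array size, then trim the burst so it ends on
 * a descriptor-cacheline boundary of the used ring, keeping the next
 * dequeue cacheline-aligned. */
static uint16_t burst_size(uint16_t nb_used, uint16_t nb_pkts,
                           uint16_t used_cons_idx)
{
    uint16_t num = nb_used <= nb_pkts ? nb_used : nb_pkts;

    if (num > BURST_SZ)
        num = BURST_SZ;
    if (num > DESC_PER_CACHELINE)
        num -= (uint16_t)((used_cons_idx + num) % DESC_PER_CACHELINE);
    return num;
}
```

For example, with used_cons_idx = 2, 100 used descriptors, and a budget of 32 packets, the burst is trimmed from 32 to 30 so that 2 + 30 lands on a cacheline boundary and the next burst starts aligned.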