Date: Wed, 20 Jun 2018 15:41:19 +0800
From: Tiwei Bie
To: Marvin Liu
Cc: maxime.coquelin@redhat.com, zhihong.wang@intel.com, dev@dpdk.org
Message-ID: <20180620074119.GA4686@debian>
References: <20180608090724.20855-1-yong.liu@intel.com> <20180608090724.20855-6-yong.liu@intel.com>
In-Reply-To: <20180608090724.20855-6-yong.liu@intel.com>
Subject: Re: [dpdk-dev] [PATCH 5/7] net/virtio: support IN_ORDER Rx and Tx

On Fri, Jun 08, 2018 at 05:07:22PM +0800, Marvin Liu wrote:
> IN_ORDER Rx function can support the mergeable feature. Descriptor
> allocation and free are done in bulk.
>
> Virtio dequeue logic:
>     dequeue_burst_rx(burst mbufs)
>     for (each mbuf b) {
>         if (b need merge) {
>             merge remained mbufs
>             add merged mbuf to return mbufs list
>         } else {
>             add mbuf to return mbufs list
>         }
>     }
>     if (last mbuf c need merge) {
>         dequeue_burst_rx(required mbufs)
>         merge last mbuf c
>     }
>     refill_avail_ring_bulk()
>     update_avail_ring()
>     return mbufs list
>
> IN_ORDER Tx function can support the indirect and offload features.
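(Not an issue with the patch, just to make sure I read the design right: because
the device consumes descriptors in the same order the driver exposes them, both
the Rx free/refill and the Tx cleanup can reclaim a whole run of descriptors
from a single index instead of walking a per-packet free chain. Conceptually the
reclaim helper only has to do something like the sketch below -- field names
follow this driver, but this is just an illustration, not the actual
vq_ring_free_inorder() from earlier in the series.)

        /* In-order reclaim sketch: 'num' entries ending at 'desc_idx' are
         * known to be done, so return them to the free pool as one run.
         */
        static inline void
        ring_free_inorder_sketch(struct virtqueue *vq, uint16_t desc_idx,
                        uint16_t num)
        {
                vq->vq_free_cnt += num;
                vq->vq_desc_tail_idx = desc_idx & (vq->vq_nentries - 1);
        }

That is what makes the bulk allocation/free in this patch cheap.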
>
> Virtio enqueue logic:
>     xmit_cleanup(used descs)
>     for (each xmit mbuf b) {
>         if (b can inorder xmit) {
>             add mbuf b to inorder burst list
>         } else {
>             xmit inorder burst list
>             xmit mbuf b with normal xmit
>         }
>     }
>     if (inorder burst list not empty) {
>         xmit inorder burst list
>     }
>     update_avail_ring()
>
> Signed-off-by: Marvin Liu
>
> diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
> index bb40064ea..25697c872 100644
> --- a/drivers/net/virtio/virtio_ethdev.h
> +++ b/drivers/net/virtio/virtio_ethdev.h
> @@ -83,9 +83,15 @@ uint16_t virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
>  uint16_t virtio_recv_mergeable_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
>                  uint16_t nb_pkts);
>
> +uint16_t virtio_recv_inorder_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
> +                uint16_t nb_pkts);
> +
>  uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
>                  uint16_t nb_pkts);
>
> +uint16_t virtio_xmit_inorder_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
> +                uint16_t nb_pkts);
> +
>  uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
>                  uint16_t nb_pkts);
>
> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> index 0bca29855..d0473d6b4 100644
> --- a/drivers/net/virtio/virtio_rxtx.c
> +++ b/drivers/net/virtio/virtio_rxtx.c
> @@ -122,6 +122,45 @@ virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct rte_mbuf **rx_pkts,
>          return i;
>  }
>
> +static uint16_t
> +virtqueue_dequeue_inorder_rx(struct virtqueue *vq,

virtqueue_dequeue_burst_rx_inorder() will be better.

> +                struct rte_mbuf **rx_pkts,
> +                uint32_t *len,
> +                uint16_t num)
> +{
> +        struct vring_used_elem *uep;
> +        struct rte_mbuf *cookie;
> +        uint16_t used_idx;
> +        uint16_t i;
> +
> +        if (unlikely(num == 0))
> +                return 0;
> +
> +        used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);

Above assignment is misleading.

> +        for (i = 0; i < num; i++) {
> +                used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
> +                /* Desc idx same as used idx */
> +                uep = &vq->vq_ring.used->ring[used_idx];
> +                len[i] = uep->len;
> +                cookie = (struct rte_mbuf *)vq->vq_descx[used_idx].cookie;
> +
> +                if (unlikely(cookie == NULL)) {
> +                        PMD_DRV_LOG(ERR, "vring descriptor with no mbuf cookie at %u",
> +                                vq->vq_used_cons_idx);
> +                        break;
> +                }
> +
> +                rte_prefetch0(cookie);
> +                rte_packet_prefetch(rte_pktmbuf_mtod(cookie, void *));
> +                rx_pkts[i] = cookie;
> +                vq->vq_used_cons_idx++;
> +                vq->vq_descx[used_idx].cookie = NULL;
> +        }
> +
> +        vq_ring_free_inorder(vq, used_idx, i);
> +        return i;
> +}
> +
>  #ifndef DEFAULT_TX_FREE_THRESH
>  #define DEFAULT_TX_FREE_THRESH 32
>  #endif
> @@ -150,6 +189,70 @@ virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
>          }
>  }
>
> +/* Cleanup from completed inorder transmits. */
> +static void
> +virtio_xmit_cleanup_inorder(struct virtqueue *vq, uint16_t num)
> +{
> +        uint16_t i, used_idx;
> +
> +        used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);

Above assignment is misleading.

> +        for (i = 0; i < num; i++) {
> +                struct vq_desc_extra *dxp;
> +
> +                used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
> +                dxp = &vq->vq_descx[used_idx];
> +                vq->vq_used_cons_idx++;
> +
> +                if (dxp->cookie != NULL) {
> +                        rte_pktmbuf_free(dxp->cookie);
> +                        dxp->cookie = NULL;
> +                }
> +        }
> +        vq_ring_free_inorder(vq, used_idx, num);

If num is zero, we can't free used_idx.

> +}
> +
> +static inline int
> +virtqueue_enqueue_inorder_refill(struct virtqueue *vq,

virtqueue_enqueue_recv_refill_inorder() will be better.
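Regarding the two comments on virtio_xmit_cleanup_inorder() above, what I have
in mind is roughly the following (untested sketch, same call convention as in
the patch): drop the pre-loop assignment, mask the running index where it is
actually consumed, and skip the free entirely when there is nothing to clean:

        static void
        virtio_xmit_cleanup_inorder(struct virtqueue *vq, uint16_t num)
        {
                uint16_t i, used_idx = 0;

                if (num == 0)
                        return;

                for (i = 0; i < num; i++) {
                        struct vq_desc_extra *dxp;

                        /* Mask the used index once, where it is consumed. */
                        used_idx = vq->vq_used_cons_idx++ &
                                        (vq->vq_nentries - 1);
                        dxp = &vq->vq_descx[used_idx];

                        if (dxp->cookie != NULL) {
                                rte_pktmbuf_free(dxp->cookie);
                                dxp->cookie = NULL;
                        }
                }

                vq_ring_free_inorder(vq, used_idx, num);
        }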
> +                struct rte_mbuf **cookies,
> +                uint16_t num)
> +{
> +        struct vq_desc_extra *dxp;
> +        struct virtio_hw *hw = vq->hw;
> +        struct vring_desc *start_dp;
> +        uint16_t head_idx, idx, i = 0;
> +
> +        if (unlikely(vq->vq_free_cnt == 0))
> +                return -ENOSPC;
> +        if (unlikely(vq->vq_free_cnt < num))
> +                return -EMSGSIZE;
> +
> +        head_idx = vq->vq_desc_head_idx & (vq->vq_nentries - 1);
> +        start_dp = vq->vq_ring.desc;
> +
> +        while (i < num) {
> +                idx = head_idx & (vq->vq_nentries - 1);
> +                dxp = &vq->vq_descx[idx];
> +                dxp->cookie = (void *)(struct rte_mbuf *)cookies[i];

Casting isn't necessary.

> +                dxp->ndescs = 1;
> +
> +                start_dp[idx].addr =
> +                        VIRTIO_MBUF_ADDR(cookies[i], vq) +
> +                        RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
> +                start_dp[idx].len =
> +                        cookies[i]->buf_len -
> +                        RTE_PKTMBUF_HEADROOM +
> +                        hw->vtnet_hdr_size;
> +                start_dp[idx].flags = VRING_DESC_F_WRITE;
> +
> +                head_idx++;
> +                i++;
> +        }
> +
> +        vq->vq_avail_idx += num;
> +        vq->vq_desc_head_idx += num;
> +        vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
> +        return 0;
> +}
>
>  static inline int
>  virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
> @@ -246,9 +349,113 @@ tx_offload_enabled(struct virtio_hw *hw)
>          (var) = (val);                        \
>  } while (0)
>

[...]

> +
> +static void
> +virtio_discard_inorder_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)

virtio_discard_rxbuf_inorder() will be better.

> +{
> +        int error;
> +
> +        error = virtqueue_enqueue_inorder_refill(vq, &m, 1);
>          if (unlikely(error)) {
>                  RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
>                  rte_pktmbuf_free(m);
> @@ -813,6 +1002,191 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
>          return nb_rx;
>  }
>
> +uint16_t
> +virtio_recv_inorder_pkts(void *rx_queue,

virtio_recv_mergeable_pkts_inorder() will be better.

> +                struct rte_mbuf **rx_pkts,
> +                uint16_t nb_pkts)
> +{
> +        struct virtnet_rx *rxvq = rx_queue;
> +        struct virtqueue *vq = rxvq->vq;
> +        struct virtio_hw *hw = vq->hw;
> +        struct rte_mbuf *rxm;
> +        struct rte_mbuf *prev;
> +        uint16_t nb_used, num, nb_rx;
> +        uint32_t len[VIRTIO_MBUF_BURST_SZ];
> +        struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
> +        int error;
> +        uint32_t i, nb_enqueued;
> +        uint32_t seg_num;
> +        uint32_t seg_res;
> +        uint32_t hdr_size;
> +        int offload;
> +
> +        nb_rx = 0;
> +        if (unlikely(hw->started == 0))
> +                return nb_rx;
> +
> +        nb_used = VIRTQUEUE_NUSED(vq);
> +        nb_used = RTE_MIN(nb_used, nb_pkts);
> +        nb_used = RTE_MIN(nb_used, VIRTIO_MBUF_BURST_SZ);
> +
> +        virtio_rmb();
> +
> +        PMD_RX_LOG(DEBUG, "used:%d", nb_used);
> +
> +        i = 0;
> +        nb_enqueued = 0;
> +        seg_num = 1;
> +        seg_res = 0;
> +        hdr_size = hw->vtnet_hdr_size;
> +        offload = rx_offload_enabled(hw);
> +
> +        num = virtqueue_dequeue_inorder_rx(vq, rcv_pkts, len, nb_used);
> +
> +        for (; i < num; i++) {

Why not initialize i in for ()?
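To be concrete about the two small nits above (the cast in the refill helper
and the loop counter here), I simply mean:

        dxp->cookie = (void *)cookies[i];

and

        num = virtqueue_dequeue_inorder_rx(vq, rcv_pkts, len, nb_used);

        for (i = 0; i < num; i++) {
                ......
        }

which also lets you drop the standalone 'i = 0;' a few lines up.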
> +                struct virtio_net_hdr_mrg_rxbuf *header;
> +
> +                PMD_RX_LOG(DEBUG, "dequeue:%d", num);
> +                PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
> +
> +                rxm = rcv_pkts[i];
> +
> +                if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
> +                        PMD_RX_LOG(ERR, "Packet drop");
> +                        nb_enqueued++;
> +                        virtio_discard_inorder_rxbuf(vq, rxm);
> +                        rxvq->stats.errors++;
> +                        continue;
> +                }
> +
> +                header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm->buf_addr +
> +                        RTE_PKTMBUF_HEADROOM - hdr_size);
> +                seg_num = header->num_buffers;
> +
> +                if (seg_num == 0)
> +                        seg_num = 1;
> +
> +                rxm->data_off = RTE_PKTMBUF_HEADROOM;
> +                rxm->nb_segs = seg_num;
> +                rxm->ol_flags = 0;
> +                rxm->vlan_tci = 0;
> +                rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
> +                rxm->data_len = (uint16_t)(len[i] - hdr_size);
> +
> +                rxm->port = rxvq->port_id;
> +
> +                rx_pkts[nb_rx] = rxm;
> +                prev = rxm;
> +
> +                if (offload && virtio_rx_offload(rxm, &header->hdr) < 0) {
> +                        virtio_discard_inorder_rxbuf(vq, rxm);
> +                        rxvq->stats.errors++;
> +                        continue;
> +                }
> +
> +                seg_res = seg_num - 1;
> +
> +                /* Merge remaining segments */
> +                while (seg_res != 0 && i < num) {
> +                        rxm = rcv_pkts[i];
> +                        rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
> +                        rxm->pkt_len = (uint32_t)(len[i]);
> +                        rxm->data_len = (uint16_t)(len[i]);
> +
> +                        rx_pkts[nb_rx]->pkt_len += (uint32_t)(len[i]);
> +                        rx_pkts[nb_rx]->data_len += (uint16_t)(len[i]);
> +
> +                        i++;
> +
> +                        if (prev)
> +                                prev->next = rxm;
> +
> +                        prev = rxm;
> +                        seg_res -= 1;
> +                }
> +
> +                if (hw->vlan_strip)
> +                        rte_vlan_strip(rx_pkts[nb_rx]);
> +
> +                VIRTIO_DUMP_PACKET(rx_pkts[nb_rx],
> +                        rx_pkts[nb_rx]->data_len);
> +
> +                rxvq->stats.bytes += rx_pkts[nb_rx]->pkt_len;
> +                virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);

When seg_res != 0, we cannot run above code.

> +                nb_rx++;
> +        }
> +
> +        /* Last packet still need merge segments */
> +        while (seg_res != 0) {
> +                uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res,
> +                                        VIRTIO_MBUF_BURST_SZ);
> +
> +                prev = rcv_pkts[nb_rx - 1];
> +                if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
> +                        num = virtqueue_dequeue_inorder_rx(vq, rcv_pkts, len,
> +                                        rcv_cnt);
> +                        uint16_t extra_idx = 0;
> +
> +                        rcv_cnt = num;
> +                        while (extra_idx < rcv_cnt) {
> +                                rxm = rcv_pkts[extra_idx];
> +                                rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
> +                                rxm->pkt_len = (uint32_t)(len[extra_idx]);
> +                                rxm->data_len = (uint16_t)(len[extra_idx]);
> +                                prev->next = rxm;
> +                                prev = rxm;
> +                                rx_pkts[nb_rx - 1]->pkt_len += len[extra_idx];
> +                                rx_pkts[nb_rx - 1]->data_len += len[extra_idx];
> +                                extra_idx += 1;
> +                        };
> +                        seg_res -= rcv_cnt;
> +                } else {
> +                        PMD_RX_LOG(ERR,
> +                                        "No enough segments for packet.");
> +                        virtio_discard_inorder_rxbuf(vq, prev);
> +                        rxvq->stats.errors++;
> +                        nb_rx--;
> +                        break;
> +                }
> +        }
> +
> +        rxvq->stats.packets += nb_rx;
> +
> +        /* Allocate new mbuf for the used descriptor */
> +        error = ENOSPC;

Above assignment is meaningless.
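Back to the seg_res comment above: what I have in mind is to finalize a packet
(VLAN strip, dump, stats, nb_rx++) only once all of its segments have actually
been chained, and to do the same finalization for the very last packet after
the tail-merge loop. Rough, untested shape:

                /* Merge remaining segments */
                while (seg_res != 0 && i < num) {
                        ......
                }

                if (seg_res == 0) {
                        if (hw->vlan_strip)
                                rte_vlan_strip(rx_pkts[nb_rx]);

                        rxvq->stats.bytes += rx_pkts[nb_rx]->pkt_len;
                        virtio_update_packet_stats(&rxvq->stats,
                                        rx_pkts[nb_rx]);
                        nb_rx++;
                }
        }

        /* Last packet still needs its remaining segments */
        while (seg_res != 0) {
                ......
                /* Once seg_res reaches 0, finalize rx_pkts[nb_rx] (stats,
                 * VLAN strip) and increment nb_rx here as well.
                 */
        }

Otherwise the byte counter and the VLAN strip run on a half-assembled mbuf.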
> +
> +        if (likely(!virtqueue_full(vq))) {
> +                /* free_cnt may include mrg descs */
> +                uint16_t free_cnt = vq->vq_free_cnt;
> +                struct rte_mbuf *new_pkts[free_cnt];
> +
> +                if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts, free_cnt)) {
> +                        error = virtqueue_enqueue_inorder_refill(vq, new_pkts,
> +                                        free_cnt);
> +                        if (unlikely(error)) {
> +                                for (i = 0; i < free_cnt; i++)
> +                                        rte_pktmbuf_free(new_pkts[i]);
> +                        }
> +                        nb_enqueued += free_cnt;
> +                } else {
> +                        struct rte_eth_dev *dev
> +                                = &rte_eth_devices[rxvq->port_id];
> +                        dev->data->rx_mbuf_alloc_failed += free_cnt;
> +                }
> +        }
> +
> +        if (likely(nb_enqueued)) {
> +                vq_update_avail_idx(vq);
> +
> +                if (unlikely(virtqueue_kick_prepare(vq))) {
> +                        virtqueue_notify(vq);
> +                        PMD_RX_LOG(DEBUG, "Notified");
> +                }
> +        }
> +
> +        return nb_rx;
> +}
> +
>  uint16_t
>  virtio_recv_mergeable_pkts(void *rx_queue,
>                  struct rte_mbuf **rx_pkts,
> @@ -1060,7 +1434,8 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>                  }
>
>                  /* Enqueue Packet buffers */
> -                virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect, can_push);
> +                virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect,
> +                        can_push, 0);
>
>                  txvq->stats.bytes += txm->pkt_len;
>                  virtio_update_packet_stats(&txvq->stats, txm);
> @@ -1079,3 +1454,121 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>
>          return nb_tx;
>  }
> +
> +uint16_t
> +virtio_xmit_inorder_pkts(void *tx_queue,

virtio_xmit_pkts_inorder() will be better.

> +                struct rte_mbuf **tx_pkts,
> +                uint16_t nb_pkts)
> +{
> +        struct virtnet_tx *txvq = tx_queue;
> +        struct virtqueue *vq = txvq->vq;
> +        struct virtio_hw *hw = vq->hw;
> +        uint16_t hdr_size = hw->vtnet_hdr_size;
> +        uint16_t nb_used, nb_avail, nb_tx = 0, inorder_tx = 0;
> +        struct rte_mbuf *inorder_pkts[nb_pkts];
> +        int error;
> +
> +        if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
> +                return nb_tx;
> +
> +        if (unlikely(nb_pkts < 1))
> +                return nb_pkts;
> +
> +        VIRTQUEUE_DUMP(vq);
> +        PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
> +        nb_used = VIRTQUEUE_NUSED(vq);
> +
> +        virtio_rmb();
> +        if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh))
> +                virtio_xmit_cleanup_inorder(vq, nb_used);
> +
> +        nb_avail = RTE_MIN(vq->vq_free_cnt, nb_pkts);
> +
> +        for (nb_tx = 0; nb_tx < nb_avail; nb_tx++) {
> +                struct rte_mbuf *txm = tx_pkts[nb_tx];
> +                int slots, need;
> +                int can_push = 0, use_indirect = 0, flush_inorder = 1;
> +
> +                /* Do VLAN tag insertion */
> +                if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
> +                        error = rte_vlan_insert(&txm);
> +                        if (unlikely(error)) {
> +                                rte_pktmbuf_free(txm);
> +                                continue;
> +                        }
> +                }
> +
> +                txvq->stats.bytes += txm->pkt_len;
> +                virtio_update_packet_stats(&txvq->stats, txm);
> +
> +                /* optimize ring usage */
> +                if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
> +                     vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
> +                    rte_mbuf_refcnt_read(txm) == 1 &&
> +                    RTE_MBUF_DIRECT(txm) &&
> +                    txm->nb_segs == 1 &&
> +                    rte_pktmbuf_headroom(txm) >= hdr_size &&
> +                    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
> +                        __alignof__(struct virtio_net_hdr_mrg_rxbuf))) {
> +                        can_push = 1;

There is no need to have 'can_push' any more.

> +
> +                        inorder_pkts[inorder_tx] = txm;
> +                        inorder_tx++;

inorder_tx isn't a good name.

> +                        flush_inorder = 0;

I think there is no need to introduce 'flush_inorder'; the code would be
much more readable written like this:

        for (nb_tx = 0; nb_tx < nb_avail; nb_tx++) {
                struct rte_mbuf *txm = tx_pkts[nb_tx];

                ......
                if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
                     vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
                    rte_mbuf_refcnt_read(txm) == 1 &&
                    RTE_MBUF_DIRECT(txm) &&
                    txm->nb_segs == 1 &&
                    rte_pktmbuf_headroom(txm) >= hdr_size &&
                    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
                        __alignof__(struct virtio_net_hdr_mrg_rxbuf))) {
                        inorder_pkts[nb_inorder_pkts++] = txm;
                        continue;
                }

                /* Flush inorder packets if any. */
                if (nb_inorder_pkts) {
                        virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
                                        nb_inorder_pkts);
                        nb_inorder_pkts = 0;
                }

                ......

                virtqueue_enqueue_xmit(txvq, txm, slots, 0, 0, 1);
        }

        /* Flush inorder packets if any. */
        if (nb_inorder_pkts) {
                virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
                                nb_inorder_pkts);
                nb_inorder_pkts = 0;
        }

> +                } else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
> +                                txm->nb_segs < VIRTIO_MAX_TX_INDIRECT) {
> +                        use_indirect = 1;
> +                }

There is no need to try to use INDIRECT.

> +
> +                if (flush_inorder) {
> +                        /* Flush inorder packets */
> +                        virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
> +                                        inorder_tx);
> +                        inorder_tx = 0;
> +
> +                        /* How many main ring entries are needed to this Tx?
> +                         * any_layout => number of segments
> +                         * indirect   => 1
> +                         * default    => number of segments + 1
> +                         */
> +                        slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
> +                        need = slots - vq->vq_free_cnt;
> +
> +                        if (unlikely(need > 0)) {
> +                                nb_used = VIRTQUEUE_NUSED(vq);
> +                                virtio_rmb();
> +                                need = RTE_MIN(need, (int)nb_used);
> +
> +                                virtio_xmit_cleanup_inorder(vq, need);
> +                                need = slots - vq->vq_free_cnt;
> +                                if (unlikely(need > 0)) {
> +                                        PMD_TX_LOG(ERR,
> +                                                "No free tx descriptors to transmit");
> +                                        break;
> +                                }
> +                        }
> +                        /* Enqueue Packet buffers */
> +                        virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect,
> +                                can_push, 1);
> +                }
> +        }
> +
> +        /* Transmit all inorder packets */
> +        if (inorder_tx)
> +                virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts, inorder_tx);
> +
> +        txvq->stats.packets += nb_tx;
> +
> +        if (likely(nb_tx)) {
> +                vq_update_avail_idx(vq);
> +
> +                if (unlikely(virtqueue_kick_prepare(vq))) {
> +                        virtqueue_notify(vq);
> +                        PMD_TX_LOG(DEBUG, "Notified backend after xmit");
> +                }
> +        }
> +
> +        VIRTQUEUE_DUMP(vq);
> +
> +        return nb_tx;
> +}
> --
> 2.17.0
>
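One more thought on the INDIRECT branch: once the in-order fast path is split
out as suggested above, the fallback only has to cover the generic case, so the
slot accounting in the '......' part of my sketch reduces to something like
this (untested):

        slots = txm->nb_segs + 1;
        need = slots - vq->vq_free_cnt;

        if (unlikely(need > 0)) {
                nb_used = VIRTQUEUE_NUSED(vq);
                virtio_rmb();
                need = RTE_MIN(need, (int)nb_used);

                virtio_xmit_cleanup_inorder(vq, need);

                if (unlikely(slots > vq->vq_free_cnt)) {
                        PMD_TX_LOG(ERR,
                                "No free tx descriptors to transmit");
                        break;
                }
        }

        /* Enqueue Packet buffers */
        virtqueue_enqueue_xmit(txvq, txm, slots, 0, 0, 1);

i.e. no use_indirect, no can_push, and virtqueue_enqueue_xmit() is always
called with the new in-order flag set in this function.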