DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH] enic: scattered Rx
@ 2016-06-14 23:55 Nelson Escobar
  2016-06-16 19:19 ` [dpdk-dev] [PATCH v2] " Nelson Escobar
  0 siblings, 1 reply; 3+ messages in thread
From: Nelson Escobar @ 2016-06-14 23:55 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, johndale, Nelson Escobar

For performance reasons, this patch uses 2 VIC RQs per RQ presented to
DPDK.

The VIC requires that each descriptor be marked as either a start of
packet (SOP) descriptor or a non-SOP descriptor.  A one RQ solution
requires skipping descriptors when receiving small packets and results
in bad performance when receiving many small packets.

The 2 RQ solution makes use of the VIC feature that allows a receive
on the primary queue to 'spill over' into another queue if the receive is
too large to fit in the buffer assigned to the descriptor on the
primary queue.  This means that there is no skipping of descriptors
when receiving small packets and results in much better performance.

Signed-off-by: Nelson Escobar <neescoba@cisco.com>
Reviewed-by: John Daley <johndale@cisco.com>
---
 doc/guides/nics/overview.rst         |   2 +-
 drivers/net/enic/base/rq_enet_desc.h |   2 +-
 drivers/net/enic/base/vnic_rq.c      |   8 +-
 drivers/net/enic/base/vnic_rq.h      |  18 ++-
 drivers/net/enic/enic.h              |  22 ++-
 drivers/net/enic/enic_ethdev.c       |  10 +-
 drivers/net/enic/enic_main.c         | 277 +++++++++++++++++++++++++++--------
 drivers/net/enic/enic_res.c          |   5 +-
 drivers/net/enic/enic_rxtx.c         | 137 +++++++++++------
 9 files changed, 358 insertions(+), 123 deletions(-)

diff --git a/doc/guides/nics/overview.rst b/doc/guides/nics/overview.rst
index 2200171..d0ae847 100644
--- a/doc/guides/nics/overview.rst
+++ b/doc/guides/nics/overview.rst
@@ -94,7 +94,7 @@ Most of these differences are summarized below.
    Queue start/stop             Y   Y Y Y Y Y Y     Y Y     Y Y Y Y Y Y               Y   Y Y
    MTU update                   Y Y Y           Y   Y Y Y Y         Y Y
    Jumbo frame                  Y Y Y Y Y Y Y Y Y   Y Y Y Y Y Y Y Y Y Y       Y Y Y
-   Scattered Rx                 Y Y Y   Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y               Y   Y
+   Scattered Rx                 Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y               Y   Y
    LRO                                              Y Y Y Y
    TSO                          Y   Y   Y Y Y Y Y Y Y Y Y Y Y Y Y Y
    Promiscuous mode       Y Y   Y Y   Y Y Y Y Y Y Y Y Y     Y Y     Y Y         Y Y   Y   Y Y
diff --git a/drivers/net/enic/base/rq_enet_desc.h b/drivers/net/enic/base/rq_enet_desc.h
index 7292d9d..13e24b4 100644
--- a/drivers/net/enic/base/rq_enet_desc.h
+++ b/drivers/net/enic/base/rq_enet_desc.h
@@ -55,7 +55,7 @@ enum rq_enet_type_types {
 #define RQ_ENET_TYPE_BITS		2
 #define RQ_ENET_TYPE_MASK		((1 << RQ_ENET_TYPE_BITS) - 1)
 
-static inline void rq_enet_desc_enc(struct rq_enet_desc *desc,
+static inline void rq_enet_desc_enc(volatile struct rq_enet_desc *desc,
 	u64 address, u8 type, u16 length)
 {
 	desc->address = cpu_to_le64(address);
diff --git a/drivers/net/enic/base/vnic_rq.c b/drivers/net/enic/base/vnic_rq.c
index cb62c5e..0e700a1 100644
--- a/drivers/net/enic/base/vnic_rq.c
+++ b/drivers/net/enic/base/vnic_rq.c
@@ -84,11 +84,12 @@ void vnic_rq_init_start(struct vnic_rq *rq, unsigned int cq_index,
 	iowrite32(cq_index, &rq->ctrl->cq_index);
 	iowrite32(error_interrupt_enable, &rq->ctrl->error_interrupt_enable);
 	iowrite32(error_interrupt_offset, &rq->ctrl->error_interrupt_offset);
-	iowrite32(0, &rq->ctrl->dropped_packet_count);
 	iowrite32(0, &rq->ctrl->error_status);
 	iowrite32(fetch_index, &rq->ctrl->fetch_index);
 	iowrite32(posted_index, &rq->ctrl->posted_index);
-
+	if (rq->is_sop)
+		iowrite32(((rq->is_sop << 10) | rq->data_queue_idx),
+			  &rq->ctrl->data_ring);
 }
 
 void vnic_rq_init(struct vnic_rq *rq, unsigned int cq_index,
@@ -96,6 +97,7 @@ void vnic_rq_init(struct vnic_rq *rq, unsigned int cq_index,
 	unsigned int error_interrupt_offset)
 {
 	u32 fetch_index = 0;
+
 	/* Use current fetch_index as the ring starting point */
 	fetch_index = ioread32(&rq->ctrl->fetch_index);
 
@@ -110,6 +112,8 @@ void vnic_rq_init(struct vnic_rq *rq, unsigned int cq_index,
 		error_interrupt_offset);
 	rq->rxst_idx = 0;
 	rq->tot_pkts = 0;
+	rq->pkt_first_seg = NULL;
+	rq->pkt_last_seg = NULL;
 }
 
 void vnic_rq_error_out(struct vnic_rq *rq, unsigned int error)
diff --git a/drivers/net/enic/base/vnic_rq.h b/drivers/net/enic/base/vnic_rq.h
index e083ccc..fd9e170 100644
--- a/drivers/net/enic/base/vnic_rq.h
+++ b/drivers/net/enic/base/vnic_rq.h
@@ -60,10 +60,18 @@ struct vnic_rq_ctrl {
 	u32 pad7;
 	u32 error_status;		/* 0x48 */
 	u32 pad8;
-	u32 dropped_packet_count;	/* 0x50 */
+	u32 tcp_sn;			/* 0x50 */
 	u32 pad9;
-	u32 dropped_packet_count_rc;	/* 0x58 */
+	u32 unused;			/* 0x58 */
 	u32 pad10;
+	u32 dca_select;			/* 0x60 */
+	u32 pad11;
+	u32 dca_value;			/* 0x68 */
+	u32 pad12;
+	u32 data_ring;			/* 0x70 */
+	u32 pad13;
+	u32 header_split;		/* 0x78 */
+	u32 pad14;
 };
 
 struct vnic_rq {
@@ -82,6 +90,12 @@ struct vnic_rq {
 	struct rte_mempool *mp;
 	uint16_t rxst_idx;
 	uint32_t tot_pkts;
+	uint16_t data_queue_idx;
+	uint8_t is_sop;
+	uint8_t in_use;
+	struct rte_mbuf *pkt_first_seg;
+	struct rte_mbuf *pkt_last_seg;
+	unsigned int max_mbufs_per_pkt;
 };
 
 static inline unsigned int vnic_rq_desc_avail(struct vnic_rq *rq)
diff --git a/drivers/net/enic/enic.h b/drivers/net/enic/enic.h
index 9f94afb..fce75af 100644
--- a/drivers/net/enic/enic.h
+++ b/drivers/net/enic/enic.h
@@ -55,8 +55,11 @@
 #define DRV_COPYRIGHT		"Copyright 2008-2015 Cisco Systems, Inc"
 
 #define ENIC_WQ_MAX		8
-#define ENIC_RQ_MAX		8
-#define ENIC_CQ_MAX		(ENIC_WQ_MAX + ENIC_RQ_MAX)
+/* With Rx scatter support, we use two RQs on VIC per RQ used by app. Both
+ * RQs use the same CQ.
+ */
+#define ENIC_RQ_MAX		16
+#define ENIC_CQ_MAX		(ENIC_WQ_MAX + (ENIC_RQ_MAX / 2))
 #define ENIC_INTR_MAX		(ENIC_CQ_MAX + 2)
 
 #define VLAN_ETH_HLEN           18
@@ -154,6 +157,21 @@ struct enic {
 
 };
 
+static inline unsigned int enic_sop_rq(unsigned int rq)
+{
+	return rq * 2;
+}
+
+static inline unsigned int enic_data_rq(unsigned int rq)
+{
+	return rq * 2 + 1;
+}
+
+static inline unsigned int enic_vnic_rq_count(struct enic *enic)
+{
+	return (enic->rq_count * 2);
+}
+
 static inline unsigned int enic_cq_rq(__rte_unused struct enic *enic, unsigned int rq)
 {
 	return rq;
diff --git a/drivers/net/enic/enic_ethdev.c b/drivers/net/enic/enic_ethdev.c
index 003dec0..83048d8 100644
--- a/drivers/net/enic/enic_ethdev.c
+++ b/drivers/net/enic/enic_ethdev.c
@@ -269,14 +269,18 @@ static int enicpmd_dev_rx_queue_setup(struct rte_eth_dev *eth_dev,
 	struct enic *enic = pmd_priv(eth_dev);
 
 	ENICPMD_FUNC_TRACE();
-	if (queue_idx >= ENIC_RQ_MAX) {
+	/* With Rx scatter support, two RQs are now used on VIC per RQ used
+	 * by the application.
+	 */
+	if (queue_idx * 2 >= ENIC_RQ_MAX) {
 		dev_err(enic,
-			"Max number of RX queues exceeded.  Max is %d\n",
+			"Max number of RX queues exceeded.  Max is %d. This PMD uses 2 RQs on VIC per RQ used by DPDK.\n",
 			ENIC_RQ_MAX);
 		return -EINVAL;
 	}
 
-	eth_dev->data->rx_queues[queue_idx] = (void *)&enic->rq[queue_idx];
+	eth_dev->data->rx_queues[queue_idx] =
+		(void *)&enic->rq[enic_sop_rq(queue_idx)];
 
 	ret = enic_alloc_rq(enic, queue_idx, socket_id, mp, nb_desc);
 	if (ret) {
diff --git a/drivers/net/enic/enic_main.c b/drivers/net/enic/enic_main.c
index 9b6fe36..8b1d7ff 100644
--- a/drivers/net/enic/enic_main.c
+++ b/drivers/net/enic/enic_main.c
@@ -122,7 +122,7 @@ static void enic_log_q_error(struct enic *enic)
 				error_status);
 	}
 
-	for (i = 0; i < enic->rq_count; i++) {
+	for (i = 0; i < enic_vnic_rq_count(enic); i++) {
 		error_status = vnic_rq_error_status(&enic->rq[i]);
 		if (error_status)
 			dev_err(enic, "RQ[%d] error_status %d\n", i,
@@ -235,13 +235,21 @@ void enic_init_vnic_resources(struct enic *enic)
 	unsigned int error_interrupt_offset = 0;
 	unsigned int index = 0;
 	unsigned int cq_idx;
+	struct vnic_rq *data_rq;
 
 	for (index = 0; index < enic->rq_count; index++) {
-		vnic_rq_init(&enic->rq[index],
+		vnic_rq_init(&enic->rq[enic_sop_rq(index)],
 			enic_cq_rq(enic, index),
 			error_interrupt_enable,
 			error_interrupt_offset);
 
+		data_rq = &enic->rq[enic_data_rq(index)];
+		if (data_rq->in_use)
+			vnic_rq_init(data_rq,
+				     enic_cq_rq(enic, index),
+				     error_interrupt_enable,
+				     error_interrupt_offset);
+
 		cq_idx = enic_cq_rq(enic, index);
 		vnic_cq_init(&enic->cq[cq_idx],
 			0 /* flow_control_enable */,
@@ -291,6 +299,9 @@ enic_alloc_rx_queue_mbufs(struct enic *enic, struct vnic_rq *rq)
 	unsigned i;
 	dma_addr_t dma_addr;
 
+	if (!rq->in_use)
+		return 0;
+
 	dev_debug(enic, "queue %u, allocating %u rx queue mbufs\n", rq->index,
 		  rq->ring.desc_count);
 
@@ -304,18 +315,19 @@ enic_alloc_rx_queue_mbufs(struct enic *enic, struct vnic_rq *rq)
 
 		dma_addr = (dma_addr_t)(mb->buf_physaddr
 			   + RTE_PKTMBUF_HEADROOM);
-
-		rq_enet_desc_enc(rqd, dma_addr, RQ_ENET_TYPE_ONLY_SOP,
-				 mb->buf_len - RTE_PKTMBUF_HEADROOM);
+		rq_enet_desc_enc(rqd, dma_addr,
+				(rq->is_sop ? RQ_ENET_TYPE_ONLY_SOP
+				: RQ_ENET_TYPE_NOT_SOP),
+				mb->buf_len - RTE_PKTMBUF_HEADROOM);
 		rq->mbuf_ring[i] = mb;
 	}
 
 	/* make sure all prior writes are complete before doing the PIO write */
 	rte_rmb();
 
-	/* Post all but the last 2 cache lines' worth of descriptors */
-	rq->posted_index = rq->ring.desc_count - (2 * RTE_CACHE_LINE_SIZE
-			/ sizeof(struct rq_enet_desc));
+	/* Post all but the last buffer to VIC. */
+	rq->posted_index = rq->ring.desc_count - 1;
+
 	rq->rx_nb_hold = 0;
 
 	dev_debug(enic, "port=%u, qidx=%u, Write %u posted idx, %u sw held\n",
@@ -419,17 +431,28 @@ int enic_enable(struct enic *enic)
 			"Flow director feature will not work\n");
 
 	for (index = 0; index < enic->rq_count; index++) {
-		err = enic_alloc_rx_queue_mbufs(enic, &enic->rq[index]);
+		err = enic_alloc_rx_queue_mbufs(enic,
+			&enic->rq[enic_sop_rq(index)]);
 		if (err) {
-			dev_err(enic, "Failed to alloc RX queue mbufs\n");
+			dev_err(enic, "Failed to alloc sop RX queue mbufs\n");
+			return err;
+		}
+		err = enic_alloc_rx_queue_mbufs(enic,
+			&enic->rq[enic_data_rq(index)]);
+		if (err) {
+			/* release the allocated mbufs for the sop rq*/
+			enic_rxmbuf_queue_release(enic,
+				&enic->rq[enic_sop_rq(index)]);
+
+			dev_err(enic, "Failed to alloc data RX queue mbufs\n");
 			return err;
 		}
 	}
 
 	for (index = 0; index < enic->wq_count; index++)
-		vnic_wq_enable(&enic->wq[index]);
+		enic_start_wq(enic, index);
 	for (index = 0; index < enic->rq_count; index++)
-		vnic_rq_enable(&enic->rq[index]);
+		enic_start_rq(enic, index);
 
 	vnic_dev_enable_wait(enic->vdev);
 
@@ -449,7 +472,7 @@ int enic_alloc_intr_resources(struct enic *enic)
 
 	dev_info(enic, "vNIC resources used:  "\
 		"wq %d rq %d cq %d intr %d\n",
-		enic->wq_count, enic->rq_count,
+		enic->wq_count, enic_vnic_rq_count(enic),
 		enic->cq_count, enic->intr_count);
 
 	err = vnic_intr_alloc(enic->vdev, &enic->intr, 0);
@@ -461,19 +484,32 @@ int enic_alloc_intr_resources(struct enic *enic)
 
 void enic_free_rq(void *rxq)
 {
-	struct vnic_rq *rq;
+	struct vnic_rq *rq_sop, *rq_data;
 	struct enic *enic;
 
 	if (rxq == NULL)
 		return;
 
-	rq = (struct vnic_rq *)rxq;
-	enic = vnic_dev_priv(rq->vdev);
-	enic_rxmbuf_queue_release(enic, rq);
-	rte_free(rq->mbuf_ring);
-	rq->mbuf_ring = NULL;
-	vnic_rq_free(rq);
-	vnic_cq_free(&enic->cq[rq->index]);
+	rq_sop = (struct vnic_rq *)rxq;
+	enic = vnic_dev_priv(rq_sop->vdev);
+	rq_data = &enic->rq[rq_sop->data_queue_idx];
+
+	enic_rxmbuf_queue_release(enic, rq_sop);
+	if (rq_data->in_use)
+		enic_rxmbuf_queue_release(enic, rq_data);
+
+	rte_free(rq_sop->mbuf_ring);
+	if (rq_data->in_use)
+		rte_free(rq_data->mbuf_ring);
+
+	rq_sop->mbuf_ring = NULL;
+	rq_data->mbuf_ring = NULL;
+
+	vnic_rq_free(rq_sop);
+	if (rq_data->in_use)
+		vnic_rq_free(rq_data);
+
+	vnic_cq_free(&enic->cq[rq_sop->index]);
 }
 
 void enic_start_wq(struct enic *enic, uint16_t queue_idx)
@@ -488,12 +524,32 @@ int enic_stop_wq(struct enic *enic, uint16_t queue_idx)
 
 void enic_start_rq(struct enic *enic, uint16_t queue_idx)
 {
-	vnic_rq_enable(&enic->rq[queue_idx]);
+	struct vnic_rq *rq_sop = &enic->rq[enic_sop_rq(queue_idx)];
+	struct vnic_rq *rq_data = &enic->rq[rq_sop->data_queue_idx];
+
+	if (rq_data->in_use)
+		vnic_rq_enable(rq_data);
+	rte_mb();
+	vnic_rq_enable(rq_sop);
+
 }
 
 int enic_stop_rq(struct enic *enic, uint16_t queue_idx)
 {
-	return vnic_rq_disable(&enic->rq[queue_idx]);
+	int ret1 = 0, ret2 = 0;
+
+	struct vnic_rq *rq_sop = &enic->rq[enic_sop_rq(queue_idx)];
+	struct vnic_rq *rq_data = &enic->rq[rq_sop->data_queue_idx];
+
+	ret2 = vnic_rq_disable(rq_sop);
+	rte_mb();
+	if (rq_data->in_use)
+		ret1 = vnic_rq_disable(rq_data);
+
+	if (ret2)
+		return ret2;
+	else
+		return ret1;
 }
 
 int enic_alloc_rq(struct enic *enic, uint16_t queue_idx,
@@ -501,53 +557,141 @@ int enic_alloc_rq(struct enic *enic, uint16_t queue_idx,
 	uint16_t nb_desc)
 {
 	int rc;
-	struct vnic_rq *rq = &enic->rq[queue_idx];
+	uint16_t sop_queue_idx = enic_sop_rq(queue_idx);
+	uint16_t data_queue_idx = enic_data_rq(queue_idx);
+	struct vnic_rq *rq_sop = &enic->rq[sop_queue_idx];
+	struct vnic_rq *rq_data = &enic->rq[data_queue_idx];
+	unsigned int mbuf_size, mbufs_per_pkt;
+	unsigned int nb_sop_desc, nb_data_desc;
+	uint16_t min_sop, max_sop, min_data, max_data;
+
+	rq_sop->is_sop = 1;
+	rq_sop->data_queue_idx = data_queue_idx;
+	rq_data->is_sop = 0;
+	rq_data->data_queue_idx = 0;
+	rq_sop->socket_id = socket_id;
+	rq_sop->mp = mp;
+	rq_data->socket_id = socket_id;
+	rq_data->mp = mp;
+	rq_sop->in_use = 1;
+
+	mbuf_size = (uint16_t)(rte_pktmbuf_data_room_size(mp) -
+			       RTE_PKTMBUF_HEADROOM);
+
+	if (enic->rte_dev->data->dev_conf.rxmode.enable_scatter) {
+		dev_info(enic, "Scatter rx mode enabled\n");
+		/* ceil(mtu/mbuf_size) */
+		mbufs_per_pkt = (enic->config.mtu +
+				 (mbuf_size - 1)) / mbuf_size;
+	} else {
+		dev_info(enic, "Scatter rx mode disabled\n");
+		mbufs_per_pkt = 1;
+	}
+
+	if (mbufs_per_pkt > 1) {
+		dev_info(enic, "Scatter rx mode in use\n");
+		rq_data->in_use = 1;
+	} else {
+		dev_info(enic, "Scatter rx mode not being used\n");
+		rq_data->in_use = 0;
+	}
 
-	rq->socket_id = socket_id;
-	rq->mp = mp;
+	/* number of descriptors have to be a multiple of 32 */
+	nb_sop_desc = (nb_desc / mbufs_per_pkt) & ~0x1F;
+	nb_data_desc = (nb_desc - nb_sop_desc) & ~0x1F;
+
+	rq_sop->max_mbufs_per_pkt = mbufs_per_pkt;
+	rq_data->max_mbufs_per_pkt = mbufs_per_pkt;
+
+	if (mbufs_per_pkt > 1) {
+		min_sop = 64;
+		max_sop = ((enic->config.rq_desc_count /
+			    (mbufs_per_pkt - 1)) & ~0x1F);
+		min_data = min_sop * (mbufs_per_pkt - 1);
+		max_data = enic->config.rq_desc_count;
+	} else {
+		min_sop = 64;
+		max_sop = enic->config.rq_desc_count;
+		min_data = 0;
+		max_data = 0;
+	}
 
-	if (nb_desc) {
-		if (nb_desc > enic->config.rq_desc_count) {
-			dev_warning(enic,
-				"RQ %d - number of rx desc in cmd line (%d)"\
-				"is greater than that in the UCSM/CIMC adapter"\
-				"policy.  Applying the value in the adapter "\
-				"policy (%d).\n",
-				queue_idx, nb_desc, enic->config.rq_desc_count);
-			nb_desc = enic->config.rq_desc_count;
-		}
-		dev_info(enic, "RX Queues - effective number of descs:%d\n",
-			 nb_desc);
+	if (nb_desc < (min_sop + min_data)) {
+		dev_warning(enic,
+			    "Number of rx descs too low, adjusting to minimum\n");
+		nb_sop_desc = min_sop;
+		nb_data_desc = min_data;
+	} else if (nb_desc > (max_sop + max_data)) {
+		dev_warning(enic,
+			    "Number of rx_descs too high, adjusting to maximum\n");
+		nb_sop_desc = max_sop;
+		nb_data_desc = max_data;
 	}
+	if (mbufs_per_pkt > 1) {
+		dev_info(enic, "For mtu %d and mbuf size %d valid rx descriptor range is %d to %d\n",
+			 enic->config.mtu, mbuf_size, min_sop + min_data,
+			 max_sop + max_data);
+	}
+	dev_info(enic, "Using %d rx descriptors (sop %d, data %d)\n",
+		 nb_sop_desc + nb_data_desc, nb_sop_desc, nb_data_desc);
 
-	/* Allocate queue resources */
-	rc = vnic_rq_alloc(enic->vdev, rq, queue_idx,
-		nb_desc, sizeof(struct rq_enet_desc));
+	/* Allocate sop queue resources */
+	rc = vnic_rq_alloc(enic->vdev, rq_sop, sop_queue_idx,
+		nb_sop_desc, sizeof(struct rq_enet_desc));
 	if (rc) {
-		dev_err(enic, "error in allocation of rq\n");
+		dev_err(enic, "error in allocation of sop rq\n");
 		goto err_exit;
 	}
-
+	nb_sop_desc = rq_sop->ring.desc_count;
+
+	if (rq_data->in_use) {
+		/* Allocate data queue resources */
+		rc = vnic_rq_alloc(enic->vdev, rq_data, data_queue_idx,
+				   nb_data_desc,
+				   sizeof(struct rq_enet_desc));
+		if (rc) {
+			dev_err(enic, "error in allocation of data rq\n");
+			goto err_free_rq_sop;
+		}
+		nb_data_desc = rq_data->ring.desc_count;
+	}
 	rc = vnic_cq_alloc(enic->vdev, &enic->cq[queue_idx], queue_idx,
-		socket_id, nb_desc,
-		sizeof(struct cq_enet_rq_desc));
+			   socket_id, nb_sop_desc + nb_data_desc,
+			   sizeof(struct cq_enet_rq_desc));
 	if (rc) {
 		dev_err(enic, "error in allocation of cq for rq\n");
-		goto err_free_rq_exit;
+		goto err_free_rq_data;
 	}
 
-	/* Allocate the mbuf ring */
-	rq->mbuf_ring = (struct rte_mbuf **)rte_zmalloc_socket("rq->mbuf_ring",
-			sizeof(struct rte_mbuf *) * nb_desc,
-			RTE_CACHE_LINE_SIZE, rq->socket_id);
+	/* Allocate the mbuf rings */
+	rq_sop->mbuf_ring = (struct rte_mbuf **)
+		rte_zmalloc_socket("rq->mbuf_ring",
+				   sizeof(struct rte_mbuf *) * nb_sop_desc,
+				   RTE_CACHE_LINE_SIZE, rq_sop->socket_id);
+	if (rq_sop->mbuf_ring == NULL)
+		goto err_free_cq;
+
+	if (rq_data->in_use) {
+		rq_data->mbuf_ring = (struct rte_mbuf **)
+			rte_zmalloc_socket("rq->mbuf_ring",
+				sizeof(struct rte_mbuf *) * nb_data_desc,
+				RTE_CACHE_LINE_SIZE, rq_sop->socket_id);
+		if (rq_data->mbuf_ring == NULL)
+			goto err_free_sop_mbuf;
+	}
 
-	if (rq->mbuf_ring != NULL)
-		return 0;
+	return 0;
 
+err_free_sop_mbuf:
+	rte_free(rq_sop->mbuf_ring);
+err_free_cq:
 	/* cleanup on error */
 	vnic_cq_free(&enic->cq[queue_idx]);
-err_free_rq_exit:
-	vnic_rq_free(rq);
+err_free_rq_data:
+	if (rq_data->in_use)
+		vnic_rq_free(rq_data);
+err_free_rq_sop:
+	vnic_rq_free(rq_sop);
 err_exit:
 	return -ENOMEM;
 }
@@ -645,10 +789,12 @@ int enic_disable(struct enic *enic)
 		if (err)
 			return err;
 	}
-	for (i = 0; i < enic->rq_count; i++) {
-		err = vnic_rq_disable(&enic->rq[i]);
-		if (err)
-			return err;
+	for (i = 0; i < enic_vnic_rq_count(enic); i++) {
+		if (enic->rq[i].in_use) {
+			err = vnic_rq_disable(&enic->rq[i]);
+			if (err)
+				return err;
+		}
 	}
 
 	vnic_dev_set_reset_flag(enic->vdev, 1);
@@ -657,8 +803,9 @@ int enic_disable(struct enic *enic)
 	for (i = 0; i < enic->wq_count; i++)
 		vnic_wq_clean(&enic->wq[i], enic_free_wq_buf);
 
-	for (i = 0; i < enic->rq_count; i++)
-		vnic_rq_clean(&enic->rq[i], enic_free_rq_buf);
+	for (i = 0; i < enic_vnic_rq_count(enic); i++)
+		if (enic->rq[i].in_use)
+			vnic_rq_clean(&enic->rq[i], enic_free_rq_buf);
 	for (i = 0; i < enic->cq_count; i++)
 		vnic_cq_clean(&enic->cq[i]);
 	vnic_intr_clean(&enic->intr);
@@ -863,9 +1010,13 @@ int enic_set_vnic_res(struct enic *enic)
 	struct rte_eth_dev *eth_dev = enic->rte_dev;
 	int rc = 0;
 
-	if (enic->rq_count < eth_dev->data->nb_rx_queues) {
-		dev_err(dev, "Not enough Receive queues. Requested:%u, Configured:%u\n",
-			eth_dev->data->nb_rx_queues, enic->rq_count);
+	/* With Rx scatter support, two RQs are now used per RQ used by
+	 * the application.
+	 */
+	if (enic->rq_count < (eth_dev->data->nb_rx_queues * 2)) {
+		dev_err(dev, "Not enough Receive queues. Requested:%u which uses %d RQs on VIC, Configured:%u\n",
+			eth_dev->data->nb_rx_queues,
+			eth_dev->data->nb_rx_queues * 2, enic->rq_count);
 		rc = -EINVAL;
 	}
 	if (enic->wq_count < eth_dev->data->nb_tx_queues) {
diff --git a/drivers/net/enic/enic_res.c b/drivers/net/enic/enic_res.c
index ebe379d..42edd84 100644
--- a/drivers/net/enic/enic_res.c
+++ b/drivers/net/enic/enic_res.c
@@ -196,8 +196,9 @@ void enic_free_vnic_resources(struct enic *enic)
 
 	for (i = 0; i < enic->wq_count; i++)
 		vnic_wq_free(&enic->wq[i]);
-	for (i = 0; i < enic->rq_count; i++)
-		vnic_rq_free(&enic->rq[i]);
+	for (i = 0; i < enic_vnic_rq_count(enic); i++)
+		if (enic->rq[i].in_use)
+			vnic_rq_free(&enic->rq[i]);
 	for (i = 0; i < enic->cq_count; i++)
 		vnic_cq_free(&enic->cq[i]);
 	vnic_intr_free(&enic->intr);
diff --git a/drivers/net/enic/enic_rxtx.c b/drivers/net/enic/enic_rxtx.c
index 972eae2..6ca4ac3 100644
--- a/drivers/net/enic/enic_rxtx.c
+++ b/drivers/net/enic/enic_rxtx.c
@@ -228,27 +228,34 @@ uint16_t
 enic_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 	       uint16_t nb_pkts)
 {
-	struct vnic_rq *rq = rx_queue;
-	struct enic *enic = vnic_dev_priv(rq->vdev);
-	unsigned int rx_id;
+	struct vnic_rq *sop_rq = rx_queue;
+	struct vnic_rq *data_rq;
+	struct vnic_rq *rq;
+	struct enic *enic = vnic_dev_priv(sop_rq->vdev);
+	uint16_t cq_idx;
+	uint16_t rq_idx;
+	uint16_t rq_num;
 	struct rte_mbuf *nmb, *rxmb;
-	uint16_t nb_rx = 0, nb_err = 0;
-	uint16_t nb_hold;
+	uint16_t nb_rx = 0;
 	struct vnic_cq *cq;
 	volatile struct cq_desc *cqd_ptr;
 	uint8_t color;
+	uint16_t seg_length;
+	struct rte_mbuf *first_seg = sop_rq->pkt_first_seg;
+	struct rte_mbuf *last_seg = sop_rq->pkt_last_seg;
 
-	cq = &enic->cq[enic_cq_rq(enic, rq->index)];
-	rx_id = cq->to_clean;		/* index of cqd, rqd, mbuf_table */
-	cqd_ptr = (struct cq_desc *)(cq->ring.descs) + rx_id;
+	cq = &enic->cq[enic_cq_rq(enic, sop_rq->index)];
+	cq_idx = cq->to_clean;		/* index of cqd, rqd, mbuf_table */
+	cqd_ptr = (struct cq_desc *)(cq->ring.descs) + cq_idx;
 
-	nb_hold = rq->rx_nb_hold;	/* mbufs held by software */
+	data_rq = &enic->rq[sop_rq->data_queue_idx];
 
 	while (nb_rx < nb_pkts) {
 		volatile struct rq_enet_desc *rqd_ptr;
 		dma_addr_t dma_addr;
 		struct cq_desc cqd;
 		uint8_t packet_error;
+		uint16_t ciflags;
 
 		/* Check for pkts available */
 		color = (cqd_ptr->type_color >> CQ_DESC_COLOR_SHIFT)
@@ -256,9 +263,13 @@ enic_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 		if (color == cq->last_color)
 			break;
 
-		/* Get the cq descriptor and rq pointer */
+		/* Get the cq descriptor and extract rq info from it */
 		cqd = *cqd_ptr;
-		rqd_ptr = (struct rq_enet_desc *)(rq->ring.descs) + rx_id;
+		rq_num = cqd.q_number & CQ_DESC_Q_NUM_MASK;
+		rq_idx = cqd.completed_index & CQ_DESC_COMP_NDX_MASK;
+
+		rq = &enic->rq[rq_num];
+		rqd_ptr = ((struct rq_enet_desc *)rq->ring.descs) + rq_idx;
 
 		/* allocate a new mbuf */
 		nmb = rte_mbuf_raw_alloc(rq->mp);
@@ -271,67 +282,99 @@ enic_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 		packet_error = enic_cq_rx_check_err(&cqd);
 
 		/* Get the mbuf to return and replace with one just allocated */
-		rxmb = rq->mbuf_ring[rx_id];
-		rq->mbuf_ring[rx_id] = nmb;
+		rxmb = rq->mbuf_ring[rq_idx];
+		rq->mbuf_ring[rq_idx] = nmb;
 
 		/* Increment cqd, rqd, mbuf_table index */
-		rx_id++;
-		if (unlikely(rx_id == rq->ring.desc_count)) {
-			rx_id = 0;
+		cq_idx++;
+		if (unlikely(cq_idx == cq->ring.desc_count)) {
+			cq_idx = 0;
 			cq->last_color = cq->last_color ? 0 : 1;
 		}
 
 		/* Prefetch next mbuf & desc while processing current one */
-		cqd_ptr = (struct cq_desc *)(cq->ring.descs) + rx_id;
+		cqd_ptr = (struct cq_desc *)(cq->ring.descs) + cq_idx;
 		rte_enic_prefetch(cqd_ptr);
-		rte_enic_prefetch(rq->mbuf_ring[rx_id]);
-		rte_enic_prefetch((struct rq_enet_desc *)(rq->ring.descs)
-				 + rx_id);
+
+		ciflags = enic_cq_rx_desc_ciflags(
+			(struct cq_enet_rq_desc *)&cqd);
 
 		/* Push descriptor for newly allocated mbuf */
-		dma_addr = (dma_addr_t)(nmb->buf_physaddr
-			   + RTE_PKTMBUF_HEADROOM);
-		rqd_ptr->address = rte_cpu_to_le_64(dma_addr);
-		rqd_ptr->length_type = cpu_to_le16(nmb->buf_len
-				       - RTE_PKTMBUF_HEADROOM);
+		dma_addr = (dma_addr_t)(nmb->buf_physaddr +
+					RTE_PKTMBUF_HEADROOM);
+		rq_enet_desc_enc(rqd_ptr, dma_addr,
+				(rq->is_sop ? RQ_ENET_TYPE_ONLY_SOP
+				: RQ_ENET_TYPE_NOT_SOP),
+				nmb->buf_len - RTE_PKTMBUF_HEADROOM);
 
-		/* Drop incoming bad packet */
-		if (unlikely(packet_error)) {
-			rte_pktmbuf_free(rxmb);
-			nb_err++;
-			continue;
+		/* Fill in the rest of the mbuf */
+		seg_length = enic_cq_rx_desc_n_bytes(&cqd);
+		rxmb->packet_type = enic_cq_rx_flags_to_pkt_type(&cqd);
+		enic_cq_rx_to_pkt_flags(&cqd, rxmb);
+		if (rq->is_sop) {
+			first_seg = rxmb;
+			first_seg->nb_segs = 1;
+			first_seg->pkt_len = seg_length;
+		} else {
+			first_seg->pkt_len = (uint16_t)(first_seg->pkt_len
+							+ seg_length);
+			first_seg->nb_segs++;
+			last_seg->next = rxmb;
 		}
 
-		/* Fill in the rest of the mbuf */
-		rxmb->data_off = RTE_PKTMBUF_HEADROOM;
-		rxmb->nb_segs = 1;
 		rxmb->next = NULL;
 		rxmb->port = enic->port_id;
-		rxmb->pkt_len = enic_cq_rx_desc_n_bytes(&cqd);
-		rxmb->packet_type = enic_cq_rx_flags_to_pkt_type(&cqd);
-		enic_cq_rx_to_pkt_flags(&cqd, rxmb);
-		rxmb->data_len = rxmb->pkt_len;
+		rxmb->data_len = seg_length;
+
+		rq->rx_nb_hold++;
+
+		if (!(enic_cq_rx_desc_eop(ciflags))) {
+			last_seg = rxmb;
+			continue;
+		}
+
+		if (unlikely(packet_error)) {
+			rte_pktmbuf_free(first_seg);
+			rte_atomic64_inc(&enic->soft_stats.rx_packet_errors);
+			continue;
+		}
+
 
 		/* prefetch mbuf data for caller */
-		rte_packet_prefetch(RTE_PTR_ADD(rxmb->buf_addr,
+		rte_packet_prefetch(RTE_PTR_ADD(first_seg->buf_addr,
 				    RTE_PKTMBUF_HEADROOM));
 
 		/* store the mbuf address into the next entry of the array */
-		rx_pkts[nb_rx++] = rxmb;
+		rx_pkts[nb_rx++] = first_seg;
 	}
 
-	nb_hold += nb_rx + nb_err;
-	cq->to_clean = rx_id;
+	sop_rq->pkt_first_seg = first_seg;
+	sop_rq->pkt_last_seg = last_seg;
+
+	cq->to_clean = cq_idx;
+
+	if ((sop_rq->rx_nb_hold + data_rq->rx_nb_hold) >
+	    sop_rq->rx_free_thresh) {
+		if (data_rq->in_use) {
+			data_rq->posted_index =
+				enic_ring_add(data_rq->ring.desc_count,
+					      data_rq->posted_index,
+					      data_rq->rx_nb_hold);
+			data_rq->rx_nb_hold = 0;
+		}
+		sop_rq->posted_index = enic_ring_add(sop_rq->ring.desc_count,
+						     sop_rq->posted_index,
+						     sop_rq->rx_nb_hold);
+		sop_rq->rx_nb_hold = 0;
 
-	if (nb_hold > rq->rx_free_thresh) {
-		rq->posted_index = enic_ring_add(rq->ring.desc_count,
-				rq->posted_index, nb_hold);
-		nb_hold = 0;
 		rte_mb();
-		iowrite32(rq->posted_index, &rq->ctrl->posted_index);
+		if (data_rq->in_use)
+			iowrite32(data_rq->posted_index,
+				  &data_rq->ctrl->posted_index);
+		rte_compiler_barrier();
+		iowrite32(sop_rq->posted_index, &sop_rq->ctrl->posted_index);
 	}
 
-	rq->rx_nb_hold = nb_hold;
 
 	return nb_rx;
 }
-- 
2.7.0

^ permalink raw reply	[flat|nested] 3+ messages in thread

* [dpdk-dev] [PATCH v2] enic: scattered Rx
  2016-06-14 23:55 [dpdk-dev] [PATCH] enic: scattered Rx Nelson Escobar
@ 2016-06-16 19:19 ` Nelson Escobar
  2016-06-24 10:42   ` Bruce Richardson
  0 siblings, 1 reply; 3+ messages in thread
From: Nelson Escobar @ 2016-06-16 19:19 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, johndale, Nelson Escobar

For performance reasons, this patch uses 2 VIC RQs per RQ presented to
DPDK.

The VIC requires that each descriptor be marked as either a start of
packet (SOP) descriptor or a non-SOP descriptor.  A one RQ solution
requires skipping descriptors when receiving small packets and results
in bad performance when receiving many small packets.

The 2 RQ solution makes use of the VIC feature that allows a receive
on the primary queue to 'spill over' into another queue if the receive is
too large to fit in the buffer assigned to the descriptor on the
primary queue.  This means that there is no skipping of descriptors
when receiving small packets and results in much better performance.

Signed-off-by: Nelson Escobar <neescoba@cisco.com>
Reviewed-by: John Daley <johndale@cisco.com>
---

v2:
 - fixes upstream checkpatch complaint
 - fixes bug where packet type and flags were set on last mbuf
   instead of first mbuf of scattered receive
 - adds ethernet hdr length to mtu when calculating the number of
   mbufs it would take to receive maximum sized packet

 doc/guides/nics/overview.rst         |   2 +-
 drivers/net/enic/base/rq_enet_desc.h |   2 +-
 drivers/net/enic/base/vnic_rq.c      |   8 +-
 drivers/net/enic/base/vnic_rq.h      |  18 ++-
 drivers/net/enic/enic.h              |  22 ++-
 drivers/net/enic/enic_ethdev.c       |  10 +-
 drivers/net/enic/enic_main.c         | 277 +++++++++++++++++++++++++++--------
 drivers/net/enic/enic_res.c          |   5 +-
 drivers/net/enic/enic_rxtx.c         | 140 ++++++++++++------
 9 files changed, 361 insertions(+), 123 deletions(-)

diff --git a/doc/guides/nics/overview.rst b/doc/guides/nics/overview.rst
index 2200171..d0ae847 100644
--- a/doc/guides/nics/overview.rst
+++ b/doc/guides/nics/overview.rst
@@ -94,7 +94,7 @@ Most of these differences are summarized below.
    Queue start/stop             Y   Y Y Y Y Y Y     Y Y     Y Y Y Y Y Y               Y   Y Y
    MTU update                   Y Y Y           Y   Y Y Y Y         Y Y
    Jumbo frame                  Y Y Y Y Y Y Y Y Y   Y Y Y Y Y Y Y Y Y Y       Y Y Y
-   Scattered Rx                 Y Y Y   Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y               Y   Y
+   Scattered Rx                 Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y               Y   Y
    LRO                                              Y Y Y Y
    TSO                          Y   Y   Y Y Y Y Y Y Y Y Y Y Y Y Y Y
    Promiscuous mode       Y Y   Y Y   Y Y Y Y Y Y Y Y Y     Y Y     Y Y         Y Y   Y   Y Y
diff --git a/drivers/net/enic/base/rq_enet_desc.h b/drivers/net/enic/base/rq_enet_desc.h
index 7292d9d..13e24b4 100644
--- a/drivers/net/enic/base/rq_enet_desc.h
+++ b/drivers/net/enic/base/rq_enet_desc.h
@@ -55,7 +55,7 @@ enum rq_enet_type_types {
 #define RQ_ENET_TYPE_BITS		2
 #define RQ_ENET_TYPE_MASK		((1 << RQ_ENET_TYPE_BITS) - 1)
 
-static inline void rq_enet_desc_enc(struct rq_enet_desc *desc,
+static inline void rq_enet_desc_enc(volatile struct rq_enet_desc *desc,
 	u64 address, u8 type, u16 length)
 {
 	desc->address = cpu_to_le64(address);
diff --git a/drivers/net/enic/base/vnic_rq.c b/drivers/net/enic/base/vnic_rq.c
index cb62c5e..0e700a1 100644
--- a/drivers/net/enic/base/vnic_rq.c
+++ b/drivers/net/enic/base/vnic_rq.c
@@ -84,11 +84,12 @@ void vnic_rq_init_start(struct vnic_rq *rq, unsigned int cq_index,
 	iowrite32(cq_index, &rq->ctrl->cq_index);
 	iowrite32(error_interrupt_enable, &rq->ctrl->error_interrupt_enable);
 	iowrite32(error_interrupt_offset, &rq->ctrl->error_interrupt_offset);
-	iowrite32(0, &rq->ctrl->dropped_packet_count);
 	iowrite32(0, &rq->ctrl->error_status);
 	iowrite32(fetch_index, &rq->ctrl->fetch_index);
 	iowrite32(posted_index, &rq->ctrl->posted_index);
-
+	if (rq->is_sop)
+		iowrite32(((rq->is_sop << 10) | rq->data_queue_idx),
+			  &rq->ctrl->data_ring);
 }
 
 void vnic_rq_init(struct vnic_rq *rq, unsigned int cq_index,
@@ -96,6 +97,7 @@ void vnic_rq_init(struct vnic_rq *rq, unsigned int cq_index,
 	unsigned int error_interrupt_offset)
 {
 	u32 fetch_index = 0;
+
 	/* Use current fetch_index as the ring starting point */
 	fetch_index = ioread32(&rq->ctrl->fetch_index);
 
@@ -110,6 +112,8 @@ void vnic_rq_init(struct vnic_rq *rq, unsigned int cq_index,
 		error_interrupt_offset);
 	rq->rxst_idx = 0;
 	rq->tot_pkts = 0;
+	rq->pkt_first_seg = NULL;
+	rq->pkt_last_seg = NULL;
 }
 
 void vnic_rq_error_out(struct vnic_rq *rq, unsigned int error)
diff --git a/drivers/net/enic/base/vnic_rq.h b/drivers/net/enic/base/vnic_rq.h
index e083ccc..fd9e170 100644
--- a/drivers/net/enic/base/vnic_rq.h
+++ b/drivers/net/enic/base/vnic_rq.h
@@ -60,10 +60,18 @@ struct vnic_rq_ctrl {
 	u32 pad7;
 	u32 error_status;		/* 0x48 */
 	u32 pad8;
-	u32 dropped_packet_count;	/* 0x50 */
+	u32 tcp_sn;			/* 0x50 */
 	u32 pad9;
-	u32 dropped_packet_count_rc;	/* 0x58 */
+	u32 unused;			/* 0x58 */
 	u32 pad10;
+	u32 dca_select;			/* 0x60 */
+	u32 pad11;
+	u32 dca_value;			/* 0x68 */
+	u32 pad12;
+	u32 data_ring;			/* 0x70 */
+	u32 pad13;
+	u32 header_split;		/* 0x78 */
+	u32 pad14;
 };
 
 struct vnic_rq {
@@ -82,6 +90,12 @@ struct vnic_rq {
 	struct rte_mempool *mp;
 	uint16_t rxst_idx;
 	uint32_t tot_pkts;
+	uint16_t data_queue_idx;
+	uint8_t is_sop;
+	uint8_t in_use;
+	struct rte_mbuf *pkt_first_seg;
+	struct rte_mbuf *pkt_last_seg;
+	unsigned int max_mbufs_per_pkt;
 };
 
 static inline unsigned int vnic_rq_desc_avail(struct vnic_rq *rq)
diff --git a/drivers/net/enic/enic.h b/drivers/net/enic/enic.h
index 9f94afb..ed5f18d 100644
--- a/drivers/net/enic/enic.h
+++ b/drivers/net/enic/enic.h
@@ -55,8 +55,11 @@
 #define DRV_COPYRIGHT		"Copyright 2008-2015 Cisco Systems, Inc"
 
 #define ENIC_WQ_MAX		8
-#define ENIC_RQ_MAX		8
-#define ENIC_CQ_MAX		(ENIC_WQ_MAX + ENIC_RQ_MAX)
+/* With Rx scatter support, we use two RQs on VIC per RQ used by app. Both
+ * RQs use the same CQ.
+ */
+#define ENIC_RQ_MAX		16
+#define ENIC_CQ_MAX		(ENIC_WQ_MAX + (ENIC_RQ_MAX / 2))
 #define ENIC_INTR_MAX		(ENIC_CQ_MAX + 2)
 
 #define VLAN_ETH_HLEN           18
@@ -154,6 +157,21 @@ struct enic {
 
 };
 
+static inline unsigned int enic_sop_rq(unsigned int rq)
+{
+	return rq * 2;
+}
+
+static inline unsigned int enic_data_rq(unsigned int rq)
+{
+	return rq * 2 + 1;
+}
+
+static inline unsigned int enic_vnic_rq_count(struct enic *enic)
+{
+	return enic->rq_count * 2;
+}
+
 static inline unsigned int enic_cq_rq(__rte_unused struct enic *enic, unsigned int rq)
 {
 	return rq;
diff --git a/drivers/net/enic/enic_ethdev.c b/drivers/net/enic/enic_ethdev.c
index 003dec0..83048d8 100644
--- a/drivers/net/enic/enic_ethdev.c
+++ b/drivers/net/enic/enic_ethdev.c
@@ -269,14 +269,18 @@ static int enicpmd_dev_rx_queue_setup(struct rte_eth_dev *eth_dev,
 	struct enic *enic = pmd_priv(eth_dev);
 
 	ENICPMD_FUNC_TRACE();
-	if (queue_idx >= ENIC_RQ_MAX) {
+	/* With Rx scatter support, two RQs are now used on VIC per RQ used
+	 * by the application.
+	 */
+	if (queue_idx * 2 >= ENIC_RQ_MAX) {
 		dev_err(enic,
-			"Max number of RX queues exceeded.  Max is %d\n",
+			"Max number of RX queues exceeded.  Max is %d. This PMD uses 2 RQs on VIC per RQ used by DPDK.\n",
 			ENIC_RQ_MAX);
 		return -EINVAL;
 	}
 
-	eth_dev->data->rx_queues[queue_idx] = (void *)&enic->rq[queue_idx];
+	eth_dev->data->rx_queues[queue_idx] =
+		(void *)&enic->rq[enic_sop_rq(queue_idx)];
 
 	ret = enic_alloc_rq(enic, queue_idx, socket_id, mp, nb_desc);
 	if (ret) {
diff --git a/drivers/net/enic/enic_main.c b/drivers/net/enic/enic_main.c
index 9b6fe36..15389e5 100644
--- a/drivers/net/enic/enic_main.c
+++ b/drivers/net/enic/enic_main.c
@@ -122,7 +122,7 @@ static void enic_log_q_error(struct enic *enic)
 				error_status);
 	}
 
-	for (i = 0; i < enic->rq_count; i++) {
+	for (i = 0; i < enic_vnic_rq_count(enic); i++) {
 		error_status = vnic_rq_error_status(&enic->rq[i]);
 		if (error_status)
 			dev_err(enic, "RQ[%d] error_status %d\n", i,
@@ -235,13 +235,21 @@ void enic_init_vnic_resources(struct enic *enic)
 	unsigned int error_interrupt_offset = 0;
 	unsigned int index = 0;
 	unsigned int cq_idx;
+	struct vnic_rq *data_rq;
 
 	for (index = 0; index < enic->rq_count; index++) {
-		vnic_rq_init(&enic->rq[index],
+		vnic_rq_init(&enic->rq[enic_sop_rq(index)],
 			enic_cq_rq(enic, index),
 			error_interrupt_enable,
 			error_interrupt_offset);
 
+		data_rq = &enic->rq[enic_data_rq(index)];
+		if (data_rq->in_use)
+			vnic_rq_init(data_rq,
+				     enic_cq_rq(enic, index),
+				     error_interrupt_enable,
+				     error_interrupt_offset);
+
 		cq_idx = enic_cq_rq(enic, index);
 		vnic_cq_init(&enic->cq[cq_idx],
 			0 /* flow_control_enable */,
@@ -291,6 +299,9 @@ enic_alloc_rx_queue_mbufs(struct enic *enic, struct vnic_rq *rq)
 	unsigned i;
 	dma_addr_t dma_addr;
 
+	if (!rq->in_use)
+		return 0;
+
 	dev_debug(enic, "queue %u, allocating %u rx queue mbufs\n", rq->index,
 		  rq->ring.desc_count);
 
@@ -304,18 +315,19 @@ enic_alloc_rx_queue_mbufs(struct enic *enic, struct vnic_rq *rq)
 
 		dma_addr = (dma_addr_t)(mb->buf_physaddr
 			   + RTE_PKTMBUF_HEADROOM);
-
-		rq_enet_desc_enc(rqd, dma_addr, RQ_ENET_TYPE_ONLY_SOP,
-				 mb->buf_len - RTE_PKTMBUF_HEADROOM);
+		rq_enet_desc_enc(rqd, dma_addr,
+				(rq->is_sop ? RQ_ENET_TYPE_ONLY_SOP
+				: RQ_ENET_TYPE_NOT_SOP),
+				mb->buf_len - RTE_PKTMBUF_HEADROOM);
 		rq->mbuf_ring[i] = mb;
 	}
 
 	/* make sure all prior writes are complete before doing the PIO write */
 	rte_rmb();
 
-	/* Post all but the last 2 cache lines' worth of descriptors */
-	rq->posted_index = rq->ring.desc_count - (2 * RTE_CACHE_LINE_SIZE
-			/ sizeof(struct rq_enet_desc));
+	/* Post all but the last buffer to VIC. */
+	rq->posted_index = rq->ring.desc_count - 1;
+
 	rq->rx_nb_hold = 0;
 
 	dev_debug(enic, "port=%u, qidx=%u, Write %u posted idx, %u sw held\n",
@@ -419,17 +431,28 @@ int enic_enable(struct enic *enic)
 			"Flow director feature will not work\n");
 
 	for (index = 0; index < enic->rq_count; index++) {
-		err = enic_alloc_rx_queue_mbufs(enic, &enic->rq[index]);
+		err = enic_alloc_rx_queue_mbufs(enic,
+			&enic->rq[enic_sop_rq(index)]);
 		if (err) {
-			dev_err(enic, "Failed to alloc RX queue mbufs\n");
+			dev_err(enic, "Failed to alloc sop RX queue mbufs\n");
+			return err;
+		}
+		err = enic_alloc_rx_queue_mbufs(enic,
+			&enic->rq[enic_data_rq(index)]);
+		if (err) {
+			/* release the allocated mbufs for the sop rq */
+			enic_rxmbuf_queue_release(enic,
+				&enic->rq[enic_sop_rq(index)]);
+
+			dev_err(enic, "Failed to alloc data RX queue mbufs\n");
 			return err;
 		}
 	}
 
 	for (index = 0; index < enic->wq_count; index++)
-		vnic_wq_enable(&enic->wq[index]);
+		enic_start_wq(enic, index);
 	for (index = 0; index < enic->rq_count; index++)
-		vnic_rq_enable(&enic->rq[index]);
+		enic_start_rq(enic, index);
 
 	vnic_dev_enable_wait(enic->vdev);
 
@@ -449,7 +472,7 @@ int enic_alloc_intr_resources(struct enic *enic)
 
 	dev_info(enic, "vNIC resources used:  "\
 		"wq %d rq %d cq %d intr %d\n",
-		enic->wq_count, enic->rq_count,
+		enic->wq_count, enic_vnic_rq_count(enic),
 		enic->cq_count, enic->intr_count);
 
 	err = vnic_intr_alloc(enic->vdev, &enic->intr, 0);
@@ -461,19 +484,32 @@ int enic_alloc_intr_resources(struct enic *enic)
 
 void enic_free_rq(void *rxq)
 {
-	struct vnic_rq *rq;
+	struct vnic_rq *rq_sop, *rq_data;
 	struct enic *enic;
 
 	if (rxq == NULL)
 		return;
 
-	rq = (struct vnic_rq *)rxq;
-	enic = vnic_dev_priv(rq->vdev);
-	enic_rxmbuf_queue_release(enic, rq);
-	rte_free(rq->mbuf_ring);
-	rq->mbuf_ring = NULL;
-	vnic_rq_free(rq);
-	vnic_cq_free(&enic->cq[rq->index]);
+	rq_sop = (struct vnic_rq *)rxq;
+	enic = vnic_dev_priv(rq_sop->vdev);
+	rq_data = &enic->rq[rq_sop->data_queue_idx];
+
+	enic_rxmbuf_queue_release(enic, rq_sop);
+	if (rq_data->in_use)
+		enic_rxmbuf_queue_release(enic, rq_data);
+
+	rte_free(rq_sop->mbuf_ring);
+	if (rq_data->in_use)
+		rte_free(rq_data->mbuf_ring);
+
+	rq_sop->mbuf_ring = NULL;
+	rq_data->mbuf_ring = NULL;
+
+	vnic_rq_free(rq_sop);
+	if (rq_data->in_use)
+		vnic_rq_free(rq_data);
+
+	vnic_cq_free(&enic->cq[rq_sop->index]);
 }
 
 void enic_start_wq(struct enic *enic, uint16_t queue_idx)
@@ -488,12 +524,32 @@ int enic_stop_wq(struct enic *enic, uint16_t queue_idx)
 
 void enic_start_rq(struct enic *enic, uint16_t queue_idx)
 {
-	vnic_rq_enable(&enic->rq[queue_idx]);
+	struct vnic_rq *rq_sop = &enic->rq[enic_sop_rq(queue_idx)];
+	struct vnic_rq *rq_data = &enic->rq[rq_sop->data_queue_idx];
+
+	if (rq_data->in_use)
+		vnic_rq_enable(rq_data);
+	rte_mb();
+	vnic_rq_enable(rq_sop);
+
 }
 
 int enic_stop_rq(struct enic *enic, uint16_t queue_idx)
 {
-	return vnic_rq_disable(&enic->rq[queue_idx]);
+	int ret1 = 0, ret2 = 0;
+
+	struct vnic_rq *rq_sop = &enic->rq[enic_sop_rq(queue_idx)];
+	struct vnic_rq *rq_data = &enic->rq[rq_sop->data_queue_idx];
+
+	ret2 = vnic_rq_disable(rq_sop);
+	rte_mb();
+	if (rq_data->in_use)
+		ret1 = vnic_rq_disable(rq_data);
+
+	if (ret2)
+		return ret2;
+	else
+		return ret1;
 }
 
 int enic_alloc_rq(struct enic *enic, uint16_t queue_idx,
@@ -501,53 +557,141 @@ int enic_alloc_rq(struct enic *enic, uint16_t queue_idx,
 	uint16_t nb_desc)
 {
 	int rc;
-	struct vnic_rq *rq = &enic->rq[queue_idx];
+	uint16_t sop_queue_idx = enic_sop_rq(queue_idx);
+	uint16_t data_queue_idx = enic_data_rq(queue_idx);
+	struct vnic_rq *rq_sop = &enic->rq[sop_queue_idx];
+	struct vnic_rq *rq_data = &enic->rq[data_queue_idx];
+	unsigned int mbuf_size, mbufs_per_pkt;
+	unsigned int nb_sop_desc, nb_data_desc;
+	uint16_t min_sop, max_sop, min_data, max_data;
+
+	rq_sop->is_sop = 1;
+	rq_sop->data_queue_idx = data_queue_idx;
+	rq_data->is_sop = 0;
+	rq_data->data_queue_idx = 0;
+	rq_sop->socket_id = socket_id;
+	rq_sop->mp = mp;
+	rq_data->socket_id = socket_id;
+	rq_data->mp = mp;
+	rq_sop->in_use = 1;
+
+	mbuf_size = (uint16_t)(rte_pktmbuf_data_room_size(mp) -
+			       RTE_PKTMBUF_HEADROOM);
+
+	if (enic->rte_dev->data->dev_conf.rxmode.enable_scatter) {
+		dev_info(enic, "Scatter rx mode enabled\n");
+		/* ceil((mtu + ETHER_HDR_LEN + 4)/mbuf_size) */
+		mbufs_per_pkt = ((enic->config.mtu + ETHER_HDR_LEN + 4) +
+				 (mbuf_size - 1)) / mbuf_size;
+	} else {
+		dev_info(enic, "Scatter rx mode disabled\n");
+		mbufs_per_pkt = 1;
+	}
+
+	if (mbufs_per_pkt > 1) {
+		dev_info(enic, "Scatter rx mode in use\n");
+		rq_data->in_use = 1;
+	} else {
+		dev_info(enic, "Scatter rx mode not being used\n");
+		rq_data->in_use = 0;
+	}
 
-	rq->socket_id = socket_id;
-	rq->mp = mp;
+	/* the number of descriptors has to be a multiple of 32 */
+	nb_sop_desc = (nb_desc / mbufs_per_pkt) & ~0x1F;
+	nb_data_desc = (nb_desc - nb_sop_desc) & ~0x1F;
+
+	rq_sop->max_mbufs_per_pkt = mbufs_per_pkt;
+	rq_data->max_mbufs_per_pkt = mbufs_per_pkt;
+
+	if (mbufs_per_pkt > 1) {
+		min_sop = 64;
+		max_sop = ((enic->config.rq_desc_count /
+			    (mbufs_per_pkt - 1)) & ~0x1F);
+		min_data = min_sop * (mbufs_per_pkt - 1);
+		max_data = enic->config.rq_desc_count;
+	} else {
+		min_sop = 64;
+		max_sop = enic->config.rq_desc_count;
+		min_data = 0;
+		max_data = 0;
+	}
 
-	if (nb_desc) {
-		if (nb_desc > enic->config.rq_desc_count) {
-			dev_warning(enic,
-				"RQ %d - number of rx desc in cmd line (%d)"\
-				"is greater than that in the UCSM/CIMC adapter"\
-				"policy.  Applying the value in the adapter "\
-				"policy (%d).\n",
-				queue_idx, nb_desc, enic->config.rq_desc_count);
-			nb_desc = enic->config.rq_desc_count;
-		}
-		dev_info(enic, "RX Queues - effective number of descs:%d\n",
-			 nb_desc);
+	if (nb_desc < (min_sop + min_data)) {
+		dev_warning(enic,
+			    "Number of rx descs too low, adjusting to minimum\n");
+		nb_sop_desc = min_sop;
+		nb_data_desc = min_data;
+	} else if (nb_desc > (max_sop + max_data)) {
+		dev_warning(enic,
+			    "Number of rx_descs too high, adjusting to maximum\n");
+		nb_sop_desc = max_sop;
+		nb_data_desc = max_data;
 	}
+	if (mbufs_per_pkt > 1) {
+		dev_info(enic, "For mtu %d and mbuf size %d valid rx descriptor range is %d to %d\n",
+			 enic->config.mtu, mbuf_size, min_sop + min_data,
+			 max_sop + max_data);
+	}
+	dev_info(enic, "Using %d rx descriptors (sop %d, data %d)\n",
+		 nb_sop_desc + nb_data_desc, nb_sop_desc, nb_data_desc);
 
-	/* Allocate queue resources */
-	rc = vnic_rq_alloc(enic->vdev, rq, queue_idx,
-		nb_desc, sizeof(struct rq_enet_desc));
+	/* Allocate sop queue resources */
+	rc = vnic_rq_alloc(enic->vdev, rq_sop, sop_queue_idx,
+		nb_sop_desc, sizeof(struct rq_enet_desc));
 	if (rc) {
-		dev_err(enic, "error in allocation of rq\n");
+		dev_err(enic, "error in allocation of sop rq\n");
 		goto err_exit;
 	}
-
+	nb_sop_desc = rq_sop->ring.desc_count;
+
+	if (rq_data->in_use) {
+		/* Allocate data queue resources */
+		rc = vnic_rq_alloc(enic->vdev, rq_data, data_queue_idx,
+				   nb_data_desc,
+				   sizeof(struct rq_enet_desc));
+		if (rc) {
+			dev_err(enic, "error in allocation of data rq\n");
+			goto err_free_rq_sop;
+		}
+		nb_data_desc = rq_data->ring.desc_count;
+	}
 	rc = vnic_cq_alloc(enic->vdev, &enic->cq[queue_idx], queue_idx,
-		socket_id, nb_desc,
-		sizeof(struct cq_enet_rq_desc));
+			   socket_id, nb_sop_desc + nb_data_desc,
+			   sizeof(struct cq_enet_rq_desc));
 	if (rc) {
 		dev_err(enic, "error in allocation of cq for rq\n");
-		goto err_free_rq_exit;
+		goto err_free_rq_data;
 	}
 
-	/* Allocate the mbuf ring */
-	rq->mbuf_ring = (struct rte_mbuf **)rte_zmalloc_socket("rq->mbuf_ring",
-			sizeof(struct rte_mbuf *) * nb_desc,
-			RTE_CACHE_LINE_SIZE, rq->socket_id);
+	/* Allocate the mbuf rings */
+	rq_sop->mbuf_ring = (struct rte_mbuf **)
+		rte_zmalloc_socket("rq->mbuf_ring",
+				   sizeof(struct rte_mbuf *) * nb_sop_desc,
+				   RTE_CACHE_LINE_SIZE, rq_sop->socket_id);
+	if (rq_sop->mbuf_ring == NULL)
+		goto err_free_cq;
+
+	if (rq_data->in_use) {
+		rq_data->mbuf_ring = (struct rte_mbuf **)
+			rte_zmalloc_socket("rq->mbuf_ring",
+				sizeof(struct rte_mbuf *) * nb_data_desc,
+				RTE_CACHE_LINE_SIZE, rq_sop->socket_id);
+		if (rq_data->mbuf_ring == NULL)
+			goto err_free_sop_mbuf;
+	}
 
-	if (rq->mbuf_ring != NULL)
-		return 0;
+	return 0;
 
+err_free_sop_mbuf:
+	rte_free(rq_sop->mbuf_ring);
+err_free_cq:
 	/* cleanup on error */
 	vnic_cq_free(&enic->cq[queue_idx]);
-err_free_rq_exit:
-	vnic_rq_free(rq);
+err_free_rq_data:
+	if (rq_data->in_use)
+		vnic_rq_free(rq_data);
+err_free_rq_sop:
+	vnic_rq_free(rq_sop);
 err_exit:
 	return -ENOMEM;
 }
@@ -645,10 +789,12 @@ int enic_disable(struct enic *enic)
 		if (err)
 			return err;
 	}
-	for (i = 0; i < enic->rq_count; i++) {
-		err = vnic_rq_disable(&enic->rq[i]);
-		if (err)
-			return err;
+	for (i = 0; i < enic_vnic_rq_count(enic); i++) {
+		if (enic->rq[i].in_use) {
+			err = vnic_rq_disable(&enic->rq[i]);
+			if (err)
+				return err;
+		}
 	}
 
 	vnic_dev_set_reset_flag(enic->vdev, 1);
@@ -657,8 +803,9 @@ int enic_disable(struct enic *enic)
 	for (i = 0; i < enic->wq_count; i++)
 		vnic_wq_clean(&enic->wq[i], enic_free_wq_buf);
 
-	for (i = 0; i < enic->rq_count; i++)
-		vnic_rq_clean(&enic->rq[i], enic_free_rq_buf);
+	for (i = 0; i < enic_vnic_rq_count(enic); i++)
+		if (enic->rq[i].in_use)
+			vnic_rq_clean(&enic->rq[i], enic_free_rq_buf);
 	for (i = 0; i < enic->cq_count; i++)
 		vnic_cq_clean(&enic->cq[i]);
 	vnic_intr_clean(&enic->intr);
@@ -863,9 +1010,13 @@ int enic_set_vnic_res(struct enic *enic)
 	struct rte_eth_dev *eth_dev = enic->rte_dev;
 	int rc = 0;
 
-	if (enic->rq_count < eth_dev->data->nb_rx_queues) {
-		dev_err(dev, "Not enough Receive queues. Requested:%u, Configured:%u\n",
-			eth_dev->data->nb_rx_queues, enic->rq_count);
+	/* With Rx scatter support, two RQs are now used per RQ used by
+	 * the application.
+	 */
+	if (enic->rq_count < (eth_dev->data->nb_rx_queues * 2)) {
+		dev_err(dev, "Not enough Receive queues. Requested:%u which uses %d RQs on VIC, Configured:%u\n",
+			eth_dev->data->nb_rx_queues,
+			eth_dev->data->nb_rx_queues * 2, enic->rq_count);
 		rc = -EINVAL;
 	}
 	if (enic->wq_count < eth_dev->data->nb_tx_queues) {
diff --git a/drivers/net/enic/enic_res.c b/drivers/net/enic/enic_res.c
index ebe379d..42edd84 100644
--- a/drivers/net/enic/enic_res.c
+++ b/drivers/net/enic/enic_res.c
@@ -196,8 +196,9 @@ void enic_free_vnic_resources(struct enic *enic)
 
 	for (i = 0; i < enic->wq_count; i++)
 		vnic_wq_free(&enic->wq[i]);
-	for (i = 0; i < enic->rq_count; i++)
-		vnic_rq_free(&enic->rq[i]);
+	for (i = 0; i < enic_vnic_rq_count(enic); i++)
+		if (enic->rq[i].in_use)
+			vnic_rq_free(&enic->rq[i]);
 	for (i = 0; i < enic->cq_count; i++)
 		vnic_cq_free(&enic->cq[i]);
 	vnic_intr_free(&enic->intr);
diff --git a/drivers/net/enic/enic_rxtx.c b/drivers/net/enic/enic_rxtx.c
index 972eae2..1b0ed74 100644
--- a/drivers/net/enic/enic_rxtx.c
+++ b/drivers/net/enic/enic_rxtx.c
@@ -228,27 +228,34 @@ uint16_t
 enic_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 	       uint16_t nb_pkts)
 {
-	struct vnic_rq *rq = rx_queue;
-	struct enic *enic = vnic_dev_priv(rq->vdev);
-	unsigned int rx_id;
+	struct vnic_rq *sop_rq = rx_queue;
+	struct vnic_rq *data_rq;
+	struct vnic_rq *rq;
+	struct enic *enic = vnic_dev_priv(sop_rq->vdev);
+	uint16_t cq_idx;
+	uint16_t rq_idx;
+	uint16_t rq_num;
 	struct rte_mbuf *nmb, *rxmb;
-	uint16_t nb_rx = 0, nb_err = 0;
-	uint16_t nb_hold;
+	uint16_t nb_rx = 0;
 	struct vnic_cq *cq;
 	volatile struct cq_desc *cqd_ptr;
 	uint8_t color;
+	uint16_t seg_length;
+	struct rte_mbuf *first_seg = sop_rq->pkt_first_seg;
+	struct rte_mbuf *last_seg = sop_rq->pkt_last_seg;
 
-	cq = &enic->cq[enic_cq_rq(enic, rq->index)];
-	rx_id = cq->to_clean;		/* index of cqd, rqd, mbuf_table */
-	cqd_ptr = (struct cq_desc *)(cq->ring.descs) + rx_id;
+	cq = &enic->cq[enic_cq_rq(enic, sop_rq->index)];
+	cq_idx = cq->to_clean;		/* index of cqd, rqd, mbuf_table */
+	cqd_ptr = (struct cq_desc *)(cq->ring.descs) + cq_idx;
 
-	nb_hold = rq->rx_nb_hold;	/* mbufs held by software */
+	data_rq = &enic->rq[sop_rq->data_queue_idx];
 
 	while (nb_rx < nb_pkts) {
 		volatile struct rq_enet_desc *rqd_ptr;
 		dma_addr_t dma_addr;
 		struct cq_desc cqd;
 		uint8_t packet_error;
+		uint16_t ciflags;
 
 		/* Check for pkts available */
 		color = (cqd_ptr->type_color >> CQ_DESC_COLOR_SHIFT)
@@ -256,9 +263,13 @@ enic_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 		if (color == cq->last_color)
 			break;
 
-		/* Get the cq descriptor and rq pointer */
+		/* Get the cq descriptor and extract rq info from it */
 		cqd = *cqd_ptr;
-		rqd_ptr = (struct rq_enet_desc *)(rq->ring.descs) + rx_id;
+		rq_num = cqd.q_number & CQ_DESC_Q_NUM_MASK;
+		rq_idx = cqd.completed_index & CQ_DESC_COMP_NDX_MASK;
+
+		rq = &enic->rq[rq_num];
+		rqd_ptr = ((struct rq_enet_desc *)rq->ring.descs) + rq_idx;
 
 		/* allocate a new mbuf */
 		nmb = rte_mbuf_raw_alloc(rq->mp);
@@ -271,67 +282,102 @@ enic_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 		packet_error = enic_cq_rx_check_err(&cqd);
 
 		/* Get the mbuf to return and replace with one just allocated */
-		rxmb = rq->mbuf_ring[rx_id];
-		rq->mbuf_ring[rx_id] = nmb;
+		rxmb = rq->mbuf_ring[rq_idx];
+		rq->mbuf_ring[rq_idx] = nmb;
 
 		/* Increment cqd, rqd, mbuf_table index */
-		rx_id++;
-		if (unlikely(rx_id == rq->ring.desc_count)) {
-			rx_id = 0;
+		cq_idx++;
+		if (unlikely(cq_idx == cq->ring.desc_count)) {
+			cq_idx = 0;
 			cq->last_color = cq->last_color ? 0 : 1;
 		}
 
 		/* Prefetch next mbuf & desc while processing current one */
-		cqd_ptr = (struct cq_desc *)(cq->ring.descs) + rx_id;
+		cqd_ptr = (struct cq_desc *)(cq->ring.descs) + cq_idx;
 		rte_enic_prefetch(cqd_ptr);
-		rte_enic_prefetch(rq->mbuf_ring[rx_id]);
-		rte_enic_prefetch((struct rq_enet_desc *)(rq->ring.descs)
-				 + rx_id);
+
+		ciflags = enic_cq_rx_desc_ciflags(
+			(struct cq_enet_rq_desc *)&cqd);
 
 		/* Push descriptor for newly allocated mbuf */
-		dma_addr = (dma_addr_t)(nmb->buf_physaddr
-			   + RTE_PKTMBUF_HEADROOM);
-		rqd_ptr->address = rte_cpu_to_le_64(dma_addr);
-		rqd_ptr->length_type = cpu_to_le16(nmb->buf_len
-				       - RTE_PKTMBUF_HEADROOM);
+		dma_addr = (dma_addr_t)(nmb->buf_physaddr +
+					RTE_PKTMBUF_HEADROOM);
+		rq_enet_desc_enc(rqd_ptr, dma_addr,
+				(rq->is_sop ? RQ_ENET_TYPE_ONLY_SOP
+				: RQ_ENET_TYPE_NOT_SOP),
+				nmb->buf_len - RTE_PKTMBUF_HEADROOM);
 
-		/* Drop incoming bad packet */
-		if (unlikely(packet_error)) {
-			rte_pktmbuf_free(rxmb);
-			nb_err++;
-			continue;
+		/* Fill in the rest of the mbuf */
+		seg_length = enic_cq_rx_desc_n_bytes(&cqd);
+
+		if (rq->is_sop) {
+			first_seg = rxmb;
+			first_seg->nb_segs = 1;
+			first_seg->pkt_len = seg_length;
+		} else {
+			first_seg->pkt_len = (uint16_t)(first_seg->pkt_len
+							+ seg_length);
+			first_seg->nb_segs++;
+			last_seg->next = rxmb;
 		}
 
-		/* Fill in the rest of the mbuf */
-		rxmb->data_off = RTE_PKTMBUF_HEADROOM;
-		rxmb->nb_segs = 1;
 		rxmb->next = NULL;
 		rxmb->port = enic->port_id;
-		rxmb->pkt_len = enic_cq_rx_desc_n_bytes(&cqd);
-		rxmb->packet_type = enic_cq_rx_flags_to_pkt_type(&cqd);
-		enic_cq_rx_to_pkt_flags(&cqd, rxmb);
-		rxmb->data_len = rxmb->pkt_len;
+		rxmb->data_len = seg_length;
+
+		rq->rx_nb_hold++;
+
+		if (!(enic_cq_rx_desc_eop(ciflags))) {
+			last_seg = rxmb;
+			continue;
+		}
+
+		/* cq rx flags are only valid if eop bit is set */
+		first_seg->packet_type = enic_cq_rx_flags_to_pkt_type(&cqd);
+		enic_cq_rx_to_pkt_flags(&cqd, first_seg);
+
+		if (unlikely(packet_error)) {
+			rte_pktmbuf_free(first_seg);
+			rte_atomic64_inc(&enic->soft_stats.rx_packet_errors);
+			continue;
+		}
+
 
 		/* prefetch mbuf data for caller */
-		rte_packet_prefetch(RTE_PTR_ADD(rxmb->buf_addr,
+		rte_packet_prefetch(RTE_PTR_ADD(first_seg->buf_addr,
 				    RTE_PKTMBUF_HEADROOM));
 
 		/* store the mbuf address into the next entry of the array */
-		rx_pkts[nb_rx++] = rxmb;
+		rx_pkts[nb_rx++] = first_seg;
 	}
 
-	nb_hold += nb_rx + nb_err;
-	cq->to_clean = rx_id;
+	sop_rq->pkt_first_seg = first_seg;
+	sop_rq->pkt_last_seg = last_seg;
+
+	cq->to_clean = cq_idx;
+
+	if ((sop_rq->rx_nb_hold + data_rq->rx_nb_hold) >
+	    sop_rq->rx_free_thresh) {
+		if (data_rq->in_use) {
+			data_rq->posted_index =
+				enic_ring_add(data_rq->ring.desc_count,
+					      data_rq->posted_index,
+					      data_rq->rx_nb_hold);
+			data_rq->rx_nb_hold = 0;
+		}
+		sop_rq->posted_index = enic_ring_add(sop_rq->ring.desc_count,
+						     sop_rq->posted_index,
+						     sop_rq->rx_nb_hold);
+		sop_rq->rx_nb_hold = 0;
 
-	if (nb_hold > rq->rx_free_thresh) {
-		rq->posted_index = enic_ring_add(rq->ring.desc_count,
-				rq->posted_index, nb_hold);
-		nb_hold = 0;
 		rte_mb();
-		iowrite32(rq->posted_index, &rq->ctrl->posted_index);
+		if (data_rq->in_use)
+			iowrite32(data_rq->posted_index,
+				  &data_rq->ctrl->posted_index);
+		rte_compiler_barrier();
+		iowrite32(sop_rq->posted_index, &sop_rq->ctrl->posted_index);
 	}
 
-	rq->rx_nb_hold = nb_hold;
 
 	return nb_rx;
 }
-- 
2.7.0

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [dpdk-dev] [PATCH v2] enic: scattered Rx
  2016-06-16 19:19 ` [dpdk-dev] [PATCH v2] " Nelson Escobar
@ 2016-06-24 10:42   ` Bruce Richardson
  0 siblings, 0 replies; 3+ messages in thread
From: Bruce Richardson @ 2016-06-24 10:42 UTC (permalink / raw)
  To: Nelson Escobar; +Cc: dev, johndale

On Thu, Jun 16, 2016 at 12:19:05PM -0700, Nelson Escobar wrote:
> For performance reasons, this patch uses 2 VIC RQs per RQ presented to
> DPDK.
> 
> The VIC requires that each descriptor be marked as either a start of
> packet (SOP) descriptor or a non-SOP descriptor.  A one RQ solution
> requires skipping descriptors when receiving small packets and results
> in bad performance when receiving many small packets.
> 
> The 2 RQ solution makes use of the VIC feature that allows a receive
> on primary queue to 'spill over' into another queue if the receive is
> too large to fit in the buffer assigned to the descriptor on the
> primary queue.  This means that there is no skipping of descriptors
> when receiving small packets and results in much better performance.
> 
> Signed-off-by: Nelson Escobar <neescoba@cisco.com>
> Reviewed-by: John Daley <johndale@cisco.com>
> ---
> 
Applied to dpdk-next-net/rel_16_07

/Bruce

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2016-06-24 10:43 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-06-14 23:55 [dpdk-dev] [PATCH] enic: scattered Rx Nelson Escobar
2016-06-16 19:19 ` [dpdk-dev] [PATCH v2] " Nelson Escobar
2016-06-24 10:42   ` Bruce Richardson

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).