DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH] net/mlx5: add queue start and stop feature
@ 2020-07-17 14:37 Viacheslav Ovsiienko
  2020-07-17 15:03 ` Thomas Monjalon
  2020-07-19 15:35 ` [dpdk-dev] [PATCH v2] " Viacheslav Ovsiienko
  0 siblings, 2 replies; 4+ messages in thread
From: Viacheslav Ovsiienko @ 2020-07-17 14:37 UTC (permalink / raw)
  To: dev; +Cc: matan, rasland, thomas

The mlx5 PMD does not support queue_start and queue_stop eth_dev API
routines, queue can't be suspended and resumed during device operation.

There is the use case when this feature is crucial for applications:

- there is the secondary process handling the queue
- secondary process crashed/aborted
- some mbufs were allocated or used by secondary application
- some mbufs were allocated by Rx queues to receive packets
- some mbufs were placed to send queue
- queue goes to undefined state

In this case there is no reliable way to recover queue handling
by the restarted secondary process other than resetting the queue to
its initial state, freeing all involved resources (including the
buffers involved in queue operations), resetting the mbuf pools, and
then reinitializing the queue to the working state:

- reset mbuf pool, allocate all mbufs to bring the pool into a
  safe state after the crash and allow safe mbuf free calls
- stop queue, free all potentially involved mbufs
- reset mbuf pool again
- start queue, reallocate mbufs needed

This patch introduces the queue start/stop feature with some
limitations:

- hairpin queues are not supported
- it is application responsibility to synchronize start/stop
  with datapath routines, rx/tx_burst must be suspended during
  the queue_start/queue_stop calls
- it is application responsibility to track queue usage and
  provide coordinated queue_start/queue_stop calls from
  secondary and primary processes.
- Rx queues with vectorized Rx routine and engaged CQE
  compression are not supported by this patch currently

Signed-off-by: Viacheslav Ovsiienko <viacheslavo@mellanox.com>
---
 drivers/common/mlx5/mlx5_common_mp.h |  10 ++
 drivers/net/mlx5/linux/mlx5_os.c     |  12 ++
 drivers/net/mlx5/mlx5.h              |   2 +
 drivers/net/mlx5/mlx5_mp.c           |  74 +++++++++-
 drivers/net/mlx5/mlx5_rxq.c          | 248 +++++++++++++++++++++++++++++++++
 drivers/net/mlx5/mlx5_rxtx.h         |   8 ++
 drivers/net/mlx5/mlx5_txq.c          | 261 +++++++++++++++++++++++++++++++++++
 7 files changed, 614 insertions(+), 1 deletion(-)

diff --git a/drivers/common/mlx5/mlx5_common_mp.h b/drivers/common/mlx5/mlx5_common_mp.h
index 05466fd..740aa36 100644
--- a/drivers/common/mlx5/mlx5_common_mp.h
+++ b/drivers/common/mlx5/mlx5_common_mp.h
@@ -26,6 +26,10 @@ enum mlx5_mp_req_type {
 	MLX5_MP_REQ_START_RXTX,
 	MLX5_MP_REQ_STOP_RXTX,
 	MLX5_MP_REQ_QUEUE_STATE_MODIFY,
+	MLX5_MP_REQ_QUEUE_RX_STOP,
+	MLX5_MP_REQ_QUEUE_RX_START,
+	MLX5_MP_REQ_QUEUE_TX_STOP,
+	MLX5_MP_REQ_QUEUE_TX_START,
 };
 
 struct mlx5_mp_arg_queue_state_modify {
@@ -34,6 +38,10 @@ struct mlx5_mp_arg_queue_state_modify {
 	enum ibv_wq_state state; /* WQ requested state. */
 };
 
+struct mlx5_mp_arg_queue_id {
+	uint16_t queue_id; /* DPDK queue ID. */
+};
+
 /* Pameters for IPC. */
 struct mlx5_mp_param {
 	enum mlx5_mp_req_type type;
@@ -44,6 +52,8 @@ struct mlx5_mp_param {
 		uintptr_t addr; /* MLX5_MP_REQ_CREATE_MR */
 		struct mlx5_mp_arg_queue_state_modify state_modify;
 		/* MLX5_MP_REQ_QUEUE_STATE_MODIFY */
+		struct mlx5_mp_arg_queue_id queue_id;
+		/* MLX5_MP_REQ_QUEUE_RX/TX_START/STOP */
 	} args;
 };
 
diff --git a/drivers/net/mlx5/linux/mlx5_os.c b/drivers/net/mlx5/linux/mlx5_os.c
index f228bab..bca1183 100644
--- a/drivers/net/mlx5/linux/mlx5_os.c
+++ b/drivers/net/mlx5/linux/mlx5_os.c
@@ -2378,6 +2378,10 @@
 	.tx_hairpin_queue_setup = mlx5_tx_hairpin_queue_setup,
 	.rx_queue_release = mlx5_rx_queue_release,
 	.tx_queue_release = mlx5_tx_queue_release,
+	.rx_queue_start = mlx5_rx_queue_start,
+	.rx_queue_stop = mlx5_rx_queue_stop,
+	.tx_queue_start = mlx5_tx_queue_start,
+	.tx_queue_stop = mlx5_tx_queue_stop,
 	.flow_ctrl_get = mlx5_dev_get_flow_ctrl,
 	.flow_ctrl_set = mlx5_dev_set_flow_ctrl,
 	.mac_addr_remove = mlx5_mac_addr_remove,
@@ -2419,6 +2423,10 @@
 	.fw_version_get = mlx5_fw_version_get,
 	.dev_infos_get = mlx5_dev_infos_get,
 	.read_clock = mlx5_txpp_read_clock,
+	.rx_queue_start = mlx5_rx_queue_start,
+	.rx_queue_stop = mlx5_rx_queue_stop,
+	.tx_queue_start = mlx5_tx_queue_start,
+	.tx_queue_stop = mlx5_tx_queue_stop,
 	.rx_descriptor_status = mlx5_rx_descriptor_status,
 	.tx_descriptor_status = mlx5_tx_descriptor_status,
 	.rxq_info_get = mlx5_rxq_info_get,
@@ -2458,6 +2466,10 @@
 	.tx_hairpin_queue_setup = mlx5_tx_hairpin_queue_setup,
 	.rx_queue_release = mlx5_rx_queue_release,
 	.tx_queue_release = mlx5_tx_queue_release,
+	.rx_queue_start = mlx5_rx_queue_start,
+	.rx_queue_stop = mlx5_rx_queue_stop,
+	.tx_queue_start = mlx5_tx_queue_start,
+	.tx_queue_stop = mlx5_tx_queue_stop,
 	.flow_ctrl_get = mlx5_dev_get_flow_ctrl,
 	.flow_ctrl_set = mlx5_dev_set_flow_ctrl,
 	.mac_addr_remove = mlx5_mac_addr_remove,
diff --git a/drivers/net/mlx5/mlx5.h b/drivers/net/mlx5/mlx5.h
index 2e61d0c..548c6e5 100644
--- a/drivers/net/mlx5/mlx5.h
+++ b/drivers/net/mlx5/mlx5.h
@@ -989,6 +989,8 @@ int mlx5_flow_get_aged_flows(struct rte_eth_dev *dev, void **contexts,
 int mlx5_mp_secondary_handle(const struct rte_mp_msg *mp_msg, const void *peer);
 void mlx5_mp_req_start_rxtx(struct rte_eth_dev *dev);
 void mlx5_mp_req_stop_rxtx(struct rte_eth_dev *dev);
+int mlx5_mp_req_queue_control(struct rte_eth_dev *dev, uint16_t queue_id,
+			      enum mlx5_mp_req_type req_type);
 
 /* mlx5_socket.c */
 
diff --git a/drivers/net/mlx5/mlx5_mp.c b/drivers/net/mlx5/mlx5_mp.c
index a2b5c40..e7e32e6 100644
--- a/drivers/net/mlx5/mlx5_mp.c
+++ b/drivers/net/mlx5/mlx5_mp.c
@@ -62,6 +62,30 @@
 					(dev, &param->args.state_modify);
 		ret = rte_mp_reply(&mp_res, peer);
 		break;
+	case MLX5_MP_REQ_QUEUE_RX_STOP:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_rx_queue_stop_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
+	case MLX5_MP_REQ_QUEUE_RX_START:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_rx_queue_start_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
+	case MLX5_MP_REQ_QUEUE_TX_STOP:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_tx_queue_stop_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
+	case MLX5_MP_REQ_QUEUE_TX_START:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_tx_queue_start_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
 	default:
 		rte_errno = EINVAL;
 		DRV_LOG(ERR, "port %u invalid mp request type",
@@ -85,7 +109,7 @@
 int
 mlx5_mp_secondary_handle(const struct rte_mp_msg *mp_msg, const void *peer)
 {
-	struct rte_mp_msg mp_res;
+struct rte_mp_msg mp_res;
 	struct mlx5_mp_param *res = (struct mlx5_mp_param *)mp_res.param;
 	const struct mlx5_mp_param *param =
 		(const struct mlx5_mp_param *)mp_msg->param;
@@ -209,3 +233,51 @@
 {
 	mp_req_on_rxtx(dev, MLX5_MP_REQ_STOP_RXTX);
 }
+
+/**
+ * Request Verbs Rx/Tx queue stop or start to the primary process.
+ *
+ * @param[in] dev
+ *   Pointer to Ethernet structure.
+ * @param queue_id
+ *   Queue ID to control.
+ * @param req_type
+ *   request type
+ *     MLX5_MP_REQ_QUEUE_RX_START - start Rx queue
+ *     MLX5_MP_REQ_QUEUE_TX_START - start Tx queue
+ *     MLX5_MP_REQ_QUEUE_RX_STOP - stop Rx queue
+ *     MLX5_MP_REQ_QUEUE_TX_STOP - stop Tx queue
+ * @return
+ *   0 on success, a negative errno value otherwise and
+ *     rte_errno is set.
+ */
+int
+mlx5_mp_req_queue_control(struct rte_eth_dev *dev, uint16_t queue_id,
+			  enum mlx5_mp_req_type req_type)
+{
+	struct rte_mp_msg mp_req;
+	struct rte_mp_msg *mp_res;
+	struct rte_mp_reply mp_rep;
+	struct mlx5_mp_param *req = (struct mlx5_mp_param *)mp_req.param;
+	struct mlx5_mp_param *res;
+	struct timespec ts = {.tv_sec = MLX5_MP_REQ_TIMEOUT_SEC, .tv_nsec = 0};
+	struct mlx5_priv *priv;
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() == RTE_PROC_SECONDARY);
+	priv = dev->data->dev_private;
+	mp_init_msg(&priv->mp_id, &mp_req, req_type);
+	req->args.queue_id.queue_id = queue_id;
+	ret = rte_mp_request_sync(&mp_req, &mp_rep, &ts);
+	if (ret) {
+		DRV_LOG(ERR, "port %u request to primary process failed",
+			dev->data->port_id);
+		return -rte_errno;
+	}
+	MLX5_ASSERT(mp_rep.nb_received == 1);
+	mp_res = &mp_rep.msgs[0];
+	res = (struct mlx5_mp_param *)mp_res->param;
+	ret = res->result;
+	free(mp_rep.msgs);
+	return ret;
+}
diff --git a/drivers/net/mlx5/mlx5_rxq.c b/drivers/net/mlx5/mlx5_rxq.c
index 7dd06e8..0101663 100644
--- a/drivers/net/mlx5/mlx5_rxq.c
+++ b/drivers/net/mlx5/mlx5_rxq.c
@@ -440,6 +440,244 @@
 	return (rte_atomic32_read(&rxq_ctrl->refcnt) == 1);
 }
 
+/* Fetches and drops all SW-owned and error CQEs to synchronize CQ. */
+static void
+rxq_sync_cq(struct mlx5_rxq_data *rxq)
+{
+	const uint16_t cqe_n = 1 << rxq->cqe_n;
+	const uint16_t cqe_mask = cqe_n - 1;
+	volatile struct mlx5_cqe *cqe;
+	int ret, i;
+
+	i = cqe_n;
+	do {
+		cqe = &(*rxq->cqes)[rxq->cq_ci & cqe_mask];
+		ret = check_cqe(cqe, cqe_n, rxq->cq_ci);
+		if (ret == MLX5_CQE_STATUS_HW_OWN)
+			break;
+		if (ret == MLX5_CQE_STATUS_ERR) {
+			rxq->cq_ci++;
+			continue;
+		}
+		MLX5_ASSERT(ret == MLX5_CQE_STATUS_SW_OWN);
+		if (MLX5_CQE_FORMAT(cqe->op_own) != MLX5_COMPRESSED) {
+			rxq->cq_ci++;
+			continue;
+		}
+		/* Compute the next non compressed CQE. */
+		rxq->cq_ci += rte_be_to_cpu_32(cqe->byte_cnt);
+
+	} while (--i);
+	/* Move all CQEs to HW ownership, including possible MiniCQEs. */
+	for (i = 0; i < cqe_n; i++) {
+		cqe = &(*rxq->cqes)[i];
+		cqe->op_own = MLX5_CQE_INVALIDATE;
+	}
+	/* Resync CQE and WQE (WQ in RESET state). */
+	rte_cio_wmb();
+	*rxq->cq_db = rte_cpu_to_be_32(rxq->cq_ci);
+	rte_cio_wmb();
+	*rxq->rq_db = rte_cpu_to_be_32(0);
+	rte_cio_wmb();
+}
+
+/**
+ * Rx queue stop. Device queue goes to the RESET state,
+ * all involved mbufs are freed from WQ.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_rx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_rxq_data *rxq = (*priv->rxqs)[idx];
+	struct mlx5_rxq_ctrl *rxq_ctrl =
+			container_of(rxq, struct mlx5_rxq_ctrl, rxq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() == RTE_PROC_PRIMARY);
+	if (rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_IBV) {
+		struct ibv_wq_attr mod = {
+			.attr_mask = IBV_WQ_ATTR_STATE,
+			.wq_state = IBV_WQS_RESET,
+		};
+
+		ret = mlx5_glue->modify_wq(rxq_ctrl->obj->wq, &mod);
+	} else { /* rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_DEVX_RQ. */
+		struct mlx5_devx_modify_rq_attr rq_attr;
+
+		memset(&rq_attr, 0, sizeof(rq_attr));
+		rq_attr.rq_state = MLX5_RQC_STATE_RDY; /* Current state. */
+		rq_attr.state = MLX5_RQC_STATE_RST; /* Requested state. */
+		ret = mlx5_devx_cmd_modify_rq(rxq_ctrl->obj->rq, &rq_attr);
+	}
+	if (ret) {
+		DRV_LOG(ERR, "Cannot change Rx WQ state to RESET:  %s",
+			strerror(errno));
+		rte_errno = errno;
+		return ret;
+	}
+	/* Remove all processed CQEs. */
+	rxq_sync_cq(rxq);
+	/* Free all involved mbufs. */
+	rxq_free_elts(rxq_ctrl);
+	/* Set the actual queue state. */
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STOPPED;
+	return 0;
+}
+
+/**
+ * Rx queue stop. Device queue goes to the RESET state,
+ * all involved mbufs are freed from WQ.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_rx_queue_stop(struct rte_eth_dev *dev, uint16_t idx)
+{
+	eth_rx_burst_t pkt_burst = dev->rx_pkt_burst;
+	int ret;
+
+	if (dev->data->rx_queue_state[idx] == RTE_ETH_QUEUE_STATE_HAIRPIN) {
+		DRV_LOG(ERR, "Hairpin queue can't be stopped");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (dev->data->rx_queue_state[idx] == RTE_ETH_QUEUE_STATE_STOPPED)
+		return 0;
+	/*
+	 * Vectorized Rx burst requires the CQ and RQ indices
+	 * synchronized, that might be broken on RQ restart
+	 * and cause Rx malfunction, so queue stopping is
+	 * not supported if vectorized Rx burst is engaged.
+	 * The routine pointer depends on the process
+	 * type, so the check is performed here.
+	 */
+	if (pkt_burst == mlx5_rx_burst_vec) {
+		DRV_LOG(ERR, "Rx queue stop is not supported "
+			"for vectorized Rx");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (rte_eal_process_type() ==  RTE_PROC_SECONDARY) {
+		ret = mlx5_mp_req_queue_control(dev, idx,
+						MLX5_MP_REQ_QUEUE_RX_STOP);
+	} else {
+		ret = mlx5_rx_queue_stop_primary(dev, idx);
+	}
+	return ret;
+}
+
+/**
+ * Rx queue start. Device queue goes to the ready state,
+ * all required mbufs are allocated and WQ is replenished.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_rx_queue_start_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_rxq_data *rxq = (*priv->rxqs)[idx];
+	struct mlx5_rxq_ctrl *rxq_ctrl =
+			container_of(rxq, struct mlx5_rxq_ctrl, rxq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() ==  RTE_PROC_PRIMARY);
+	/* Allocate needed buffers. */
+	ret = rxq_alloc_elts(rxq_ctrl);
+	if (ret) {
+		DRV_LOG(ERR, "Cannot reallocate buffers for Rx WQ");
+		rte_errno = errno;
+		return ret;
+	}
+	rte_cio_wmb();
+	*rxq->cq_db = rte_cpu_to_be_32(rxq->cq_ci);
+	rte_cio_wmb();
+	/* Reset RQ consumer before moving queue to READY state. */
+	*rxq->rq_db = rte_cpu_to_be_32(0);
+	rte_cio_wmb();
+	if (rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_IBV) {
+		struct ibv_wq_attr mod = {
+			.attr_mask = IBV_WQ_ATTR_STATE,
+			.wq_state = IBV_WQS_RDY,
+		};
+
+		ret = mlx5_glue->modify_wq(rxq_ctrl->obj->wq, &mod);
+	} else { /* rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_DEVX_RQ. */
+		struct mlx5_devx_modify_rq_attr rq_attr;
+
+		memset(&rq_attr, 0, sizeof(rq_attr));
+		rq_attr.rq_state = MLX5_RQC_STATE_RST; /* Current state. */
+		rq_attr.state = MLX5_RQC_STATE_RDY; /* Requested state. */
+		ret = mlx5_devx_cmd_modify_rq(rxq_ctrl->obj->rq, &rq_attr);
+	}
+	if (ret) {
+		DRV_LOG(ERR, "Cannot change Rx WQ state to READY:  %s",
+			strerror(errno));
+		rte_errno = errno;
+		return ret;
+	}
+	/* Reinitialize RQ - set WQEs. */
+	mlx5_rxq_initialize(rxq);
+	rxq->err_state = MLX5_RXQ_ERR_STATE_NO_ERROR;
+	/* Set actual queue state. */
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
+	return 0;
+}
+
+/**
+ * Rx queue start. Device queue goes to the ready state,
+ * all required mbufs are allocated and WQ is replenished.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_rx_queue_start(struct rte_eth_dev *dev, uint16_t idx)
+{
+	int ret;
+
+	if (dev->data->rx_queue_state[idx] == RTE_ETH_QUEUE_STATE_HAIRPIN) {
+		DRV_LOG(ERR, "Hairpin queue can't be started");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (dev->data->rx_queue_state[idx] == RTE_ETH_QUEUE_STATE_STARTED)
+		return 0;
+	if (rte_eal_process_type() ==  RTE_PROC_SECONDARY) {
+		ret = mlx5_mp_req_queue_control(dev, idx,
+						MLX5_MP_REQ_QUEUE_RX_START);
+	} else {
+		ret = mlx5_rx_queue_start_primary(dev, idx);
+	}
+	return ret;
+}
+
 /**
  * Rx queue presetup checks.
  *
@@ -679,6 +917,9 @@
 static int
 mlx5_rxq_obj_release(struct mlx5_rxq_obj *rxq_obj)
 {
+	struct rte_eth_dev_data *dev_data;
+	uint16_t idx;
+
 	MLX5_ASSERT(rxq_obj);
 	if (rte_atomic32_dec_and_test(&rxq_obj->refcnt)) {
 		switch (rxq_obj->type) {
@@ -705,6 +946,11 @@
 		if (rxq_obj->channel)
 			claim_zero(mlx5_glue->destroy_comp_channel
 				   (rxq_obj->channel));
+		idx = rxq_obj->rxq_ctrl->rxq.idx;
+		dev_data = rxq_obj->rxq_ctrl->priv->dev_data;
+		if (rxq_obj->type != MLX5_RXQ_OBJ_TYPE_DEVX_HAIRPIN)
+			dev_data->rx_queue_state[idx] =
+				RTE_ETH_QUEUE_STATE_STOPPED;
 		LIST_REMOVE(rxq_obj, next);
 		rte_free(rxq_obj);
 		return 0;
@@ -1320,6 +1566,7 @@
 	rte_atomic32_inc(&tmpl->refcnt);
 	LIST_INSERT_HEAD(&priv->rxqsobj, tmpl, next);
 	priv->verbs_alloc_ctx.type = MLX5_VERBS_ALLOC_TYPE_NONE;
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_HAIRPIN;
 	return tmpl;
 }
 
@@ -1487,6 +1734,7 @@ struct mlx5_rxq_obj *
 	rte_atomic32_inc(&tmpl->refcnt);
 	LIST_INSERT_HEAD(&priv->rxqsobj, tmpl, next);
 	priv->verbs_alloc_ctx.type = MLX5_VERBS_ALLOC_TYPE_NONE;
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
 	return tmpl;
 error:
 	if (tmpl) {
diff --git a/drivers/net/mlx5/mlx5_rxtx.h b/drivers/net/mlx5/mlx5_rxtx.h
index 5116a15..95ff771 100644
--- a/drivers/net/mlx5/mlx5_rxtx.h
+++ b/drivers/net/mlx5/mlx5_rxtx.h
@@ -402,6 +402,10 @@ struct mlx5_txq_ctrl {
 int mlx5_mprq_enabled(struct rte_eth_dev *dev);
 int mlx5_mprq_free_mp(struct rte_eth_dev *dev);
 int mlx5_mprq_alloc_mp(struct rte_eth_dev *dev);
+int mlx5_rx_queue_start(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_rx_queue_stop(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_rx_queue_start_primary(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_rx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t queue_id);
 int mlx5_rx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 			unsigned int socket, const struct rte_eth_rxconf *conf,
 			struct rte_mempool *mp);
@@ -449,6 +453,10 @@ uint32_t mlx5_hrxq_get(struct rte_eth_dev *dev,
 
 /* mlx5_txq.c */
 
+int mlx5_tx_queue_start(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_tx_queue_stop(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_tx_queue_start_primary(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_tx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t queue_id);
 int mlx5_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 			unsigned int socket, const struct rte_eth_txconf *conf);
 int mlx5_tx_hairpin_queue_setup
diff --git a/drivers/net/mlx5/mlx5_txq.c b/drivers/net/mlx5/mlx5_txq.c
index 4ab6ac1..84cae21 100644
--- a/drivers/net/mlx5/mlx5_txq.c
+++ b/drivers/net/mlx5/mlx5_txq.c
@@ -139,6 +139,264 @@
 	return offloads;
 }
 
+/* Fetches and drops all SW-owned and error CQEs to synchronize CQ. */
+static void
+txq_sync_cq(struct mlx5_txq_data *txq)
+{
+	volatile struct mlx5_cqe *cqe;
+	int ret, i;
+
+	i = txq->cqe_s;
+	do {
+		cqe = &txq->cqes[txq->cq_ci & txq->cqe_m];
+		ret = check_cqe(cqe, txq->cqe_s, txq->cq_ci);
+		if (unlikely(ret != MLX5_CQE_STATUS_SW_OWN)) {
+			if (likely(ret != MLX5_CQE_STATUS_ERR)) {
+				/* No new CQEs in completion queue. */
+				MLX5_ASSERT(ret == MLX5_CQE_STATUS_HW_OWN);
+				break;
+			}
+		}
+		++txq->cq_ci;
+	} while (--i);
+	/* Move all CQEs to HW ownership. */
+	for (i = 0; i < txq->cqe_s; i++) {
+		cqe = &txq->cqes[i];
+		cqe->op_own = MLX5_CQE_INVALIDATE;
+	}
+	/* Resync CQE and WQE (WQ in reset state). */
+	rte_cio_wmb();
+	*txq->cq_db = rte_cpu_to_be_32(txq->cq_ci);
+	rte_cio_wmb();
+}
+
+/**
+ * Tx queue stop. Device queue goes to the idle state,
+ * all involved mbufs are freed from elts/WQ.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_tx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_txq_data *txq = (*priv->txqs)[idx];
+	struct mlx5_txq_ctrl *txq_ctrl =
+			container_of(txq, struct mlx5_txq_ctrl, txq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() == RTE_PROC_PRIMARY);
+	/* Move QP to RESET state. */
+	if (txq_ctrl->obj->type == MLX5_TXQ_OBJ_TYPE_DEVX_SQ) {
+		struct mlx5_devx_modify_sq_attr msq_attr = { 0 };
+
+		/* Change queue state to reset with DevX. */
+		msq_attr.sq_state = MLX5_SQC_STATE_RDY;
+		msq_attr.state = MLX5_SQC_STATE_RST;
+		ret = mlx5_devx_cmd_modify_sq(txq_ctrl->obj->sq_devx,
+					      &msq_attr);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change the "
+				"Tx QP state to RESET %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+	} else {
+		struct ibv_qp_attr mod = {
+			.qp_state = IBV_QPS_RESET,
+			.port_num = (uint8_t)priv->dev_port,
+		};
+		struct ibv_qp *qp = txq_ctrl->obj->qp;
+
+		/* Change queue state to reset with Verbs. */
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change the Tx QP state to RESET "
+				"%s", strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+	}
+	/* Handle all send completions. */
+	txq_sync_cq(txq);
+	/* Free elts stored in the SQ. */
+	txq_free_elts(txq_ctrl);
+	/* Prevent writing new pkts to SQ by setting no free WQE.*/
+	txq->wqe_ci = txq->wqe_s;
+	txq->wqe_pi = 0;
+	txq->elts_comp = 0;
+	/* Set the actual queue state. */
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STOPPED;
+	return 0;
+}
+
+/**
+ * Tx queue stop. Device queue goes to the idle state,
+ * all involved mbufs are freed from elts/WQ.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_tx_queue_stop(struct rte_eth_dev *dev, uint16_t idx)
+{
+	int ret;
+
+	if (dev->data->tx_queue_state[idx] == RTE_ETH_QUEUE_STATE_HAIRPIN) {
+		DRV_LOG(ERR, "Hairpin queue can't be stopped");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (dev->data->tx_queue_state[idx] == RTE_ETH_QUEUE_STATE_STOPPED)
+		return 0;
+	if (rte_eal_process_type() ==  RTE_PROC_SECONDARY) {
+		ret = mlx5_mp_req_queue_control(dev, idx,
+						MLX5_MP_REQ_QUEUE_TX_STOP);
+	} else {
+		ret = mlx5_tx_queue_stop_primary(dev, idx);
+	}
+	return ret;
+}
+
+/**
+ * Tx queue start. Device queue goes to the ready state,
+ * the wqe_ci, wqe_pi and elts_comp fields are reset to zero.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_tx_queue_start_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_txq_data *txq = (*priv->txqs)[idx];
+	struct mlx5_txq_ctrl *txq_ctrl =
+			container_of(txq, struct mlx5_txq_ctrl, txq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() ==  RTE_PROC_PRIMARY);
+	if (txq_ctrl->obj->type == MLX5_TXQ_OBJ_TYPE_DEVX_SQ) {
+		struct mlx5_devx_modify_sq_attr msq_attr = { 0 };
+		struct mlx5_txq_obj *obj = txq_ctrl->obj;
+
+		msq_attr.sq_state = MLX5_SQC_STATE_RDY;
+		msq_attr.state = MLX5_SQC_STATE_RST;
+		ret = mlx5_devx_cmd_modify_sq(obj->sq_devx, &msq_attr);
+		if (ret) {
+			rte_errno = errno;
+			DRV_LOG(ERR,
+				"Cannot change the Tx QP state to RESET "
+				"%s", strerror(errno));
+			return ret;
+		}
+		msq_attr.sq_state = MLX5_SQC_STATE_RST;
+		msq_attr.state = MLX5_SQC_STATE_RDY;
+		ret = mlx5_devx_cmd_modify_sq(obj->sq_devx, &msq_attr);
+		if (ret) {
+			rte_errno = errno;
+			DRV_LOG(ERR,
+				"Cannot change the Tx QP state to READY "
+				"%s", strerror(errno));
+			return ret;
+		}
+	} else {
+		struct ibv_qp_attr mod = {
+			.qp_state = IBV_QPS_RESET,
+			.port_num = (uint8_t)priv->dev_port,
+		};
+		struct ibv_qp *qp = txq_ctrl->obj->qp;
+
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change the Tx QP state to RESET "
+				"%s", strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+		mod.qp_state = IBV_QPS_INIT;
+		ret = mlx5_glue->modify_qp(qp, &mod,
+					   (IBV_QP_STATE | IBV_QP_PORT));
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change Tx QP state to INIT %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+		mod.qp_state = IBV_QPS_RTR;
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change Tx QP state to RTR %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+		mod.qp_state = IBV_QPS_RTS;
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change Tx QP state to RTS %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+	}
+	txq_ctrl->txq.wqe_ci = 0;
+	txq_ctrl->txq.wqe_pi = 0;
+	txq_ctrl->txq.elts_comp = 0;
+	/* Set the actual queue state. */
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
+	return 0;
+}
+
+/**
+ * Tx queue start. No-op if the queue is already started; a
+ * secondary process forwards the request to the primary one.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_tx_queue_start(struct rte_eth_dev *dev, uint16_t idx)
+{
+	int ret;
+
+	if (dev->data->tx_queue_state[idx] == RTE_ETH_QUEUE_STATE_HAIRPIN) {
+		DRV_LOG(ERR, "Hairpin queue can't be started");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (dev->data->tx_queue_state[idx] == RTE_ETH_QUEUE_STATE_STARTED)
+		return 0;
+	if (rte_eal_process_type() ==  RTE_PROC_SECONDARY) {
+		ret = mlx5_mp_req_queue_control(dev, idx,
+						MLX5_MP_REQ_QUEUE_TX_START);
+	} else {
+		ret = mlx5_tx_queue_start_primary(dev, idx);
+	}
+	return ret;
+}
+
 /**
  * Tx queue presetup checks.
  *
@@ -228,6 +486,7 @@
 	DRV_LOG(DEBUG, "port %u adding Tx queue %u to list",
 		dev->data->port_id, idx);
 	(*priv->txqs)[idx] = &txq_ctrl->txq;
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
 	return 0;
 }
 
@@ -278,6 +537,7 @@
 	DRV_LOG(DEBUG, "port %u adding Tx queue %u to list",
 		dev->data->port_id, idx);
 	(*priv->txqs)[idx] = &txq_ctrl->txq;
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_HAIRPIN;
 	return 0;
 }
 
@@ -1736,6 +1996,7 @@ struct mlx5_txq_ctrl *
 		LIST_REMOVE(txq, next);
 		rte_free(txq);
 		(*priv->txqs)[idx] = NULL;
+		dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STOPPED;
 		return 0;
 	}
 	return 1;
-- 
1.8.3.1


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [dpdk-dev] [PATCH] net/mlx5: add queue start and stop feature
  2020-07-17 14:37 [dpdk-dev] [PATCH] net/mlx5: add queue start and stop feature Viacheslav Ovsiienko
@ 2020-07-17 15:03 ` Thomas Monjalon
  2020-07-19 15:35 ` [dpdk-dev] [PATCH v2] " Viacheslav Ovsiienko
  1 sibling, 0 replies; 4+ messages in thread
From: Thomas Monjalon @ 2020-07-17 15:03 UTC (permalink / raw)
  To: Viacheslav Ovsiienko; +Cc: dev, matan, rasland

17/07/2020 16:37, Viacheslav Ovsiienko:
> The mlx5 PMD does not support queue_start and queue_stop eth_dev API
> routines, queue can't be suspended and resumed during device operation.

Please use past tense to describe the status before the patch,
and present for the following explanations.

> 
> There is the use case when this feature is crucial for applications:






^ permalink raw reply	[flat|nested] 4+ messages in thread

* [dpdk-dev] [PATCH v2] net/mlx5: add queue start and stop feature
  2020-07-17 14:37 [dpdk-dev] [PATCH] net/mlx5: add queue start and stop feature Viacheslav Ovsiienko
  2020-07-17 15:03 ` Thomas Monjalon
@ 2020-07-19 15:35 ` Viacheslav Ovsiienko
  2020-07-20  7:04   ` Raslan Darawsheh
  1 sibling, 1 reply; 4+ messages in thread
From: Viacheslav Ovsiienko @ 2020-07-19 15:35 UTC (permalink / raw)
  To: dev; +Cc: matan, rasland, thomas

The mlx5 PMD did not support queue_start and queue_stop eth_dev API
routines, queue could not be suspended and resumed during device
operation.

There is the use case when this feature is crucial for applications:

- there is the secondary process handling the queue
- secondary process crashed/aborted
- some mbufs were allocated or used by secondary application
- some mbufs were allocated by Rx queues to receive packets
- some mbufs were placed to send queue
- queue goes to undefined state

In this case there was no reliable way to recover queue handling
by the restarted secondary process other than resetting the queue to
its initial state, freeing all involved resources (including the
buffers involved in queue operations), resetting the mbuf pools, and
then reinitializing the queue to the working state:

- reset mbuf pool, allocate all mbufs to bring the pool into a
  safe state after the crash and allow safe mbuf free calls
- stop queue, free all potentially involved mbufs
- reset mbuf pool again
- start queue, reallocate mbufs needed

This patch introduces the queue start/stop feature with some
limitations:

- hairpin queues are not supported
- it is application responsibility to synchronize start/stop
  with datapath routines, rx/tx_burst must be suspended during
  the queue_start/queue_stop calls
- it is application responsibility to track queue usage and
  provide coordinated queue_start/queue_stop calls from
  secondary and primary processes.
- Rx queues with vectorized Rx routine and engaged CQE
  compression are not supported by this patch currently

Signed-off-by: Viacheslav Ovsiienko <viacheslavo@mellanox.com>
---

v2: - rebase
    - address comments for commit messages


 drivers/common/mlx5/mlx5_common_mp.h |  10 ++
 drivers/net/mlx5/linux/mlx5_mp_os.c  |  74 +++++++++-
 drivers/net/mlx5/linux/mlx5_os.c     |  12 ++
 drivers/net/mlx5/mlx5.h              |   3 +
 drivers/net/mlx5/mlx5_rxq.c          | 240 ++++++++++++++++++++++++++++++++
 drivers/net/mlx5/mlx5_rxtx.h         |   8 ++
 drivers/net/mlx5/mlx5_txq.c          | 261 +++++++++++++++++++++++++++++++++++
 7 files changed, 607 insertions(+), 1 deletion(-)

diff --git a/drivers/common/mlx5/mlx5_common_mp.h b/drivers/common/mlx5/mlx5_common_mp.h
index 64260c0..6829141 100644
--- a/drivers/common/mlx5/mlx5_common_mp.h
+++ b/drivers/common/mlx5/mlx5_common_mp.h
@@ -17,6 +17,10 @@ enum mlx5_mp_req_type {
 	MLX5_MP_REQ_START_RXTX,
 	MLX5_MP_REQ_STOP_RXTX,
 	MLX5_MP_REQ_QUEUE_STATE_MODIFY,
+	MLX5_MP_REQ_QUEUE_RX_STOP,
+	MLX5_MP_REQ_QUEUE_RX_START,
+	MLX5_MP_REQ_QUEUE_TX_STOP,
+	MLX5_MP_REQ_QUEUE_TX_START,
 };
 
 struct mlx5_mp_arg_queue_state_modify {
@@ -25,6 +29,10 @@ struct mlx5_mp_arg_queue_state_modify {
 	enum ibv_wq_state state; /* WQ requested state. */
 };
 
+struct mlx5_mp_arg_queue_id {
+	uint16_t queue_id; /* DPDK queue ID. */
+};
+
 /* Pameters for IPC. */
 struct mlx5_mp_param {
 	enum mlx5_mp_req_type type;
@@ -35,6 +43,8 @@ struct mlx5_mp_param {
 		uintptr_t addr; /* MLX5_MP_REQ_CREATE_MR */
 		struct mlx5_mp_arg_queue_state_modify state_modify;
 		/* MLX5_MP_REQ_QUEUE_STATE_MODIFY */
+		struct mlx5_mp_arg_queue_id queue_id;
+		/* MLX5_MP_REQ_QUEUE_RX/TX_START/STOP */
 	} args;
 };
 
diff --git a/drivers/net/mlx5/linux/mlx5_mp_os.c b/drivers/net/mlx5/linux/mlx5_mp_os.c
index dd9a2c2..08ade75 100644
--- a/drivers/net/mlx5/linux/mlx5_mp_os.c
+++ b/drivers/net/mlx5/linux/mlx5_mp_os.c
@@ -63,6 +63,30 @@
 					(dev, &param->args.state_modify);
 		ret = rte_mp_reply(&mp_res, peer);
 		break;
+	case MLX5_MP_REQ_QUEUE_RX_STOP:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_rx_queue_stop_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
+	case MLX5_MP_REQ_QUEUE_RX_START:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_rx_queue_start_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
+	case MLX5_MP_REQ_QUEUE_TX_STOP:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_tx_queue_stop_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
+	case MLX5_MP_REQ_QUEUE_TX_START:
+		mp_init_msg(&priv->mp_id, &mp_res, param->type);
+		res->result = mlx5_tx_queue_start_primary
+					(dev, param->args.queue_id.queue_id);
+		ret = rte_mp_reply(&mp_res, peer);
+		break;
 	default:
 		rte_errno = EINVAL;
 		DRV_LOG(ERR, "port %u invalid mp request type",
@@ -86,7 +110,7 @@
 int
 mlx5_mp_os_secondary_handle(const struct rte_mp_msg *mp_msg, const void *peer)
 {
-	struct rte_mp_msg mp_res;
+struct rte_mp_msg mp_res;
 	struct mlx5_mp_param *res = (struct mlx5_mp_param *)mp_res.param;
 	const struct mlx5_mp_param *param =
 		(const struct mlx5_mp_param *)mp_msg->param;
@@ -210,3 +234,51 @@
 {
 	mp_req_on_rxtx(dev, MLX5_MP_REQ_STOP_RXTX);
 }
+
+/**
+ * Request the primary process to stop or start an Rx/Tx queue.
+ *
+ * The request is sent synchronously with a MLX5_MP_REQ_TIMEOUT_SEC timeout,
+ * the result field of the single expected reply is returned and the reply
+ * buffer is freed before returning. The routine asserts that it runs in
+ * a secondary process.
+ *
+ * @param[in] dev
+ *   Pointer to Ethernet structure.
+ * @param queue_id
+ *   Queue ID to control.
+ * @param req_type
+ *   request type
+ *     MLX5_MP_REQ_QUEUE_RX_START - start Rx queue
+ *     MLX5_MP_REQ_QUEUE_TX_START - start Tx queue
+ *     MLX5_MP_REQ_QUEUE_RX_STOP - stop Rx queue
+ *     MLX5_MP_REQ_QUEUE_TX_STOP - stop Tx queue
+ * @return
+ *   The result reported by the primary process (0 on success),
+ *   or -rte_errno if the synchronous request itself failed.
+ */
+int
+mlx5_mp_os_req_queue_control(struct rte_eth_dev *dev, uint16_t queue_id,
+			  enum mlx5_mp_req_type req_type)
+{
+	struct rte_mp_msg mp_req;
+	struct rte_mp_msg *mp_res;
+	struct rte_mp_reply mp_rep;
+	struct mlx5_mp_param *req = (struct mlx5_mp_param *)mp_req.param;
+	struct mlx5_mp_param *res;
+	struct timespec ts = {.tv_sec = MLX5_MP_REQ_TIMEOUT_SEC, .tv_nsec = 0};
+	struct mlx5_priv *priv;
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() == RTE_PROC_SECONDARY);
+	priv = dev->data->dev_private;
+	mp_init_msg(&priv->mp_id, &mp_req, req_type);
+	req->args.queue_id.queue_id = queue_id;
+	ret = rte_mp_request_sync(&mp_req, &mp_rep, &ts);
+	if (ret) {
+		DRV_LOG(ERR, "port %u request to primary process failed",
+			dev->data->port_id);
+		return -rte_errno;
+	}
+	MLX5_ASSERT(mp_rep.nb_received == 1);
+	mp_res = &mp_rep.msgs[0];
+	res = (struct mlx5_mp_param *)mp_res->param;
+	ret = res->result;
+	free(mp_rep.msgs);
+	return ret;
+}
diff --git a/drivers/net/mlx5/linux/mlx5_os.c b/drivers/net/mlx5/linux/mlx5_os.c
index d945911..b6d6926 100644
--- a/drivers/net/mlx5/linux/mlx5_os.c
+++ b/drivers/net/mlx5/linux/mlx5_os.c
@@ -2337,6 +2337,10 @@
 	.tx_hairpin_queue_setup = mlx5_tx_hairpin_queue_setup,
 	.rx_queue_release = mlx5_rx_queue_release,
 	.tx_queue_release = mlx5_tx_queue_release,
+	.rx_queue_start = mlx5_rx_queue_start,
+	.rx_queue_stop = mlx5_rx_queue_stop,
+	.tx_queue_start = mlx5_tx_queue_start,
+	.tx_queue_stop = mlx5_tx_queue_stop,
 	.flow_ctrl_get = mlx5_dev_get_flow_ctrl,
 	.flow_ctrl_set = mlx5_dev_set_flow_ctrl,
 	.mac_addr_remove = mlx5_mac_addr_remove,
@@ -2378,6 +2382,10 @@
 	.fw_version_get = mlx5_fw_version_get,
 	.dev_infos_get = mlx5_dev_infos_get,
 	.read_clock = mlx5_txpp_read_clock,
+	.rx_queue_start = mlx5_rx_queue_start,
+	.rx_queue_stop = mlx5_rx_queue_stop,
+	.tx_queue_start = mlx5_tx_queue_start,
+	.tx_queue_stop = mlx5_tx_queue_stop,
 	.rx_descriptor_status = mlx5_rx_descriptor_status,
 	.tx_descriptor_status = mlx5_tx_descriptor_status,
 	.rxq_info_get = mlx5_rxq_info_get,
@@ -2417,6 +2425,10 @@
 	.tx_hairpin_queue_setup = mlx5_tx_hairpin_queue_setup,
 	.rx_queue_release = mlx5_rx_queue_release,
 	.tx_queue_release = mlx5_tx_queue_release,
+	.rx_queue_start = mlx5_rx_queue_start,
+	.rx_queue_stop = mlx5_rx_queue_stop,
+	.tx_queue_start = mlx5_tx_queue_start,
+	.tx_queue_stop = mlx5_tx_queue_stop,
 	.flow_ctrl_get = mlx5_dev_get_flow_ctrl,
 	.flow_ctrl_set = mlx5_dev_set_flow_ctrl,
 	.mac_addr_remove = mlx5_mac_addr_remove,
diff --git a/drivers/net/mlx5/mlx5.h b/drivers/net/mlx5/mlx5.h
index 4d24cc7..5d7d609 100644
--- a/drivers/net/mlx5/mlx5.h
+++ b/drivers/net/mlx5/mlx5.h
@@ -969,12 +969,15 @@ int mlx5_flow_get_aged_flows(struct rte_eth_dev *dev, void **contexts,
 			uint32_t nb_contexts, struct rte_flow_error *error);
 
 /* mlx5_mp_os.c */
+
 int mlx5_mp_os_primary_handle(const struct rte_mp_msg *mp_msg,
 			      const void *peer);
 int mlx5_mp_os_secondary_handle(const struct rte_mp_msg *mp_msg,
 				const void *peer);
 void mlx5_mp_os_req_start_rxtx(struct rte_eth_dev *dev);
 void mlx5_mp_os_req_stop_rxtx(struct rte_eth_dev *dev);
+int mlx5_mp_os_req_queue_control(struct rte_eth_dev *dev, uint16_t queue_id,
+				 enum mlx5_mp_req_type req_type);
 
 /* mlx5_socket.c */
 
diff --git a/drivers/net/mlx5/mlx5_rxq.c b/drivers/net/mlx5/mlx5_rxq.c
index bccdc5e..e6dc5ac 100644
--- a/drivers/net/mlx5/mlx5_rxq.c
+++ b/drivers/net/mlx5/mlx5_rxq.c
@@ -431,6 +431,244 @@
 	return (rte_atomic32_read(&rxq_ctrl->refcnt) == 1);
 }
 
+/*
+ * Fetches and drops all SW-owned and error CQEs to synchronize CQ.
+ *
+ * At most (1 << rxq->cqe_n) entries are inspected; the scan stops at the
+ * first HW-owned CQE. Afterwards every CQE is invalidated and the CQ and RQ
+ * doorbell records are rewritten (RQ doorbell is set to zero).
+ */
+static void
+rxq_sync_cq(struct mlx5_rxq_data *rxq)
+{
+	const uint16_t cqe_n = 1 << rxq->cqe_n;
+	const uint16_t cqe_mask = cqe_n - 1;
+	volatile struct mlx5_cqe *cqe;
+	int ret, i;
+
+	i = cqe_n;
+	do {
+		cqe = &(*rxq->cqes)[rxq->cq_ci & cqe_mask];
+		ret = check_cqe(cqe, cqe_n, rxq->cq_ci);
+		if (ret == MLX5_CQE_STATUS_HW_OWN)
+			break;
+		if (ret == MLX5_CQE_STATUS_ERR) {
+			rxq->cq_ci++;
+			continue;
+		}
+		MLX5_ASSERT(ret == MLX5_CQE_STATUS_SW_OWN);
+		if (MLX5_CQE_FORMAT(cqe->op_own) != MLX5_COMPRESSED) {
+			rxq->cq_ci++;
+			continue;
+		}
+		/*
+		 * Compute the next non compressed CQE: the consumer index
+		 * is advanced by the byte_cnt field of the compressed CQE.
+		 */
+		rxq->cq_ci += rte_be_to_cpu_32(cqe->byte_cnt);
+
+	} while (--i);
+	/* Move all CQEs to HW ownership, including possible MiniCQEs. */
+	for (i = 0; i < cqe_n; i++) {
+		cqe = &(*rxq->cqes)[i];
+		cqe->op_own = MLX5_CQE_INVALIDATE;
+	}
+	/*
+	 * Resync CQE and WQE (WQ in RESET state). Write barriers separate
+	 * the CQE invalidation, the CQ doorbell and the RQ doorbell updates.
+	 */
+	rte_cio_wmb();
+	*rxq->cq_db = rte_cpu_to_be_32(rxq->cq_ci);
+	rte_cio_wmb();
+	*rxq->rq_db = rte_cpu_to_be_32(0);
+	rte_cio_wmb();
+}
+
+/**
+ * Rx queue stop performed in the primary process. Device queue goes to
+ * the RESET state, the completion queue is synchronized and all involved
+ * mbufs are freed from WQ.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, the nonzero result of the failed queue modify call
+ *   otherwise and rte_errno is set from errno.
+ */
+int
+mlx5_rx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_rxq_data *rxq = (*priv->rxqs)[idx];
+	struct mlx5_rxq_ctrl *rxq_ctrl =
+			container_of(rxq, struct mlx5_rxq_ctrl, rxq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() == RTE_PROC_PRIMARY);
+	if (rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_IBV) {
+		struct ibv_wq_attr mod = {
+			.attr_mask = IBV_WQ_ATTR_STATE,
+			.wq_state = IBV_WQS_RESET,
+		};
+
+		ret = mlx5_glue->modify_wq(rxq_ctrl->obj->wq, &mod);
+	} else { /* rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_DEVX_RQ. */
+		struct mlx5_devx_modify_rq_attr rq_attr;
+
+		memset(&rq_attr, 0, sizeof(rq_attr));
+		/*
+		 * rq_state carries the current RQ state and state the
+		 * requested one: stopping is the READY -> RESET transition.
+		 */
+		rq_attr.rq_state = MLX5_RQC_STATE_RDY;
+		rq_attr.state = MLX5_RQC_STATE_RST;
+		ret = mlx5_devx_cmd_modify_rq(rxq_ctrl->obj->rq, &rq_attr);
+	}
+	if (ret) {
+		DRV_LOG(ERR, "Cannot change Rx WQ state to RESET:  %s",
+			strerror(errno));
+		rte_errno = errno;
+		return ret;
+	}
+	/* Remove all processed CQEs. */
+	rxq_sync_cq(rxq);
+	/* Free all involved mbufs. */
+	rxq_free_elts(rxq_ctrl);
+	/* Set the actual queue state. */
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STOPPED;
+	return 0;
+}
+
+/**
+ * Rx queue stop entry point (rx_queue_stop device operation).
+ *
+ * Hairpin queues and queues served by the vectorized Rx burst routine are
+ * rejected, an already stopped queue is reported as success. Otherwise the
+ * stop is performed directly in the primary process, or requested from
+ * the primary process over IPC when called in a secondary one.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_rx_queue_stop(struct rte_eth_dev *dev, uint16_t idx)
+{
+	eth_rx_burst_t pkt_burst = dev->rx_pkt_burst;
+	int ret;
+
+	if (dev->data->rx_queue_state[idx] == RTE_ETH_QUEUE_STATE_HAIRPIN) {
+		DRV_LOG(ERR, "Hairpin queue can't be stopped");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (dev->data->rx_queue_state[idx] == RTE_ETH_QUEUE_STATE_STOPPED)
+		return 0;
+	/*
+	 * Vectorized Rx burst requires the CQ and RQ indices
+	 * synchronized, that might be broken on RQ restart
+	 * and cause Rx malfunction, so queue stopping is
+	 * not supported if vectorized Rx burst is engaged.
+	 * The routine pointer depends on the process
+	 * type, should perform check there.
+	 */
+	if (pkt_burst == mlx5_rx_burst_vec) {
+		DRV_LOG(ERR, "Rx queue stop is not supported "
+			"for vectorized Rx");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	}
+	if (rte_eal_process_type() ==  RTE_PROC_SECONDARY) {
+		ret = mlx5_mp_os_req_queue_control(dev, idx,
+						   MLX5_MP_REQ_QUEUE_RX_STOP);
+	} else {
+		ret = mlx5_rx_queue_stop_primary(dev, idx);
+	}
+	return ret;
+}
+
+/**
+ * Rx queue start performed in the primary process. All required mbufs are
+ * allocated, the doorbell records are rewritten, the device queue goes to
+ * the ready state and the WQ is replenished.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, the nonzero result of the failed step otherwise and
+ *   rte_errno is set from errno.
+ */
+int
+mlx5_rx_queue_start_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_rxq_data *rxq = (*priv->rxqs)[idx];
+	struct mlx5_rxq_ctrl *rxq_ctrl =
+			container_of(rxq, struct mlx5_rxq_ctrl, rxq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() ==  RTE_PROC_PRIMARY);
+	/* Allocate needed buffers. */
+	ret = rxq_alloc_elts(rxq_ctrl);
+	if (ret) {
+		DRV_LOG(ERR, "Cannot reallocate buffers for Rx WQ");
+		rte_errno = errno;
+		return ret;
+	}
+	rte_cio_wmb();
+	*rxq->cq_db = rte_cpu_to_be_32(rxq->cq_ci);
+	rte_cio_wmb();
+	/* Reset RQ consumer before moving queue to READY state. */
+	*rxq->rq_db = rte_cpu_to_be_32(0);
+	rte_cio_wmb();
+	if (rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_IBV) {
+		struct ibv_wq_attr mod = {
+			.attr_mask = IBV_WQ_ATTR_STATE,
+			.wq_state = IBV_WQS_RDY,
+		};
+
+		ret = mlx5_glue->modify_wq(rxq_ctrl->obj->wq, &mod);
+	} else { /* rxq_ctrl->obj->type == MLX5_RXQ_OBJ_TYPE_DEVX_RQ. */
+		struct mlx5_devx_modify_rq_attr rq_attr;
+
+		memset(&rq_attr, 0, sizeof(rq_attr));
+		/*
+		 * rq_state carries the current RQ state and state the
+		 * requested one: starting is the RESET -> READY transition.
+		 */
+		rq_attr.rq_state = MLX5_RQC_STATE_RST;
+		rq_attr.state = MLX5_RQC_STATE_RDY;
+		ret = mlx5_devx_cmd_modify_rq(rxq_ctrl->obj->rq, &rq_attr);
+	}
+	if (ret) {
+		DRV_LOG(ERR, "Cannot change Rx WQ state to READY:  %s",
+			strerror(errno));
+		rte_errno = errno;
+		return ret;
+	}
+	/* Reinitialize RQ - set WQEs. */
+	mlx5_rxq_initialize(rxq);
+	rxq->err_state = MLX5_RXQ_ERR_STATE_NO_ERROR;
+	/* Set actual queue state. */
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
+	return 0;
+}
+
+/**
+ * Rx queue start entry point (rx_queue_start device operation).
+ *
+ * Hairpin queues are rejected and an already started queue is reported as
+ * success. Otherwise the start is performed directly in the primary
+ * process, or requested from the primary process over IPC when called in
+ * a secondary one.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   RX queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_rx_queue_start(struct rte_eth_dev *dev, uint16_t idx)
+{
+	switch (dev->data->rx_queue_state[idx]) {
+	case RTE_ETH_QUEUE_STATE_HAIRPIN:
+		DRV_LOG(ERR, "Hairpin queue can't be started");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	case RTE_ETH_QUEUE_STATE_STARTED:
+		/* Nothing to do, the queue is already running. */
+		return 0;
+	default:
+		break;
+	}
+	/* A secondary process forwards the request to the primary one. */
+	if (rte_eal_process_type() != RTE_PROC_SECONDARY)
+		return mlx5_rx_queue_start_primary(dev, idx);
+	return mlx5_mp_os_req_queue_control(dev, idx,
+					    MLX5_MP_REQ_QUEUE_RX_START);
+}
+
 /**
  * Rx queue presetup checks.
  *
@@ -1479,6 +1717,7 @@
 	rte_atomic32_inc(&tmpl->refcnt);
 	LIST_INSERT_HEAD(&priv->rxqsobj, tmpl, next);
 	priv->verbs_alloc_ctx.type = MLX5_VERBS_ALLOC_TYPE_NONE;
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_HAIRPIN;
 	return tmpl;
 }
 
@@ -1690,6 +1929,7 @@ struct mlx5_rxq_obj *
 	rte_atomic32_inc(&tmpl->refcnt);
 	LIST_INSERT_HEAD(&priv->rxqsobj, tmpl, next);
 	priv->verbs_alloc_ctx.type = MLX5_VERBS_ALLOC_TYPE_NONE;
+	dev->data->rx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
 	return tmpl;
 error:
 	if (tmpl) {
diff --git a/drivers/net/mlx5/mlx5_rxtx.h b/drivers/net/mlx5/mlx5_rxtx.h
index 02c00f6..c02a007 100644
--- a/drivers/net/mlx5/mlx5_rxtx.h
+++ b/drivers/net/mlx5/mlx5_rxtx.h
@@ -404,6 +404,10 @@ struct mlx5_txq_ctrl {
 int mlx5_mprq_enabled(struct rte_eth_dev *dev);
 int mlx5_mprq_free_mp(struct rte_eth_dev *dev);
 int mlx5_mprq_alloc_mp(struct rte_eth_dev *dev);
+int mlx5_rx_queue_start(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_rx_queue_stop(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_rx_queue_start_primary(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_rx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t queue_id);
 int mlx5_rx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 			unsigned int socket, const struct rte_eth_rxconf *conf,
 			struct rte_mempool *mp);
@@ -451,6 +455,10 @@ uint32_t mlx5_hrxq_get(struct rte_eth_dev *dev,
 
 /* mlx5_txq.c */
 
+int mlx5_tx_queue_start(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_tx_queue_stop(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_tx_queue_start_primary(struct rte_eth_dev *dev, uint16_t queue_id);
+int mlx5_tx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t queue_id);
 int mlx5_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 			unsigned int socket, const struct rte_eth_txconf *conf);
 int mlx5_tx_hairpin_queue_setup
diff --git a/drivers/net/mlx5/mlx5_txq.c b/drivers/net/mlx5/mlx5_txq.c
index 7da5c70..21fe16b 100644
--- a/drivers/net/mlx5/mlx5_txq.c
+++ b/drivers/net/mlx5/mlx5_txq.c
@@ -129,6 +129,264 @@
 	return offloads;
 }
 
+/*
+ * Drain the Tx completion queue: skip every SW-owned or error CQE (at most
+ * cqe_s of them), hand all CQEs back to HW and publish the resulting
+ * consumer index in the CQ doorbell record.
+ */
+static void
+txq_sync_cq(struct mlx5_txq_data *txq)
+{
+	volatile struct mlx5_cqe *entry;
+	int budget = txq->cqe_s;
+	int status, n;
+
+	do {
+		entry = &txq->cqes[txq->cq_ci & txq->cqe_m];
+		status = check_cqe(entry, txq->cqe_s, txq->cq_ci);
+		if (status != MLX5_CQE_STATUS_SW_OWN &&
+		    status != MLX5_CQE_STATUS_ERR) {
+			/* No new CQEs in completion queue. */
+			MLX5_ASSERT(status == MLX5_CQE_STATUS_HW_OWN);
+			break;
+		}
+		txq->cq_ci++;
+	} while (--budget);
+	/* Move all CQEs to HW ownership. */
+	for (n = 0; n < txq->cqe_s; n++) {
+		entry = &txq->cqes[n];
+		entry->op_own = MLX5_CQE_INVALIDATE;
+	}
+	/* Resync CQE and WQE (WQ in reset state). */
+	rte_cio_wmb();
+	*txq->cq_db = rte_cpu_to_be_32(txq->cq_ci);
+	rte_cio_wmb();
+}
+
+/**
+ * Tx queue stop performed in the primary process. Device queue goes to
+ * the RESET state, pending completions are dropped and all involved mbufs
+ * are freed from elts/WQ.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, the nonzero result of the failed queue modify call
+ *   otherwise and rte_errno is set from errno.
+ */
+int
+mlx5_tx_queue_stop_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_txq_data *txq = (*priv->txqs)[idx];
+	struct mlx5_txq_ctrl *txq_ctrl =
+			container_of(txq, struct mlx5_txq_ctrl, txq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() == RTE_PROC_PRIMARY);
+	/* Move QP to RESET state. */
+	if (txq_ctrl->obj->type == MLX5_TXQ_OBJ_TYPE_DEVX_SQ) {
+		struct mlx5_devx_modify_sq_attr msq_attr = { 0 };
+
+		/* Change queue state to reset with DevX (READY -> RESET). */
+		msq_attr.sq_state = MLX5_SQC_STATE_RDY;
+		msq_attr.state = MLX5_SQC_STATE_RST;
+		ret = mlx5_devx_cmd_modify_sq(txq_ctrl->obj->sq_devx,
+					      &msq_attr);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change the "
+				"Tx QP state to RESET %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+	} else {
+		struct ibv_qp_attr mod = {
+			.qp_state = IBV_QPS_RESET,
+			.port_num = (uint8_t)priv->dev_port,
+		};
+		struct ibv_qp *qp = txq_ctrl->obj->qp;
+
+		/* Change queue state to reset with Verbs. */
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change the Tx QP state to RESET "
+				"%s", strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+	}
+	/* Handle all send completions. */
+	txq_sync_cq(txq);
+	/* Free elts stored in the SQ. */
+	txq_free_elts(txq_ctrl);
+	/*
+	 * Prevent writing new pkts to SQ by setting no free WQE:
+	 * wqe_ci is set to wqe_s while wqe_pi is zeroed.
+	 */
+	txq->wqe_ci = txq->wqe_s;
+	txq->wqe_pi = 0;
+	txq->elts_comp = 0;
+	/* Set the actual queue state. */
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STOPPED;
+	return 0;
+}
+
+/**
+ * Tx queue stop entry point (tx_queue_stop device operation).
+ *
+ * Hairpin queues are rejected and an already stopped queue is reported as
+ * success. Otherwise the stop is performed directly in the primary
+ * process, or requested from the primary process over IPC when called in
+ * a secondary one.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_tx_queue_stop(struct rte_eth_dev *dev, uint16_t idx)
+{
+	switch (dev->data->tx_queue_state[idx]) {
+	case RTE_ETH_QUEUE_STATE_HAIRPIN:
+		DRV_LOG(ERR, "Hairpin queue can't be stopped");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	case RTE_ETH_QUEUE_STATE_STOPPED:
+		/* Nothing to do, the queue is already stopped. */
+		return 0;
+	default:
+		break;
+	}
+	/* A secondary process forwards the request to the primary one. */
+	if (rte_eal_process_type() != RTE_PROC_SECONDARY)
+		return mlx5_tx_queue_stop_primary(dev, idx);
+	return mlx5_mp_os_req_queue_control(dev, idx,
+					    MLX5_MP_REQ_QUEUE_TX_STOP);
+}
+
+/**
+ * Tx queue start performed in the primary process. Device queue goes to
+ * the ready state and the software WQE indices are reset to zero.
+ *
+ * With DevX the SQ is modified READY -> RESET and then RESET -> READY,
+ * with Verbs the QP is moved through RESET, INIT, RTR and RTS.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, the nonzero result of the failed queue modify call
+ *   otherwise and rte_errno is set from errno.
+ */
+int
+mlx5_tx_queue_start_primary(struct rte_eth_dev *dev, uint16_t idx)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	struct mlx5_txq_data *txq = (*priv->txqs)[idx];
+	struct mlx5_txq_ctrl *txq_ctrl =
+			container_of(txq, struct mlx5_txq_ctrl, txq);
+	int ret;
+
+	MLX5_ASSERT(rte_eal_process_type() ==  RTE_PROC_PRIMARY);
+	if (txq_ctrl->obj->type == MLX5_TXQ_OBJ_TYPE_DEVX_SQ) {
+		struct mlx5_devx_modify_sq_attr msq_attr = { 0 };
+		struct mlx5_txq_obj *obj = txq_ctrl->obj;
+
+		msq_attr.sq_state = MLX5_SQC_STATE_RDY;
+		msq_attr.state = MLX5_SQC_STATE_RST;
+		ret = mlx5_devx_cmd_modify_sq(obj->sq_devx, &msq_attr);
+		if (ret) {
+			rte_errno = errno;
+			DRV_LOG(ERR,
+				"Cannot change the Tx QP state to RESET "
+				"%s", strerror(errno));
+			return ret;
+		}
+		msq_attr.sq_state = MLX5_SQC_STATE_RST;
+		msq_attr.state = MLX5_SQC_STATE_RDY;
+		ret = mlx5_devx_cmd_modify_sq(obj->sq_devx, &msq_attr);
+		if (ret) {
+			rte_errno = errno;
+			DRV_LOG(ERR,
+				"Cannot change the Tx QP state to READY "
+				"%s", strerror(errno));
+			return ret;
+		}
+	} else {
+		struct ibv_qp_attr mod = {
+			.qp_state = IBV_QPS_RESET,
+			.port_num = (uint8_t)priv->dev_port,
+		};
+		struct ibv_qp *qp = txq_ctrl->obj->qp;
+
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change the Tx QP state to RESET "
+				"%s", strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+		mod.qp_state = IBV_QPS_INIT;
+		ret = mlx5_glue->modify_qp(qp, &mod,
+					   (IBV_QP_STATE | IBV_QP_PORT));
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change Tx QP state to INIT %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+		mod.qp_state = IBV_QPS_RTR;
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change Tx QP state to RTR %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+		mod.qp_state = IBV_QPS_RTS;
+		ret = mlx5_glue->modify_qp(qp, &mod, IBV_QP_STATE);
+		if (ret) {
+			DRV_LOG(ERR, "Cannot change Tx QP state to RTS %s",
+				strerror(errno));
+			rte_errno = errno;
+			return ret;
+		}
+	}
+	txq_ctrl->txq.wqe_ci = 0;
+	txq_ctrl->txq.wqe_pi = 0;
+	txq_ctrl->txq.elts_comp = 0;
+	/* Set the actual queue state. */
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
+	return 0;
+}
+
+/**
+ * Tx queue start entry point (tx_queue_start device operation).
+ *
+ * Hairpin queues are rejected and an already started queue is reported as
+ * success. Otherwise the start is performed directly in the primary
+ * process, or requested from the primary process over IPC when called in
+ * a secondary one.
+ *
+ * @param dev
+ *   Pointer to Ethernet device structure.
+ * @param idx
+ *   Tx queue index.
+ *
+ * @return
+ *   0 on success, a negative errno value otherwise and rte_errno is set.
+ */
+int
+mlx5_tx_queue_start(struct rte_eth_dev *dev, uint16_t idx)
+{
+	switch (dev->data->tx_queue_state[idx]) {
+	case RTE_ETH_QUEUE_STATE_HAIRPIN:
+		DRV_LOG(ERR, "Hairpin queue can't be started");
+		rte_errno = EINVAL;
+		return -EINVAL;
+	case RTE_ETH_QUEUE_STATE_STARTED:
+		/* Nothing to do, the queue is already running. */
+		return 0;
+	default:
+		break;
+	}
+	/* A secondary process forwards the request to the primary one. */
+	if (rte_eal_process_type() != RTE_PROC_SECONDARY)
+		return mlx5_tx_queue_start_primary(dev, idx);
+	return mlx5_mp_os_req_queue_control(dev, idx,
+					    MLX5_MP_REQ_QUEUE_TX_START);
+}
+
 /**
  * Tx queue presetup checks.
  *
@@ -218,6 +476,7 @@
 	DRV_LOG(DEBUG, "port %u adding Tx queue %u to list",
 		dev->data->port_id, idx);
 	(*priv->txqs)[idx] = &txq_ctrl->txq;
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STARTED;
 	return 0;
 }
 
@@ -268,6 +527,7 @@
 	DRV_LOG(DEBUG, "port %u adding Tx queue %u to list",
 		dev->data->port_id, idx);
 	(*priv->txqs)[idx] = &txq_ctrl->txq;
+	dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_HAIRPIN;
 	return 0;
 }
 
@@ -1747,6 +2007,7 @@ struct mlx5_txq_ctrl *
 		LIST_REMOVE(txq, next);
 		mlx5_free(txq);
 		(*priv->txqs)[idx] = NULL;
+		dev->data->tx_queue_state[idx] = RTE_ETH_QUEUE_STATE_STOPPED;
 		return 0;
 	}
 	return 1;
-- 
1.8.3.1


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [dpdk-dev] [PATCH v2] net/mlx5: add queue start and stop feature
  2020-07-19 15:35 ` [dpdk-dev] [PATCH v2] " Viacheslav Ovsiienko
@ 2020-07-20  7:04   ` Raslan Darawsheh
  0 siblings, 0 replies; 4+ messages in thread
From: Raslan Darawsheh @ 2020-07-20  7:04 UTC (permalink / raw)
  To: Slava Ovsiienko, dev; +Cc: Matan Azrad, Thomas Monjalon

Hi,

> -----Original Message-----
> From: Viacheslav Ovsiienko <viacheslavo@mellanox.com>
> Sent: Sunday, July 19, 2020 6:36 PM
> To: dev@dpdk.org
> Cc: Matan Azrad <matan@mellanox.com>; Raslan Darawsheh
> <rasland@mellanox.com>; Thomas Monjalon <thomas@monjalon.net>
> Subject: [PATCH v2] net/mlx5: add queue start and stop feature
> 
> The mlx5 PMD did not support queue_start and queue_stop eth_dev API
> routines, queue could not be suspended and resumed during device
> operation.
> 
> There is the use case when this feature is crucial for applications:
> 
> - there is the secondary process handling the queue
> - secondary process crashed/aborted
> - some mbufs were allocated or used by secondary application
> - some mbufs were allocated by Rx queues to receive packets
> - some mbufs were placed to send queue
> - queue goes to undefined state
> 
> In this case there was no reliable way to recover queue handling
> by restarted secondary process but reset queue to initial state
> freeing all involved resources, including buffers involved in queue
> operations, reset the mbuf pools, and then reinitialize queue
> to working state:
> 
> - reset mbuf pool, allocate all mbuf to initialize pool into
>   safe state after the crash and allow safe mbuf free calls
> - stop queue, free all potentially involved mbufs
> - reset mbuf pool again
> - start queue, reallocate mbufs needed
> 
> This patch introduces the queue start/stop feature with some
> limitations:
> 
> - hairpin queues are not supported
> - it is application responsibility to synchronize start/stop
>   with datapath routines, rx/tx_burst must be suspended during
>   the queue_start/queue_stop calls
> - it is application responsibility to track queue usage and
>   provide coordinated queue_start/queue_stop calls from
>   secondary and primary processes.
> - Rx queues with vectorized Rx routine and engaged CQE
>   compression are not supported by this patch currently
> 
> Signed-off-by: Viacheslav Ovsiienko <viacheslavo@mellanox.com>
> ---
> 
> v2: - rebase
>     - address comments for commit messages
> 
> 

Patch applied to next-net-mlx,

Kindest regards,
Raslan Darawsheh

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2020-07-20  7:05 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-07-17 14:37 [dpdk-dev] [PATCH] net/mlx5: add queue start and stop feature Viacheslav Ovsiienko
2020-07-17 15:03 ` Thomas Monjalon
2020-07-19 15:35 ` [dpdk-dev] [PATCH v2] " Viacheslav Ovsiienko
2020-07-20  7:04   ` Raslan Darawsheh

DPDK patches and discussions

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://inbox.dpdk.org/dev/0 dev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 dev dev/ https://inbox.dpdk.org/dev \
		dev@dpdk.org
	public-inbox-index dev

Example config snippet for mirrors.
Newsgroup available over NNTP:
	nntp://inbox.dpdk.org/inbox.dpdk.dev


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git