DPDK patches and discussions
From: Adam Dybkowski <adamx.dybkowski@intel.com>
To: dev@dpdk.org, fiona.trahe@intel.com, akhil.goyal@nxp.com
Cc: Adam Dybkowski <adamx.dybkowski@intel.com>
Subject: [dpdk-dev] [PATCH v2 1/2] compress/qat: im buffer too small - split op
Date: Wed,  8 Apr 2020 14:51:00 +0200
Message-ID: <20200408125101.25764-2-adamx.dybkowski@intel.com>
In-Reply-To: <20200408125101.25764-1-adamx.dybkowski@intel.com>

This patch changes how the QAT compression PMD handles a dynamic Huffman
compression operation whose input is too large for the internal QAT
intermediate (IM) buffer. Instead of falling back to fixed compression,
the PMD now splits the operation into multiple smaller dynamic
compression requests that QAT can execute, then combines their results
and copies them into the output buffer. Splitting is not possible if any
checksum calculation was requested; in that case the code falls back to
fixed compression as before.

Signed-off-by: Adam Dybkowski <adamx.dybkowski@intel.com>
---
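The split sizing described in the commit message can be sketched as
follows. This is only an illustration, not code from the patch: the
65536-byte buffer size is an assumed stand-in for
RTE_PMD_QAT_COMP_IM_BUFFER_SIZE, and the helper names
split_descriptors_needed() and split_chunks() are hypothetical. The
threshold derivation and the chunk walk follow QAT_FALLBACK_THLD,
qat_comp_build_request() and qat_comp_build_multiple_requests() below.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: 65536 stands in for RTE_PMD_QAT_COMP_IM_BUFFER_SIZE. */
#define IM_BUFFER_SIZE 65536u
/* Same derivation as QAT_FALLBACK_THLD in this patch. */
#define FALLBACK_THLD ((uint32_t)(IM_BUFFER_SIZE / 1.3))

/* Descriptor count as computed in qat_comp_build_request(). */
static unsigned int
split_descriptors_needed(uint32_t src_len)
{
	return src_len / FALLBACK_THLD + 1;
}

/*
 * Walk the chunks the way qat_comp_build_multiple_requests() does:
 * the parent takes FALLBACK_THLD bytes, each child takes up to
 * FALLBACK_THLD of what remains. Stores the chunk sizes in chunk_len[]
 * (at most max_chunks entries) and returns the number of chunks stored.
 */
static unsigned int
split_chunks(uint32_t src_len, uint32_t *chunk_len, unsigned int max_chunks)
{
	uint32_t remaining;
	unsigned int n = 0;

	assert(src_len > FALLBACK_THLD); /* only oversized input is split */
	assert(max_chunks >= 1);

	chunk_len[n++] = FALLBACK_THLD;  /* parent descriptor */
	remaining = src_len - FALLBACK_THLD;

	while (remaining > 0 && n < max_chunks) { /* child descriptors */
		uint32_t len = remaining < FALLBACK_THLD ?
				remaining : FALLBACK_THLD;

		chunk_len[n++] = len;
		remaining -= len;
	}
	return n;
}
```

With the assumed buffer size, a 120000-byte input is cut into three
requests: one parent and two children, the last one carrying the
remainder. For lengths that are not an exact multiple of the threshold,
the chunk walk and the count formula agree.
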
 doc/guides/compressdevs/qat_comp.rst   |   3 -
 doc/guides/cryptodevs/qat.rst          |   7 +-
 doc/guides/rel_notes/release_20_05.rst |  10 +
 drivers/common/qat/qat_qp.c            | 223 +++++++++++-
 drivers/common/qat/qat_qp.h            |   3 +
 drivers/compress/qat/qat_comp.c        | 474 +++++++++++++++++++++++--
 drivers/compress/qat/qat_comp.h        |  29 +-
 drivers/compress/qat/qat_comp_pmd.c    |  27 +-
 8 files changed, 702 insertions(+), 74 deletions(-)
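
The dequeue side counts ops and firmware responses separately. A
minimal sketch of that accounting is given here, assuming a simplified
state struct in place of struct qat_comp_op_cookie; the names
split_state, on_split_response(), dequeue_counters and
account_response() are hypothetical and do not appear in the patch.
The return convention mirrors qat_comp_process_response(): 0 while a
split op is incomplete, number of children plus one on the last child
response.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct qat_comp_op_cookie. */
struct split_state {
	uint8_t nb_children;        /* children built for the parent */
	uint8_t nb_child_responses; /* child responses seen so far */
};

/*
 * Returns how many firmware responses can be retired after this one:
 * 0 while the split op is still incomplete, nb_children + 1
 * (children plus the parent) on the last child response.
 */
static int
on_split_response(struct split_state *parent, int is_child)
{
	if (!is_child)
		return 0; /* parent response: wait for the children */

	parent->nb_child_responses++;
	if (parent->nb_child_responses == parent->nb_children)
		return parent->nb_children + 1;
	return 0;
}

/* Counters kept by the dequeue loop. */
struct dequeue_counters {
	uint32_t op_resp; /* ops returned to the API */
	uint32_t fw_resp; /* firmware responses retired from the ring */
};

static void
account_response(struct dequeue_counters *c, int nb_fw_responses)
{
	if (nb_fw_responses)
		c->op_resp++; /* an op is ready to hand back */
	c->fw_resp += (uint32_t)nb_fw_responses;
}
```

For a parent with two children, the first two responses leave both
counters at zero; the third returns one op and retires three firmware
responses at once, so the parent cookie stays valid until all children
have been processed.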

diff --git a/doc/guides/compressdevs/qat_comp.rst b/doc/guides/compressdevs/qat_comp.rst
index 757611a30..475c4a9f9 100644
--- a/doc/guides/compressdevs/qat_comp.rst
+++ b/doc/guides/compressdevs/qat_comp.rst
@@ -42,9 +42,6 @@ Limitations
   from the RX queue must be done from one thread, but enqueues and dequeues may be done
   in different threads.)
 * No BSD support as BSD QAT kernel driver not available.
-* When using Deflate dynamic huffman encoding for compression, the input size (op.src.length)
-  must be < CONFIG_RTE_PMD_QAT_COMP_IM_BUFFER_SIZE from the config file,
-  see :ref:`building_qat_config` for more details.
 * Stateful compression is not supported.
 
 
diff --git a/doc/guides/cryptodevs/qat.rst b/doc/guides/cryptodevs/qat.rst
index c79e686de..4ea7985a7 100644
--- a/doc/guides/cryptodevs/qat.rst
+++ b/doc/guides/cryptodevs/qat.rst
@@ -260,8 +260,11 @@ allocated while for GEN1 devices, 12 buffers are allocated, plus 1472 bytes over
 .. Note::
 
 	If the compressed output of a Deflate operation using Dynamic Huffman
-        Encoding is too big to fit in an intermediate buffer, then the
-	operation will fall back to fixed compression rather than failing the operation.
+	Encoding is too big to fit in an intermediate buffer, then the
+	operation will be split into smaller operations and their results will
+	be merged afterwards.
+	This is not possible if any checksum calculation was requested; in that
+	case the code falls back to fixed compression.
 	To avoid this less performant case, applications should configure
 	the intermediate buffer size to be larger than the expected input data size
 	(compressed output size is usually unknown, so the only option is to make
diff --git a/doc/guides/rel_notes/release_20_05.rst b/doc/guides/rel_notes/release_20_05.rst
index 6b1a7c58c..d56d08e57 100644
--- a/doc/guides/rel_notes/release_20_05.rst
+++ b/doc/guides/rel_notes/release_20_05.rst
@@ -81,6 +81,16 @@ New Features
   by making use of the event device capabilities. The event mode currently supports
   only inline IPsec protocol offload.
 
+* **Added handling of a too-small intermediate buffer in the QAT compression PMD.**
+
+  Added special buffer handling for the case where the internal QAT
+  intermediate buffer is too small for a dynamic Huffman compression operation.
+  Instead of falling back to fixed compression, the operation is now split into
+  multiple smaller dynamic compression requests (which QAT can execute), and
+  their results are then combined and copied into the output buffer. This is
+  not possible if any checksum calculation was requested; in that case the code
+  falls back to fixed compression as before.
+
 
 Removed Items
 -------------
diff --git a/drivers/common/qat/qat_qp.c b/drivers/common/qat/qat_qp.c
index eb1da7243..64dfd85c4 100644
--- a/drivers/common/qat/qat_qp.c
+++ b/drivers/common/qat/qat_qp.c
@@ -650,32 +650,212 @@ qat_enqueue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 	return nb_ops_sent;
 }
 
+/* Use this for compression only - but keep consistent with above common
+ * function as much as possible.
+ */
+uint16_t
+qat_enqueue_comp_op_burst(void *qp, void **ops, uint16_t nb_ops)
+{
+	register struct qat_queue *queue;
+	struct qat_qp *tmp_qp = (struct qat_qp *)qp;
+	register uint32_t nb_ops_sent = 0;
+	register int nb_desc_to_build;
+	uint16_t nb_ops_possible = nb_ops;
+	register uint8_t *base_addr;
+	register uint32_t tail;
+
+	int descriptors_built, total_descriptors_built = 0;
+	int nb_remaining_descriptors;
+	int overflow = 0;
+
+	if (unlikely(nb_ops == 0))
+		return 0;
+
+	/* read params used a lot in main loop into registers */
+	queue = &(tmp_qp->tx_q);
+	base_addr = (uint8_t *)queue->base_addr;
+	tail = queue->tail;
+
+	/* Find how many can actually fit on the ring */
+	{
+		/* dequeued can only be written by one thread, but it may not
+		 * be this thread. As it's 4-byte aligned it will be read
+		 * atomically here by any Intel CPU.
+		 * enqueued can wrap before dequeued, but cannot
+		 * lap it as var size of enq/deq (uint32_t) > var size of
+		 * max_inflights (uint16_t). In reality inflights is never
+		 * even as big as max uint16_t, as it's <= ADF_MAX_DESC.
+		 * On wrapping, the calculation still returns the correct
+		 * positive value as all three vars are unsigned.
+		 */
+		uint32_t inflights =
+			tmp_qp->enqueued - tmp_qp->dequeued;
+
+		/* Find how many can actually fit on the ring */
+		overflow = (inflights + nb_ops) - tmp_qp->max_inflights;
+		if (overflow > 0) {
+			nb_ops_possible = nb_ops - overflow;
+			if (nb_ops_possible == 0)
+				return 0;
+		}
+
+		/* QAT has plenty of work queued already, so don't waste cycles
+		 * enqueueing, wait til the application has gathered a bigger
+		 * burst or some completed ops have been dequeued
+		 */
+		if (tmp_qp->min_enq_burst_threshold && inflights >
+				QAT_QP_MIN_INFL_THRESHOLD && nb_ops_possible <
+				tmp_qp->min_enq_burst_threshold) {
+			tmp_qp->stats.threshold_hit_count++;
+			return 0;
+		}
+	}
+
+	/* At this point nb_ops_possible is assuming a 1:1 mapping
+	 * between ops and descriptors.
+	 * Fewer may be sent if some ops have to be split.
+	 * nb_ops_possible is <= burst size.
+	 * Find out how many spaces are actually available on the qp in case
+	 * more are needed.
+	 */
+	nb_remaining_descriptors = nb_ops_possible
+			 + ((overflow >= 0) ? 0 : overflow * (-1));
+	QAT_DP_LOG(DEBUG, "Nb ops requested %d, nb descriptors remaining %d",
+			nb_ops, nb_remaining_descriptors);
+
+	while (nb_ops_sent != nb_ops_possible &&
+				nb_remaining_descriptors > 0) {
+		struct qat_comp_op_cookie *cookie =
+				tmp_qp->op_cookies[tail >> queue->trailz];
+
+		descriptors_built = 0;
+
+		QAT_DP_LOG(DEBUG, "--- data length: %u",
+			   ((struct rte_comp_op *)*ops)->src.length);
+
+		nb_desc_to_build = qat_comp_build_request(*ops,
+				base_addr + tail, cookie, tmp_qp->qat_dev_gen);
+		QAT_DP_LOG(DEBUG, "%d descriptors built, %d remaining, "
+			"%d ops sent, %d descriptors needed",
+			total_descriptors_built, nb_remaining_descriptors,
+			nb_ops_sent, nb_desc_to_build);
+
+		if (unlikely(nb_desc_to_build < 0)) {
+			/* this message cannot be enqueued */
+			tmp_qp->stats.enqueue_err_count++;
+			if (nb_ops_sent == 0)
+				return 0;
+			goto kick_tail;
+		} else if (unlikely(nb_desc_to_build > 1)) {
+			/* this op is too big and must be split - get more
+			 * descriptors and retry
+			 */
+
+			QAT_DP_LOG(DEBUG, "Build %d descriptors for this op",
+					nb_desc_to_build);
+
+			nb_remaining_descriptors -= nb_desc_to_build;
+			if (nb_remaining_descriptors >= 0) {
+				/* There are enough remaining descriptors
+				 * so retry
+				 */
+				int ret2 = qat_comp_build_multiple_requests(
+						*ops, tmp_qp, tail,
+						nb_desc_to_build);
+
+				if (unlikely(ret2 < 1)) {
+					QAT_DP_LOG(DEBUG,
+							"Failed to build (%d) descriptors, status %d",
+							nb_desc_to_build, ret2);
+
+					qat_comp_free_split_op_memzones(cookie,
+							nb_desc_to_build - 1);
+
+					tmp_qp->stats.enqueue_err_count++;
+
+					/* This message cannot be enqueued */
+					if (nb_ops_sent == 0)
+						return 0;
+					goto kick_tail;
+				} else {
+					descriptors_built = ret2;
+					total_descriptors_built +=
+							descriptors_built;
+					nb_remaining_descriptors -=
+							descriptors_built;
+					QAT_DP_LOG(DEBUG,
+							"Multiple descriptors (%d) built ok",
+							descriptors_built);
+				}
+			} else {
+				QAT_DP_LOG(ERR, "For the current op, number of requested descriptors (%d) "
+						"exceeds number of available descriptors (%d)",
+						nb_desc_to_build,
+						nb_remaining_descriptors +
+							nb_desc_to_build);
+
+				qat_comp_free_split_op_memzones(cookie,
+						nb_desc_to_build - 1);
+
+				/* Not enough extra descriptors */
+				if (nb_ops_sent == 0)
+					return 0;
+				goto kick_tail;
+			}
+		} else {
+			descriptors_built = 1;
+			total_descriptors_built++;
+			nb_remaining_descriptors--;
+			QAT_DP_LOG(DEBUG, "Single descriptor built ok");
+		}
+
+		tail = adf_modulo(tail + (queue->msg_size * descriptors_built),
+				  queue->modulo_mask);
+		ops++;
+		nb_ops_sent++;
+	}
+
+kick_tail:
+	queue->tail = tail;
+	tmp_qp->enqueued += total_descriptors_built;
+	tmp_qp->stats.enqueued_count += total_descriptors_built;
+	txq_write_tail(tmp_qp, queue);
+	return nb_ops_sent;
+}
+
 uint16_t
 qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 {
 	struct qat_queue *rx_queue;
 	struct qat_qp *tmp_qp = (struct qat_qp *)qp;
 	uint32_t head;
-	uint32_t resp_counter = 0;
+	uint32_t op_resp_counter = 0, fw_resp_counter = 0;
 	uint8_t *resp_msg;
+	int nb_fw_responses = 0;
 
 	rx_queue = &(tmp_qp->rx_q);
 	head = rx_queue->head;
 	resp_msg = (uint8_t *)rx_queue->base_addr + rx_queue->head;
 
 	while (*(uint32_t *)resp_msg != ADF_RING_EMPTY_SIG &&
-			resp_counter != nb_ops) {
+			op_resp_counter != nb_ops) {
 
-		if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC)
+		nb_fw_responses = 0;
+		if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC) {
 			qat_sym_process_response(ops, resp_msg);
-		else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
-			qat_comp_process_response(ops, resp_msg,
+			nb_fw_responses = 1;
+		} else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
+
+			nb_fw_responses = qat_comp_process_response(
+				ops, resp_msg,
 				tmp_qp->op_cookies[head >> rx_queue->trailz],
 				&tmp_qp->stats.dequeue_err_count);
+
 		else if (tmp_qp->service_type == QAT_SERVICE_ASYMMETRIC) {
 #ifdef BUILD_QAT_ASYM
 			qat_asym_process_response(ops, resp_msg,
 				tmp_qp->op_cookies[head >> rx_queue->trailz]);
+			nb_fw_responses = 1;
 #endif
 		}
 
@@ -683,21 +863,38 @@ qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 				  rx_queue->modulo_mask);
 
 		resp_msg = (uint8_t *)rx_queue->base_addr + head;
-		ops++;
-		resp_counter++;
+
+		if (ops != NULL && nb_fw_responses) {
+			/* only move on to next op if one was ready to return
+			 * to API
+			 */
+			ops++;
+			op_resp_counter++;
+		}
+
+		 /* A compression op may be broken up into multiple fw requests.
+		  * Only count fw responses as complete once ALL the responses
+		  * associated with an op have been processed, as the cookie
+		  * data from the first response must be available until
+		  * finished with all firmware responses.
+		  */
+		fw_resp_counter += nb_fw_responses;
 	}
-	if (resp_counter > 0) {
+
+	if (fw_resp_counter > 0) {
 		rx_queue->head = head;
-		tmp_qp->dequeued += resp_counter;
-		tmp_qp->stats.dequeued_count += resp_counter;
-		rx_queue->nb_processed_responses += resp_counter;
+		tmp_qp->dequeued += fw_resp_counter;
+		tmp_qp->stats.dequeued_count += fw_resp_counter;
+		rx_queue->nb_processed_responses += fw_resp_counter;
 
 		if (rx_queue->nb_processed_responses >
-						QAT_CSR_HEAD_WRITE_THRESH)
+				QAT_CSR_HEAD_WRITE_THRESH)
 			rxq_free_desc(tmp_qp, rx_queue);
 	}
+	QAT_DP_LOG(DEBUG, "Dequeue burst return: %u, QAT responses: %u",
+			op_resp_counter, fw_resp_counter);
 
-	return resp_counter;
+	return op_resp_counter;
 }
 
 /* This is almost same as dequeue_op_burst, without the atomic, without stats
diff --git a/drivers/common/qat/qat_qp.h b/drivers/common/qat/qat_qp.h
index 88d3c9942..575d69059 100644
--- a/drivers/common/qat/qat_qp.h
+++ b/drivers/common/qat/qat_qp.h
@@ -89,6 +89,9 @@ extern const struct qat_qp_hw_data qat_gen3_qps[][ADF_MAX_QPS_ON_ANY_SERVICE];
 uint16_t
 qat_enqueue_op_burst(void *qp, void **ops, uint16_t nb_ops);
 
+uint16_t
+qat_enqueue_comp_op_burst(void *qp, void **ops, uint16_t nb_ops);
+
 uint16_t
 qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops);
 
diff --git a/drivers/compress/qat/qat_comp.c b/drivers/compress/qat/qat_comp.c
index 533e34f6b..9e1fd2fe9 100644
--- a/drivers/compress/qat/qat_comp.c
+++ b/drivers/compress/qat/qat_comp.c
@@ -13,11 +13,93 @@
 #include <rte_spinlock.h>
 #include <rte_log.h>
 #include <rte_malloc.h>
+#include <rte_memzone.h>
 
 #include "qat_logs.h"
 #include "qat_comp.h"
 #include "qat_comp_pmd.h"
 
+static void
+qat_comp_fallback_to_fixed(struct icp_qat_fw_comp_req *comp_req)
+{
+	QAT_DP_LOG(DEBUG, "QAT PMD: fallback to fixed compression!");
+
+	comp_req->comn_hdr.service_cmd_id =
+			ICP_QAT_FW_COMP_CMD_STATIC;
+
+	ICP_QAT_FW_COMN_NEXT_ID_SET(
+			&comp_req->comp_cd_ctrl,
+			ICP_QAT_FW_SLICE_DRAM_WR);
+
+	ICP_QAT_FW_COMN_NEXT_ID_SET(
+			&comp_req->u2.xlt_cd_ctrl,
+			ICP_QAT_FW_SLICE_NULL);
+	ICP_QAT_FW_COMN_CURR_ID_SET(
+			&comp_req->u2.xlt_cd_ctrl,
+			ICP_QAT_FW_SLICE_NULL);
+}
+
+void
+qat_comp_free_split_op_memzones(struct qat_comp_op_cookie *cookie,
+				unsigned int nb_children)
+{
+	unsigned int i;
+
+	/* free all memzones allocated for child descriptors */
+	for (i = 0; i < nb_children; i++)
+		rte_memzone_free(cookie->dst_memzones[i]);
+
+	/* and free the pointer table */
+	rte_free(cookie->dst_memzones);
+	cookie->dst_memzones = NULL;
+}
+
+static int
+qat_comp_allocate_split_op_memzones(struct qat_comp_op_cookie *cookie,
+				    unsigned int nb_descriptors_needed)
+{
+	struct qat_queue *txq = &(cookie->qp->tx_q);
+	char dst_memz_name[RTE_MEMZONE_NAMESIZE];
+	unsigned int i;
+
+	/* allocate the array of memzone pointers */
+	cookie->dst_memzones = rte_zmalloc_socket("qat PMD im buf mz pointers",
+			(nb_descriptors_needed - 1) *
+				sizeof(const struct rte_memzone *),
+			RTE_CACHE_LINE_SIZE, cookie->socket_id);
+
+	if (cookie->dst_memzones == NULL) {
+		QAT_DP_LOG(ERR,
+			"QAT PMD: failed to allocate im buf mz pointers");
+		return -ENOMEM;
+	}
+
+	for (i = 0; i < nb_descriptors_needed - 1; i++) {
+		snprintf(dst_memz_name,
+				sizeof(dst_memz_name),
+				"dst_%u_%u_%u_%u_%u",
+				cookie->qp->qat_dev->qat_dev_id,
+				txq->hw_bundle_number, txq->hw_queue_number,
+				cookie->cookie_index, i);
+
+		cookie->dst_memzones[i] = rte_memzone_reserve_aligned(
+				dst_memz_name, RTE_PMD_QAT_COMP_IM_BUFFER_SIZE,
+				cookie->socket_id, RTE_MEMZONE_IOVA_CONTIG,
+				RTE_CACHE_LINE_SIZE);
+
+		if (cookie->dst_memzones[i] == NULL) {
+			QAT_DP_LOG(ERR,
+				"QAT PMD: failed to allocate dst buffer memzone");
+
+			/* let's free all memzones allocated up to now */
+			qat_comp_free_split_op_memzones(cookie, i);
+
+			return -ENOMEM;
+		}
+	}
+
+	return 0;
+}
 
 int
 qat_comp_build_request(void *in_op, uint8_t *out_msg,
@@ -57,7 +139,48 @@ qat_comp_build_request(void *in_op, uint8_t *out_msg,
 	rte_mov128(out_msg, tmpl);
 	comp_req->comn_mid.opaque_data = (uint64_t)(uintptr_t)op;
 
-	if (op->op_type == RTE_COMP_OP_STATEFUL) {
+	if (likely(qat_xform->qat_comp_request_type ==
+			QAT_COMP_REQUEST_DYNAMIC_COMP_STATELESS)) {
+
+		if (unlikely(op->src.length > QAT_FALLBACK_THLD)) {
+			/* the operation must be split into pieces */
+			if (qat_xform->checksum_type !=
+					RTE_COMP_CHECKSUM_NONE) {
+				/* fallback to fixed compression in case any
+				 * checksum calculation was requested
+				 */
+				qat_comp_fallback_to_fixed(comp_req);
+			} else {
+				/* calculate num. of descriptors for split op */
+				unsigned int nb_descriptors_needed =
+					op->src.length / QAT_FALLBACK_THLD + 1;
+				/* allocate memzone for output data */
+				if (qat_comp_allocate_split_op_memzones(
+					       cookie, nb_descriptors_needed)) {
+					/* out of memory, fallback to fixed */
+					qat_comp_fallback_to_fixed(comp_req);
+				} else {
+					QAT_DP_LOG(DEBUG,
+							"Input data is too big, op must be split into %u descriptors",
+							nb_descriptors_needed);
+					return (int) nb_descriptors_needed;
+				}
+			}
+		}
+
+		/* set BFINAL bit according to flush_flag */
+		comp_req->comp_pars.req_par_flags =
+			ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
+				ICP_QAT_FW_COMP_SOP,
+				ICP_QAT_FW_COMP_EOP,
+				op->flush_flag == RTE_COMP_FLUSH_FINAL ?
+					ICP_QAT_FW_COMP_BFINAL
+					: ICP_QAT_FW_COMP_NOT_BFINAL,
+				ICP_QAT_FW_COMP_CNV,
+				ICP_QAT_FW_COMP_CNV_RECOVERY);
+
+	} else if (op->op_type == RTE_COMP_OP_STATEFUL) {
+
 		comp_req->comp_pars.req_par_flags =
 			ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
 				(stream->start_of_packet) ?
@@ -72,30 +195,6 @@ qat_comp_build_request(void *in_op, uint8_t *out_msg,
 				ICP_QAT_FW_COMP_NO_CNV_RECOVERY);
 	}
 
-	if (likely(qat_xform->qat_comp_request_type ==
-		    QAT_COMP_REQUEST_DYNAMIC_COMP_STATELESS)) {
-		if (unlikely(op->src.length > QAT_FALLBACK_THLD)) {
-
-			/* fallback to fixed compression */
-			comp_req->comn_hdr.service_cmd_id =
-					ICP_QAT_FW_COMP_CMD_STATIC;
-
-			ICP_QAT_FW_COMN_NEXT_ID_SET(&comp_req->comp_cd_ctrl,
-					ICP_QAT_FW_SLICE_DRAM_WR);
-
-			ICP_QAT_FW_COMN_NEXT_ID_SET(&comp_req->u2.xlt_cd_ctrl,
-					ICP_QAT_FW_SLICE_NULL);
-			ICP_QAT_FW_COMN_CURR_ID_SET(&comp_req->u2.xlt_cd_ctrl,
-					ICP_QAT_FW_SLICE_NULL);
-
-			QAT_DP_LOG(DEBUG, "QAT PMD: fallback to fixed "
-				   "compression! IM buffer size can be too low "
-				   "for produced data.\n Please use input "
-				   "buffer length lower than %d bytes",
-				   QAT_FALLBACK_THLD);
-		}
-	}
-
 	/* common for sgl and flat buffers */
 	comp_req->comp_pars.comp_len = op->src.length;
 	comp_req->comp_pars.out_buffer_sz = rte_pktmbuf_pkt_len(op->m_dst) -
@@ -233,6 +332,213 @@ qat_comp_build_request(void *in_op, uint8_t *out_msg,
 	return 0;
 }
 
+static inline uint32_t adf_modulo(uint32_t data, uint32_t modulo_mask)
+{
+	return data & modulo_mask;
+}
+
+static inline void
+qat_comp_mbuf_skip(struct rte_mbuf **mbuf, uint32_t *offset, uint32_t len)
+{
+	while (*offset + len >= rte_pktmbuf_data_len(*mbuf)) {
+		len -= (rte_pktmbuf_data_len(*mbuf) - *offset);
+		*mbuf = (*mbuf)->next;
+		*offset = 0;
+	}
+	*offset = len;
+}
+
+int
+qat_comp_build_multiple_requests(void *in_op, struct qat_qp *qp,
+				 uint32_t parent_tail, int nb_descr)
+{
+	struct rte_comp_op op_backup;
+	struct rte_mbuf dst_mbuf;
+	struct rte_comp_op *op = in_op;
+	struct qat_queue *txq = &(qp->tx_q);
+	uint8_t *base_addr = (uint8_t *)txq->base_addr;
+	uint8_t *out_msg = base_addr + parent_tail;
+	uint32_t tail = parent_tail;
+	struct icp_qat_fw_comp_req *comp_req =
+			(struct icp_qat_fw_comp_req *)out_msg;
+	struct qat_comp_op_cookie *parent_cookie =
+			(struct qat_comp_op_cookie *)
+			qp->op_cookies[parent_tail / txq->msg_size];
+	struct qat_comp_op_cookie *child_cookie;
+	uint16_t dst_data_size =
+			RTE_MIN(RTE_PMD_QAT_COMP_IM_BUFFER_SIZE, 65535);
+	uint32_t data_to_enqueue = op->src.length - QAT_FALLBACK_THLD;
+	int num_descriptors_built = 1;
+	int ret;
+
+	QAT_DP_LOG(DEBUG, "op %p, parent_cookie %p", op, parent_cookie);
+
+	/* copy original op to the local variable for restoring later */
+	rte_memcpy(&op_backup, op, sizeof(op_backup));
+
+	parent_cookie->nb_child_responses = 0;
+	parent_cookie->nb_children = 0;
+	parent_cookie->split_op = 1;
+	parent_cookie->dst_data = op->m_dst;
+	parent_cookie->dst_data_offset = op->dst.offset;
+
+	op->src.length = QAT_FALLBACK_THLD;
+	op->flush_flag = RTE_COMP_FLUSH_FULL;
+
+	QAT_DP_LOG(DEBUG, "parent op src len %u dst len %u",
+			op->src.length, op->m_dst->pkt_len);
+
+	ret = qat_comp_build_request(in_op, out_msg, parent_cookie,
+			qp->qat_dev_gen);
+	if (ret != 0) {
+		/* restore op and clear cookie */
+		QAT_DP_LOG(WARNING, "Failed to build parent descriptor");
+		op->src.length = op_backup.src.length;
+		op->flush_flag = op_backup.flush_flag;
+		parent_cookie->split_op = 0;
+		return ret;
+	}
+
+	/* prepare local dst mbuf */
+	rte_memcpy(&dst_mbuf, op->m_dst, sizeof(dst_mbuf));
+	rte_pktmbuf_reset(&dst_mbuf);
+	dst_mbuf.buf_len = dst_data_size;
+	dst_mbuf.data_len = dst_data_size;
+	dst_mbuf.pkt_len = dst_data_size;
+	dst_mbuf.data_off = 0;
+
+	/* update op for the child operations */
+	op->m_dst = &dst_mbuf;
+	op->dst.offset = 0;
+
+	while (data_to_enqueue) {
+		const struct rte_memzone *mz =
+			parent_cookie->dst_memzones[num_descriptors_built - 1];
+		uint32_t src_data_size = RTE_MIN(data_to_enqueue,
+				QAT_FALLBACK_THLD);
+		uint32_t cookie_index;
+
+		/* update params for the next op */
+		op->src.offset += QAT_FALLBACK_THLD;
+		op->src.length = src_data_size;
+		op->flush_flag = (src_data_size == data_to_enqueue) ?
+			op_backup.flush_flag : RTE_COMP_FLUSH_FULL;
+
+		/* update dst mbuf for the next op (use memzone for dst data) */
+		dst_mbuf.buf_addr = mz->addr;
+		dst_mbuf.buf_iova = mz->iova;
+
+		/* move the tail and calculate next cookie index */
+		tail = adf_modulo(tail + txq->msg_size, txq->modulo_mask);
+		cookie_index = tail / txq->msg_size;
+		child_cookie = (struct qat_comp_op_cookie *)
+				qp->op_cookies[cookie_index];
+		comp_req = (struct icp_qat_fw_comp_req *)(base_addr + tail);
+
+		/* update child cookie */
+		child_cookie->split_op = 1; /* must be set for child as well */
+		child_cookie->parent_cookie = parent_cookie; /* same as above */
+		child_cookie->nb_children = 0;
+		child_cookie->dest_buffer = mz->addr;
+
+		QAT_DP_LOG(DEBUG,
+				"cookie_index %u, child_cookie %p, comp_req %p",
+				cookie_index, child_cookie, comp_req);
+		QAT_DP_LOG(DEBUG,
+				"data_to_enqueue %u, num_descriptors_built %d",
+				data_to_enqueue, num_descriptors_built);
+		QAT_DP_LOG(DEBUG, "child op src len %u dst len %u",
+				op->src.length, op->m_dst->pkt_len);
+
+		/* build the request */
+		ret = qat_comp_build_request(op, (uint8_t *)comp_req,
+				child_cookie, qp->qat_dev_gen);
+		if (ret < 0) {
+			QAT_DP_LOG(WARNING, "Failed to build child descriptor");
+			/* restore op and clear cookie */
+			rte_memcpy(op, &op_backup, sizeof(op_backup));
+			parent_cookie->split_op = 0;
+			parent_cookie->nb_children = 0;
+			return ret;
+		}
+
+		data_to_enqueue -= src_data_size;
+		num_descriptors_built++;
+	}
+
+	/* restore backed up original op */
+	rte_memcpy(op, &op_backup, sizeof(op_backup));
+
+	if (nb_descr != num_descriptors_built)
+		QAT_DP_LOG(ERR, "split op. expected %d, built %d",
+				nb_descr, num_descriptors_built);
+
+	parent_cookie->nb_children = num_descriptors_built - 1;
+	return num_descriptors_built;
+}
+
+static inline void
+qat_comp_response_data_copy(struct qat_comp_op_cookie *cookie,
+		       struct rte_comp_op *rx_op)
+{
+	struct qat_comp_op_cookie *pc = cookie->parent_cookie;
+	struct rte_mbuf *sgl_buf = pc->dst_data;
+	void *op_dst_addr = rte_pktmbuf_mtod_offset(sgl_buf, uint8_t *,
+						    pc->dst_data_offset);
+
+	/* number of bytes left in the current segment */
+	uint32_t left_in_current = rte_pktmbuf_data_len(sgl_buf) -
+			pc->dst_data_offset;
+
+	uint32_t prod, sent;
+
+	if (rx_op->produced <= left_in_current) {
+		rte_memcpy(op_dst_addr, cookie->dest_buffer,
+				rx_op->produced);
+		/* calculate dst mbuf and offset for the next child op */
+		if (rx_op->produced == left_in_current) {
+			pc->dst_data = sgl_buf->next;
+			pc->dst_data_offset = 0;
+		} else
+			pc->dst_data_offset += rx_op->produced;
+	} else {
+		rte_memcpy(op_dst_addr, cookie->dest_buffer,
+				left_in_current);
+		sgl_buf = sgl_buf->next;
+		prod = rx_op->produced - left_in_current;
+		sent = left_in_current;
+		while (prod > rte_pktmbuf_data_len(sgl_buf)) {
+			op_dst_addr = rte_pktmbuf_mtod_offset(sgl_buf,
+					uint8_t *, 0);
+
+			rte_memcpy(op_dst_addr,
+					((uint8_t *)cookie->dest_buffer) +
+					sent,
+					rte_pktmbuf_data_len(sgl_buf));
+
+			prod -= rte_pktmbuf_data_len(sgl_buf);
+			sent += rte_pktmbuf_data_len(sgl_buf);
+
+			sgl_buf = sgl_buf->next;
+		}
+
+		op_dst_addr = rte_pktmbuf_mtod_offset(sgl_buf, uint8_t *, 0);
+
+		rte_memcpy(op_dst_addr,
+				((uint8_t *)cookie->dest_buffer) + sent,
+				prod);
+
+		/* calculate dst mbuf and offset for the next child op */
+		if (prod == rte_pktmbuf_data_len(sgl_buf)) {
+			pc->dst_data = sgl_buf->next;
+			pc->dst_data_offset = 0;
+		} else {
+			pc->dst_data = sgl_buf;
+			pc->dst_data_offset = prod;
+		}
+	}
+}
+
 int
 qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			  uint64_t *dequeue_err_count)
@@ -241,6 +547,14 @@ qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			(struct icp_qat_fw_comp_resp *)resp;
 	struct qat_comp_op_cookie *cookie =
 			(struct qat_comp_op_cookie *)op_cookie;
+
+	struct icp_qat_fw_resp_comp_pars *comp_resp1 =
+	  (struct icp_qat_fw_resp_comp_pars *)&resp_msg->comp_resp_pars;
+
+	QAT_DP_LOG(DEBUG, "input counter = %u, output counter = %u",
+		   comp_resp1->input_byte_counter,
+		   comp_resp1->output_byte_counter);
+
 	struct rte_comp_op *rx_op = (struct rte_comp_op *)(uintptr_t)
 			(resp_msg->opaque_data);
 	struct qat_comp_stream *stream;
@@ -275,7 +589,10 @@ qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 		rx_op->consumed = 0;
 		rx_op->produced = 0;
 		*op = (void *)rx_op;
-		return 0;
+		/* also in this case number of returned ops */
+		/* must be equal to one, */
+		/* appropriate status (error) must be set as well */
+		return 1;
 	}
 
 	if (likely(qat_xform->qat_comp_request_type
@@ -288,7 +605,7 @@ qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			*op = (void *)rx_op;
 			QAT_DP_LOG(ERR, "QAT has wrong firmware");
 			++(*dequeue_err_count);
-			return 0;
+			return 1;
 		}
 	}
 
@@ -305,8 +622,9 @@ qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 		int8_t xlat_err_code =
 			(int8_t)resp_msg->comn_resp.comn_error.xlat_err_code;
 
-		/* handle recoverable out-of-buffer condition in stateful */
-		/* decompression scenario */
+		/* handle recoverable out-of-buffer condition in stateful
+		 * decompression scenario
+		 */
 		if (cmp_err_code == ERR_CODE_OVERFLOW_ERROR && !xlat_err_code
 				&& qat_xform->qat_comp_request_type
 					== QAT_COMP_REQUEST_DECOMPRESS
@@ -327,10 +645,12 @@ qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 		     xlat_err_code == ERR_CODE_OVERFLOW_ERROR)){
 
 			struct icp_qat_fw_resp_comp_pars *comp_resp =
-	  (struct icp_qat_fw_resp_comp_pars *)&resp_msg->comp_resp_pars;
+					(struct icp_qat_fw_resp_comp_pars *)
+					&resp_msg->comp_resp_pars;
 
-			/* handle recoverable out-of-buffer condition */
-			/* in stateless compression scenario */
+			/* handle recoverable out-of-buffer condition
+			 * in stateless compression scenario
+			 */
 			if (comp_resp->input_byte_counter) {
 				if ((qat_xform->qat_comp_request_type
 				== QAT_COMP_REQUEST_FIXED_COMP_STATELESS) ||
@@ -375,9 +695,89 @@ qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 				rx_op->output_chksum = comp_resp->curr_chksum;
 		}
 	}
-	*op = (void *)rx_op;
+	QAT_DP_LOG(DEBUG, "About to check for split op :cookies: %p %p, split:%u",
+		cookie, cookie->parent_cookie, cookie->split_op);
+
+	if (cookie->split_op) {
+		*op = NULL;
+		struct qat_comp_op_cookie *pc = cookie->parent_cookie;
+
+		if (cookie->nb_children > 0) {
+			QAT_DP_LOG(DEBUG, "Parent");
+			/* parent - don't return until all children
+			 * responses are collected
+			 */
+			cookie->total_consumed = rx_op->consumed;
+			cookie->total_produced = rx_op->produced;
+			if (err) {
+				cookie->error = rx_op->status;
+				rx_op->status = RTE_COMP_OP_STATUS_SUCCESS;
+			} else {
+				/* calculate dst mbuf and offset for child op */
+				qat_comp_mbuf_skip(&cookie->dst_data,
+						&cookie->dst_data_offset,
+						rx_op->produced);
+			}
+		} else {
+			QAT_DP_LOG(DEBUG, "Child");
+			if (pc->error == RTE_COMP_OP_STATUS_SUCCESS) {
+				if (err)
+					pc->error = rx_op->status;
+				if (rx_op->produced) {
+					/* this covers both SUCCESS and
+					 * OUT_OF_SPACE_RECOVERABLE cases
+					 */
+					qat_comp_response_data_copy(cookie,
+							rx_op);
+					pc->total_consumed += rx_op->consumed;
+					pc->total_produced += rx_op->produced;
+				}
+			}
+			rx_op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+			pc->nb_child_responses++;
+
+			/* (child) cookie fields have to be reset
+			 * to avoid problems with reusability -
+			 * rx and tx queue starting from index zero
+			 */
+			cookie->nb_children = 0;
+			cookie->split_op = 0;
+			cookie->nb_child_responses = 0;
+			cookie->dest_buffer = NULL;
+
+			if (pc->nb_child_responses == pc->nb_children) {
+				uint8_t child_resp;
+
+				/* parent should be included as well */
+				child_resp = pc->nb_child_responses + 1;
+
+				rx_op->status = pc->error;
+				rx_op->consumed = pc->total_consumed;
+				rx_op->produced = pc->total_produced;
+				*op = (void *)rx_op;
+
+				/* free memzones used for dst data */
+				qat_comp_free_split_op_memzones(pc,
+						pc->nb_children);
+
+				/* (parent) cookie fields have to be reset
+				 * to avoid problems with reusability -
+				 * rx and tx queue starting from index zero
+				 */
+				pc->nb_children = 0;
+				pc->split_op = 0;
+				pc->nb_child_responses = 0;
+				pc->error = RTE_COMP_OP_STATUS_SUCCESS;
+
+				return child_resp;
+			}
+		}
+		return 0;
+	}
 
-	return 0;
+	*op = (void *)rx_op;
+	return 1;
 }
 
 unsigned int
@@ -443,9 +843,9 @@ static int qat_comp_create_templates(struct qat_comp_xform *qat_xform,
 		comp_level = ICP_QAT_HW_COMPRESSION_DEPTH_1;
 		req_par_flags = ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
 				ICP_QAT_FW_COMP_SOP, ICP_QAT_FW_COMP_EOP,
-				ICP_QAT_FW_COMP_BFINAL, ICP_QAT_FW_COMP_NO_CNV,
-				ICP_QAT_FW_COMP_NO_CNV_RECOVERY);
-
+				ICP_QAT_FW_COMP_BFINAL,
+				ICP_QAT_FW_COMP_CNV,
+				ICP_QAT_FW_COMP_CNV_RECOVERY);
 	} else {
 		if (xform->compress.level == RTE_COMP_LEVEL_PMD_DEFAULT)
 			comp_level = ICP_QAT_HW_COMPRESSION_DEPTH_8;
diff --git a/drivers/compress/qat/qat_comp.h b/drivers/compress/qat/qat_comp.h
index 2231451a1..1c07f2233 100644
--- a/drivers/compress/qat/qat_comp.h
+++ b/drivers/compress/qat/qat_comp.h
@@ -11,6 +11,7 @@
 #include <rte_compressdev_pmd.h>
 
 #include "qat_common.h"
+#include "qat_qp.h"
 #include "icp_qat_hw.h"
 #include "icp_qat_fw_comp.h"
 #include "icp_qat_fw_la.h"
@@ -22,7 +23,7 @@
 #define ERR_CODE_QAT_COMP_WRONG_FW -99
 
 /* fallback to fixed compression threshold */
-#define QAT_FALLBACK_THLD ((uint32_t)(RTE_PMD_QAT_COMP_IM_BUFFER_SIZE / 1.1))
+#define QAT_FALLBACK_THLD ((uint32_t)(RTE_PMD_QAT_COMP_IM_BUFFER_SIZE / 1.3))
 
 #define QAT_MIN_OUT_BUF_SIZE 46
 
@@ -63,6 +64,24 @@ struct qat_comp_op_cookie {
 	uint16_t dst_nb_elems;
 	struct qat_sgl *qat_sgl_src_d;
 	struct qat_sgl *qat_sgl_dst_d;
+	struct qat_qp *qp;
+	uint32_t cookie_index;
+
+	/* QAT IM buffer too small handling: */
+	uint8_t split_op;
+	uint8_t nb_children;
+
+	/* used by the parent only */
+	uint8_t nb_child_responses;
+	uint32_t total_consumed;
+	uint32_t total_produced;
+	const struct rte_memzone **dst_memzones;
+	struct rte_mbuf *dst_data;
+	uint32_t dst_data_offset;
+
+	/* used by the child only */
+	struct qat_comp_op_cookie *parent_cookie;
+	void *dest_buffer;
 };
 
 struct qat_comp_xform {
@@ -86,6 +105,14 @@ int
 qat_comp_build_request(void *in_op, uint8_t *out_msg, void *op_cookie,
 		       enum qat_device_gen qat_dev_gen __rte_unused);
 
+int
+qat_comp_build_multiple_requests(void *in_op, struct qat_qp *qp,
+				 uint32_t parent_tail, int nb_descr);
+
+void
+qat_comp_free_split_op_memzones(struct qat_comp_op_cookie *cookie,
+				unsigned int nb_children);
+
 int
 qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			  uint64_t *dequeue_err_count);
diff --git a/drivers/compress/qat/qat_comp_pmd.c b/drivers/compress/qat/qat_comp_pmd.c
index 9a7ed19d7..fe62de533 100644
--- a/drivers/compress/qat/qat_comp_pmd.c
+++ b/drivers/compress/qat/qat_comp_pmd.c
@@ -146,6 +146,9 @@ qat_comp_qp_setup(struct rte_compressdev *dev, uint16_t qp_id,
 		struct qat_comp_op_cookie *cookie =
 				qp->op_cookies[i];
 
+		cookie->qp = qp;
+		cookie->cookie_index = i;
+
 		cookie->qat_sgl_src_d = rte_zmalloc_socket(NULL,
 					sizeof(struct qat_sgl) +
 					sizeof(struct qat_flat_buf) *
@@ -560,20 +563,6 @@ qat_comp_dev_info_get(struct rte_compressdev *dev,
 	}
 }
 
-static uint16_t
-qat_comp_pmd_enqueue_op_burst(void *qp, struct rte_comp_op **ops,
-		uint16_t nb_ops)
-{
-	return qat_enqueue_op_burst(qp, (void **)ops, nb_ops);
-}
-
-static uint16_t
-qat_comp_pmd_dequeue_op_burst(void *qp, struct rte_comp_op **ops,
-			      uint16_t nb_ops)
-{
-	return qat_dequeue_op_burst(qp, (void **)ops, nb_ops);
-}
-
 static uint16_t
 qat_comp_pmd_enq_deq_dummy_op_burst(void *qp __rte_unused,
 				    struct rte_comp_op **ops __rte_unused,
@@ -603,7 +592,7 @@ static struct rte_compressdev_ops compress_qat_dummy_ops = {
 };
 
 static uint16_t
-qat_comp_pmd_dequeue_frst_op_burst(void *qp, struct rte_comp_op **ops,
+qat_comp_pmd_dequeue_first_op_burst(void *qp, struct rte_comp_op **ops,
 				   uint16_t nb_ops)
 {
 	uint16_t ret = qat_dequeue_op_burst(qp, (void **)ops, nb_ops);
@@ -623,7 +612,8 @@ qat_comp_pmd_dequeue_frst_op_burst(void *qp, struct rte_comp_op **ops,
 
 		} else {
 			tmp_qp->qat_dev->comp_dev->compressdev->dequeue_burst =
-					qat_comp_pmd_dequeue_op_burst;
+					(compressdev_dequeue_pkt_burst_t)
+					qat_dequeue_op_burst;
 		}
 	}
 	return ret;
@@ -698,8 +688,9 @@ qat_comp_dev_create(struct qat_pci_device *qat_pci_dev,
 
 	compressdev->dev_ops = &compress_qat_ops;
 
-	compressdev->enqueue_burst = qat_comp_pmd_enqueue_op_burst;
-	compressdev->dequeue_burst = qat_comp_pmd_dequeue_frst_op_burst;
+	compressdev->enqueue_burst = (compressdev_enqueue_pkt_burst_t)
+			qat_enqueue_comp_op_burst;
+	compressdev->dequeue_burst = qat_comp_pmd_dequeue_first_op_burst;
 
 	compressdev->feature_flags = RTE_COMPDEV_FF_HW_ACCELERATED;
 
-- 
2.17.1
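
Note on the QAT_FALLBACK_THLD change in qat_comp.h above: the divisor moves
from 1.1 to 1.3, so the threshold derived from the IM buffer size becomes
smaller. The sketch below only reproduces that arithmetic. The 65536-byte
buffer size and the helper name fallback_threshold() are assumptions made
for illustration; the real value comes from RTE_PMD_QAT_COMP_IM_BUFFER_SIZE
in the build configuration.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: illustrative IM buffer size, not taken from the patch. */
#define EXAMPLE_IM_BUFFER_SIZE 65536u

/*
 * Hypothetical helper mirroring the macro's arithmetic:
 * (uint32_t)(size / divisor), i.e. the quotient truncated toward zero.
 */
uint32_t
fallback_threshold(uint32_t im_buffer_size, double divisor)
{
	assert(divisor >= 1.0); /* a divisor below 1 would exceed the buffer */
	return (uint32_t)(im_buffer_size / divisor);
}
```

With the assumed 65536-byte buffer, fallback_threshold(EXAMPLE_IM_BUFFER_SIZE, 1.1)
gives 59578 and fallback_threshold(EXAMPLE_IM_BUFFER_SIZE, 1.3) gives 50412,
so the new threshold is roughly 9 KB lower under that assumption.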


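Note on the parent/child fields added to struct qat_comp_op_cookie: the
parent cookie counts child responses and accumulates totals, and the tail of
qat_comp_process_response() shown above returns 0 until the last child has
answered. A minimal sketch of that accounting follows. The struct
split_parent and the function on_child_response() are hypothetical names,
and the return convention is an assumption based only on the visible hunk,
not the driver's actual code.

```c
#include <stdint.h>

/* Hypothetical, reduced stand-in for the parent-side cookie fields. */
struct split_parent {
	uint8_t nb_children;        /* number of child requests issued */
	uint8_t nb_child_responses; /* responses received so far */
	uint32_t total_consumed;    /* input bytes consumed by all children */
	uint32_t total_produced;    /* output bytes produced by all children */
};

/*
 * Record one child response. Returns 0 while children are still pending;
 * once the last child has answered, returns the number of child responses
 * collected and resets the counter so the cookie can be reused.
 */
unsigned int
on_child_response(struct split_parent *pc, uint32_t consumed,
		  uint32_t produced)
{
	unsigned int child_resp;

	pc->total_consumed += consumed;
	pc->total_produced += produced;
	pc->nb_child_responses++;

	if (pc->nb_child_responses < pc->nb_children)
		return 0;

	child_resp = pc->nb_child_responses;
	pc->nb_child_responses = 0;
	return child_resp;
}
```

In this sketch the caller would hand the parent op back to the application
only when the return value is non-zero, after reading total_consumed and
total_produced.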
Thread overview: 20+ messages
2020-04-08 12:50 [dpdk-dev] [PATCH v2 0/2] " Adam Dybkowski
2020-04-08 12:51 ` Adam Dybkowski [this message]
2020-04-08 15:43   ` [dpdk-dev] [PATCH v2 1/2] " Trahe, Fiona
2020-04-08 12:51 ` [dpdk-dev] [PATCH v2 2/2] test/compress: im buffer too small - add unit tests Adam Dybkowski
2020-04-08 15:44   ` Trahe, Fiona
2020-04-15 18:35   ` Akhil Goyal
2020-04-16 10:02     ` Trahe, Fiona
2020-04-16 10:25       ` Akhil Goyal
2020-04-16 11:26         ` Trahe, Fiona
2020-04-16 14:31           ` Bruce Richardson
2020-04-16 14:55             ` Trahe, Fiona
2020-04-16 14:37           ` Akhil Goyal
2020-04-16 14:52             ` Trahe, Fiona
2020-04-17 15:39               ` Akhil Goyal
2020-04-17 15:56                 ` Trahe, Fiona
2020-04-17 15:44 ` [dpdk-dev] [PATCH v3 0/2] compress/qat: im buffer too small - split op Adam Dybkowski
2020-04-17 15:44   ` [dpdk-dev] [PATCH v3 1/2] " Adam Dybkowski
2020-04-17 15:44   ` [dpdk-dev] [PATCH v3 2/2] test/compress: im buffer too small - add unit tests Adam Dybkowski
2020-04-17 15:58     ` Trahe, Fiona
2020-04-17 21:50       ` Akhil Goyal
