DPDK patches and discussions
* [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
@ 2021-12-21 11:31 Ganapati Kundapura
  2021-12-21 11:31 ` [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  0 siblings, 2 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2021-12-21 11:31 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, dev; +Cc: abhinandan.gujjar

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full
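
As background for the diff below, the head/tail/count bookkeeping being
introduced can be sketched in isolation. This is a simplified model with
hypothetical names (ops_ring, ring_add), not the adapter code itself; it
mirrors what eca_circular_buffer_add() in the patch does, except that the
sketch adds an explicit "buffer full" check which the patch leaves to its
callers (the cdev-side buffer is flushed once BATCH_SIZE ops accumulate,
well below its 2 * BATCH_SIZE capacity).

#include <stdint.h>

struct crypto_op;                        /* stand-in for struct rte_crypto_op */

struct ops_ring {                        /* simplified crypto_ops_circular_buffer */
	uint16_t head;                   /* index of the oldest buffered op */
	uint16_t tail;                   /* index where the next op is stored */
	uint16_t count;                  /* number of ops currently buffered */
	uint16_t size;                   /* capacity of ops[] */
	struct crypto_op **ops;
};

/* Append one op at the tail; the index rolls over at the end of the array. */
static inline int
ring_add(struct ops_ring *r, struct crypto_op *op)
{
	if (r->count == r->size)
		return -1;               /* full: caller must flush first */
	r->ops[r->tail] = op;
	r->tail = (r->tail + 1) % r->size;
	r->count++;
	return 0;
}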

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..4469a89 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,16 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +509,8 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
+
+		n += nb_enqueued;
 	}
 
 	return n;
@@ -425,14 +522,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,25 +539,26 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
+
+			stats->crypto_enq_count += nb_enqueued;
 
-			while (ret < curr_queue->len) {
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
 			}
 			curr_queue->len = 0;
+			nb += nb_enqueued;
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +595,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +614,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,6 +646,7 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
@@ -561,8 +660,52 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	/* Stop further enqueue to eventdev, if nb_ops_flushed < n */
+	if (nb_ops_flushed < n)
+		return 0;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +714,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +722,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -605,8 +752,24 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
 				if (nb_deq > max_deq) {
@@ -751,11 +914,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap
  2021-12-21 11:31 [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
@ 2021-12-21 11:31 ` Ganapati Kundapura
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  1 sibling, 0 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2021-12-21 11:31 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, dev; +Cc: abhinandan.gujjar

Update rte_event_crypto_adapter_caps_get() to return
SW_CAP if the PMD callback is not registered.
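
As a usage-level illustration (a sketch, not part of this patch; the helper
name and error handling are hypothetical, while the caps-get/create calls and
capability flags are the existing public API): with this change an eventdev
whose PMD does not implement the crypto adapter callback reports the SW
capabilities instead of failing with -ENOTSUP, so the software fallback below
becomes reachable.

#include <rte_eventdev.h>
#include <rte_event_crypto_adapter.h>

/* Hypothetical helper: query caps for (dev_id, cdev_id) and create crypto
 * adapter instance 0 in a mode the PMD can actually support.
 */
static int
setup_crypto_adapter(uint8_t dev_id, uint8_t cdev_id,
		     struct rte_event_port_conf *port_conf)
{
	uint32_t caps = 0;
	enum rte_event_crypto_adapter_mode mode;
	int ret;

	ret = rte_event_crypto_adapter_caps_get(dev_id, cdev_id, &caps);
	if (ret < 0)
		return ret;

	/* Use forward mode only when the PMD has an internal port for it;
	 * otherwise fall back to new-event mode serviced in software.
	 */
	if (caps & RTE_EVENT_CRYPTO_ADAPTER_CAP_INTERNAL_PORT_OP_FWD)
		mode = RTE_EVENT_CRYPTO_ADAPTER_OP_FORWARD;
	else
		mode = RTE_EVENT_CRYPTO_ADAPTER_OP_NEW;

	return rte_event_crypto_adapter_create(0, dev_id, port_conf, mode);
}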

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2021-12-21 11:31 [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2021-12-21 11:31 ` [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-01-04 12:31 ` Ganapati Kundapura
  2022-01-04 12:31   ` [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  1 sibling, 2 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-01-04 12:31 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

---
v2:
* reset cryptp adapter next cdev id before dequeueing from the
  next cdev
---
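
The v2 item above amounts to resetting the saved resume point once a full
pass over the cryptodevs completes. A simplified, hypothetical sketch of that
round-robin behaviour (not the adapter code; the names are made up for
illustration):

#include <stdint.h>

struct adapter_state {                  /* hypothetical stand-in for the adapter */
	uint8_t next_cdev_id;           /* cdev at which the next run resumes */
};

/* Hypothetical per-device poll; body elided. */
static void
poll_one_cdev(struct adapter_state *a, uint8_t cdev_id)
{
	(void)a;
	(void)cdev_id;
}

/* One dequeue pass: resume at next_cdev_id, then reset it so the following
 * pass starts from cryptodev 0 again instead of skipping the lower ids.
 */
static void
dequeue_pass(struct adapter_state *a, uint8_t num_cdev)
{
	uint8_t cdev_id;

	for (cdev_id = a->next_cdev_id; cdev_id < num_cdev; cdev_id++)
		poll_one_cdev(a, cdev_id);

	a->next_cdev_id = 0;
}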

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..981db6e 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,16 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +509,8 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
+
+		n += nb_enqueued;
 	}
 
 	return n;
@@ -425,14 +522,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,25 +539,26 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
+
+			stats->crypto_enq_count += nb_enqueued;
 
-			while (ret < curr_queue->len) {
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
 			}
 			curr_queue->len = 0;
+			nb += nb_enqueued;
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +595,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +614,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,6 +646,7 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
@@ -561,8 +660,52 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	/* Stop further enqueue to eventdev, if nb_ops_flushed < n */
+	if (nb_ops_flushed < n)
+		return 0;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +714,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +722,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +743,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				queues++) {
 
 				curr_queue = &curr_dev->qpairs[qp];
-				if (!curr_queue->qp_enabled)
+				if (curr_queue == NULL ||
+				    !curr_queue->qp_enabled)
 					continue;
 
 				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,8 +753,24 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
 				if (nb_deq > max_deq) {
@@ -622,6 +786,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				}
 			}
 		}
+		adapter->next_cdev_id = 0;
 	} while (done == false);
 	return nb_deq;
 }
@@ -751,11 +916,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
@ 2022-01-04 12:31   ` Ganapati Kundapura
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  1 sibling, 0 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-01-04 12:31 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

Update rte_event_crypto_adapter_caps_get() to return
SW_CAP if the PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2022-01-04 12:31   ` [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-01-11 10:36   ` Ganapati Kundapura
  2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
                       ` (2 more replies)
  1 sibling, 3 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-01-11 10:36 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

---
v3:
* update eca_ops_buffer_flush() to flush all the crypto
  ops out of the circular buffer.
* remove freeing of failed crypto ops from eca_ops_enqueue_burst()
  and add them to the circular buffer for later processing.

v2:
* reset cryptp adapter next cdev id before dequeueing from the
  next cdev
---
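
The v3 rework of eca_ops_buffer_flush() is easiest to see on a standalone
model of the buffer: because the valid ops may wrap past the end of the
array, one flush call only covers the contiguous segment starting at head,
so draining a wrapped buffer takes a second pass. A simplified sketch with a
hypothetical enqueue callback (the explicit zero-progress guard in
ring_drain() is an addition here, not taken from the patch):

#include <stdint.h>

struct crypto_op;                       /* stand-in for struct rte_crypto_op */

struct ops_ring {                       /* simplified crypto_ops_circular_buffer */
	uint16_t head, tail, count, size;
	struct crypto_op **ops;
};

/* Hypothetical consumer: returns how many of the n ops it accepted. */
typedef uint16_t (*enqueue_fn)(struct crypto_op **ops, uint16_t n);

/* Flush the contiguous segment that starts at head. */
static uint16_t
ring_flush_segment(struct ops_ring *r, enqueue_fn enq)
{
	uint16_t n, done;

	if (r->count == 0)
		return 0;
	/* Only [head, tail) or [head, size) is contiguous in memory. */
	n = (r->tail > r->head) ? (r->tail - r->head) : (r->size - r->head);
	done = enq(&r->ops[r->head], n);
	r->count -= done;
	if (r->count == 0)
		r->head = r->tail = 0;  /* empty: restart at the base */
	else
		r->head = (r->head + done) % r->size;
	return done;
}

/* Drain the buffer: a wrapped buffer needs a second segment flush; stop
 * early if the consumer applies backpressure (done == 0).
 */
static void
ring_drain(struct ops_ring *r, enqueue_fn enq)
{
	while (r->count != 0 && ring_flush_segment(r, enq) != 0)
		;
}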

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..9086368 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
+			n += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
 	}
 
 	return n;
@@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
 
-			while (ret < curr_queue->len) {
+			stats->crypto_enq_count += nb_enqueued;
+			nb += nb_enqueued;
+
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +613,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
-	/* Free mbufs and rte_crypto_ops for failed events */
-	for (i = nb_enqueued; i < nb_ev; i++) {
-		struct rte_crypto_op *op = events[i].event_ptr;
-		rte_pktmbuf_free(op->sym->m_src);
-		rte_crypto_op_free(op);
-	}
-
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+
+		return 0;  /* buffer empty */
+	}
+
+	*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				queues++) {
 
 				curr_queue = &curr_dev->qpairs[qp];
-				if (!curr_queue->qp_enabled)
+				if (curr_queue == NULL ||
+				    !curr_queue->qp_enabled)
 					continue;
 
 				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
-				if (nb_deq > max_deq) {
+				if (nb_deq >= max_deq) {
 					if ((qp + 1) == dev_qps) {
 						adapter->next_cdev_id =
 							(cdev_id + 1)
@@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				}
 			}
 		}
+		adapter->next_cdev_id = 0;
 	} while (done == false);
 	return nb_deq;
 }
@@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
@ 2022-01-11 10:36     ` Ganapati Kundapura
  2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
  2022-02-07 10:50     ` [PATCH v4 " Ganapati Kundapura
  2 siblings, 0 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-01-11 10:36 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

Update rte_event_crypto_adapter_caps_get() to return
SW_CAP if the PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* RE: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-01-13 11:05     ` Gujjar, Abhinandan S
  2022-02-07 10:32       ` Kundapura, Ganapati
  2022-02-07 10:50     ` [PATCH v4 " Ganapati Kundapura
  2 siblings, 1 reply; 17+ messages in thread
From: Gujjar, Abhinandan S @ 2022-01-13 11:05 UTC (permalink / raw)
  To: Kundapura, Ganapati, jerinjacobk, Jayatheerthan, Jay, dev

Hi Ganapati,


> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Sent: Tuesday, January 11, 2022 4:07 PM
> To: jerinjacobk@gmail.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> dev@dpdk.org
> Cc: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> Subject: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular
> buffer
> 
> Move crypto ops to circular buffer to retain crypto ops when
> cryptodev/eventdev are temporarily full
> 
> ---
> v3:
> * update eca_ops_buffer_flush() to flush all the crypto
>   ops out of the circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
>   and add them to the circular buffer for later processing.
> 
> v2:
> * reset cryptp adapter next cdev id before dequeueing from the
Cryptp -> crypto
>   next cdev
> ---
> 
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
I don't see the sign-off after applying the patch. Could you take a look?
commit 18d9c3b1728f325dba5fe5dbb51df2366b70527a
Author: Ganapati Kundapura <ganapati.kundapura@intel.com>
Date:   Tue Jan 11 04:36:30 2022 -0600

    eventdev/crypto_adapter: move crypto ops to circular buffer
    
    Move crypto ops to circular buffer to retain crypto
    ops when cryptodev/eventdev are temporarily full


> 
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..9086368 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
>  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
>  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> 
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> #define
> +CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
>  /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
>   * iterations of eca_crypto_adapter_enq_run()
>   */
>  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> 
> +struct crypto_ops_circular_buffer {
> +	/* index of head element in circular buffer */
> +	uint16_t head;
> +	/* index of tail element in circular buffer */
> +	uint16_t tail;
> +	/* number elements in buffer */
number elements -> number of elements 
> +	uint16_t count;
> +	/* size of circular buffer */
> +	uint16_t size;
> +	/* Pointer to hold rte_crypto_ops for batching */
> +	struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
>  struct event_crypto_adapter {
>  	/* Event device identifier */
>  	uint8_t eventdev_id;
> @@ -47,6 +63,8 @@ struct event_crypto_adapter {
>  	struct crypto_device_info *cdevs;
>  	/* Loop counter to flush crypto ops */
>  	uint16_t transmit_loop_count;
> +	/* Circular buffer for batching crypto ops to eventdev */
> +	struct crypto_ops_circular_buffer ebuf;
>  	/* Per instance stats structure */
>  	struct rte_event_crypto_adapter_stats crypto_stats;
>  	/* Configuration callback for rte_service configuration */ @@ -93,8
> +111,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
>  	/* Set to indicate queue pair is enabled */
>  	bool qp_enabled;
> -	/* Pointer to hold rte_crypto_ops for batching */
> -	struct rte_crypto_op **op_buffer;
> +	/* Circular buffer for batching crypto ops to cdev */
> +	struct crypto_ops_circular_buffer cbuf;
>  	/* No of crypto ops accumulated */
>  	uint8_t len;
>  } __rte_cache_aligned;
> @@ -141,6 +159,77 @@ eca_init(void)
>  	return 0;
>  }
> 
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> +*bufp) {
> +	return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> +	rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> +			 struct crypto_ops_circular_buffer *bufp,
> +			 uint16_t sz)
> +{
> +	bufp->op_buffer = rte_zmalloc(name,
> +				      sizeof(struct rte_crypto_op *) * sz,
> +				      0);
> +	if (bufp->op_buffer == NULL)
> +		return -ENOMEM;
> +
> +	bufp->size = sz;
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> +			struct rte_crypto_op *op)
> +{
> +	uint16_t *tailp = &bufp->tail;
> +
> +	bufp->op_buffer[*tailp] = op;
> +	*tailp = (*tailp + 1) % bufp->size;
Provide a comment saying you are taking care of the buffer rollover condition.
> +	bufp->count++;
> +
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> +				  uint8_t cdev_id, uint16_t qp_id,
> +				  uint16_t *nb_ops_flushed)
This function returns a value, but the caller does not check it.
> +{
> +	uint16_t n = 0;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else {
> +		*nb_ops_flushed = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> +						      &ops[*headp], n);
> +	bufp->count -= *nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +	} else
> +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> +	return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
>  static inline struct event_crypto_adapter *  eca_id_to_adapter(uint8_t id)  {
> @@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  		return -ENOMEM;
>  	}
> 
> +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> +				     &adapter->ebuf,
> +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> +		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
Mem -> memory, edev -> eventdev. 
> +		rte_free(adapter);
> +		return -ENOMEM;
> +	}
> +
>  	ret = rte_event_dev_info_get(dev_id, &dev_info);
>  	if (ret < 0) {
>  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
>  				 dev_id, dev_info.driver_name);
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return ret;
>  	}
> @@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  					socket_id);
>  	if (adapter->cdevs == NULL) {
>  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return -ENOMEM;
>  	}
> @@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  	struct crypto_queue_pair_info *qp_info = NULL;
>  	struct rte_crypto_op *crypto_op;
>  	unsigned int i, n;
> -	uint16_t qp_id, len, ret;
> +	uint16_t qp_id, len, nb_enqueued = 0;
>  	uint8_t cdev_id;
> 
>  	len = 0;
> -	ret = 0;
>  	n = 0;
>  	stats->event_deq_count += cnt;
> 
> @@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				continue;
>  			}
>  			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> +			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>  			len++;
>  		} else if (crypto_op->sess_type ==
> RTE_CRYPTO_OP_SESSIONLESS &&
>  				crypto_op->private_data_offset) {
> @@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				continue;
>  			}
>  			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> +			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>  			len++;
>  		} else {
>  			rte_pktmbuf_free(crypto_op->sym->m_src);
> @@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  			continue;
>  		}
> 
> -		if (len == BATCH_SIZE) {
> -			struct rte_crypto_op **op_buffer = qp_info-
> >op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> +			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> +							  cdev_id,
>  							  qp_id,
> -							  op_buffer,
> -							  BATCH_SIZE);
> +							  &nb_enqueued);
> +			stats->crypto_enq_count += nb_enqueued;
> +			n += nb_enqueued;
> 
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < len) {
> +			while (nb_enqueued < len) {
>  				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> +				op = qp_info-
> >cbuf.op_buffer[nb_enqueued++];
>  				stats->crypto_enq_fail++;
>  				rte_pktmbuf_free(op->sym->m_src);
>  				rte_crypto_op_free(op);
Not sure why you are freeing the ops which are not enqueued to the cryptodev.
Isn't the goal of the patch to retain those non-enqueued crypto_ops temporarily and retry later?

> @@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
> 
>  		if (qp_info)
>  			qp_info->len = len;
> -		n += ret;
>  	}
> 
>  	return n;
> @@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>  	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
> -	struct rte_crypto_op **op_buffer;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp;
> -	uint16_t ret;
> +	uint16_t nb_enqueued = 0, nb = 0;
>  	uint16_t num_cdev = rte_cryptodev_count();
> 
> -	ret = 0;
>  	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
>  		curr_dev = &adapter->cdevs[cdev_id];
>  		dev = curr_dev->dev;
> @@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>  			if (!curr_queue->qp_enabled)
>  				continue;
> 
> -			op_buffer = curr_queue->op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> +			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> +							  cdev_id,
>  							  qp,
> -							  op_buffer,
> -							  curr_queue->len);
> -			stats->crypto_enq_count += ret;
> +							  &nb_enqueued);
> 
> -			while (ret < curr_queue->len) {
> +			stats->crypto_enq_count += nb_enqueued;
> +			nb += nb_enqueued;
> +
> +			while (nb_enqueued < curr_queue->len) {
>  				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> +				op = curr_queue-
> >cbuf.op_buffer[nb_enqueued++];
>  				stats->crypto_enq_fail++;
>  				rte_pktmbuf_free(op->sym->m_src);
>  				rte_crypto_op_free(op);
> @@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>  		}
>  	}
> 
> -	return ret;
> +	return nb;
>  }
> 
>  static int
> @@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  	return nb_enqueued;
>  }
> 
> -static inline void
> +static inline uint16_t
>  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> -		      struct rte_crypto_op **ops, uint16_t num)
> +		  struct rte_crypto_op **ops, uint16_t num)
>  {
>  	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	union rte_event_crypto_metadata *m_data = NULL;
> @@ -518,6 +613,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  	num = RTE_MIN(num, BATCH_SIZE);
>  	for (i = 0; i < num; i++) {
>  		struct rte_event *ev = &events[nb_ev++];
> +
> +		m_data = NULL;
>  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
>  			m_data = rte_cryptodev_sym_session_get_user_data(
>  					ops[i]->sym->session);
> @@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  						  event_port_id,
>  						  &events[nb_enqueued],
>  						  nb_ev - nb_enqueued);
> +
>  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
>  		 nb_enqueued < nb_ev);
> 
> -	/* Free mbufs and rte_crypto_ops for failed events */
> -	for (i = nb_enqueued; i < nb_ev; i++) {
> -		struct rte_crypto_op *op = events[i].event_ptr;
> -		rte_pktmbuf_free(op->sym->m_src);
> -		rte_crypto_op_free(op);
> -	}
> -
>  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
>  	stats->event_enq_count += nb_enqueued;
>  	stats->event_enq_retry_count += retry - 1;
> +
> +	return nb_enqueued;
>  }
> 
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> +				   struct crypto_ops_circular_buffer *bufp)
> +{
> +	uint16_t n = 0, nb_ops_flushed;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else
> +		return 0;  /* buffer empty */
> +
> +	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> +	bufp->count -= nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +
Extra line not required
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> +
Extra line not required
> +	return 1;
> +}
> +
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> +{
> +	if (adapter->ebuf.count == 0)
> +		return;
> +
> +	while (eca_circular_buffer_flush_to_evdev(adapter,
> +						  &adapter->ebuf))
> +		;
> +}
>  static inline unsigned int
>  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  			   unsigned int max_deq)
> @@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
>  	struct rte_crypto_op *ops[BATCH_SIZE];
> -	uint16_t n, nb_deq;
> +	uint16_t n, nb_deq, nb_enqueued, i;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp, dev_qps;
> @@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  	uint16_t num_cdev = rte_cryptodev_count();
> 
>  	nb_deq = 0;
> +	eca_ops_buffer_flush(adapter);
> +
>  	do {
> -		uint16_t queues = 0;
>  		done = true;
> 
>  		for (cdev_id = adapter->next_cdev_id;
>  			cdev_id < num_cdev; cdev_id++) {
> +			uint16_t queues = 0;
> +
>  			curr_dev = &adapter->cdevs[cdev_id];
>  			dev = curr_dev->dev;
>  			if (dev == NULL)
>  				continue;
> +
>  			dev_qps = dev->data->nb_queue_pairs;
> 
>  			for (qp = curr_dev->next_queue_pair_id;
> @@ -596,7 +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  				queues++) {
> 
>  				curr_queue = &curr_dev->qpairs[qp];
> -				if (!curr_queue->qp_enabled)
> +				if (curr_queue == NULL ||
> +				    !curr_queue->qp_enabled)
>  					continue;
> 
>  				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  					continue;
> 
>  				done = false;
> +				nb_enqueued = 0;
> +
>  				stats->crypto_deq_count += n;
> -				eca_ops_enqueue_burst(adapter, ops, n);
> +
> +				if (unlikely(!adapter->ebuf.count))
> +					nb_enqueued = eca_ops_enqueue_burst(
> +							adapter, ops, n);
> +
> +				if (nb_enqueued == n)
> +					goto check;
> +
> +				/* Failed to enqueue events case */
> +				for (i = nb_enqueued; i < n; i++)
> +					eca_circular_buffer_add(
> +						&adapter->ebuf,
> +						ops[nb_enqueued]);
> +
> +check:
>  				nb_deq += n;
> 
> -				if (nb_deq > max_deq) {
> +				if (nb_deq >= max_deq) {
>  					if ((qp + 1) == dev_qps) {
>  						adapter->next_cdev_id =
>  							(cdev_id + 1)
> @@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  				}
>  			}
>  		}
> +		adapter->next_cdev_id = 0;
>  	} while (done == false);
>  	return nb_deq;
>  }
> @@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
>  			return -ENOMEM;
> 
>  		qpairs = dev_info->qpairs;
> -		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> -					BATCH_SIZE *
> -					sizeof(struct rte_crypto_op *),
> -					0, adapter->socket_id);
> -		if (!qpairs->op_buffer) {
> +
> +		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> +					     &qpairs->cbuf,
> +					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
>  			rte_free(qpairs);
>  			return -ENOMEM;
>  		}
> --
> 2.6.4
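
To illustrate the point raised above, a minimal sketch of what retaining the
un-enqueued ops looks like: after a partial rte_cryptodev_enqueue_burst(), the
head index only advances past the accepted ops and the rest simply stay in the
ring for a later retry, so nothing needs to be freed. The helper name is
hypothetical and assumes the crypto_ops_circular_buffer introduced by this
series:

/* Hypothetical sketch: keep un-enqueued ops buffered instead of freeing them */
static void
retain_unsent_ops(struct crypto_ops_circular_buffer *bufp, uint16_t nb_enqueued)
{
	/* ops [head, head + nb_enqueued) were accepted by the cryptodev;
	 * advance head past them and leave the remainder queued so the
	 * service function can retry on a later iteration.
	 */
	bufp->head = (bufp->head + nb_enqueued) % bufp->size;
	bufp->count -= nb_enqueued;
}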


^ permalink raw reply	[flat|nested] 17+ messages in thread

* RE: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
@ 2022-02-07 10:32       ` Kundapura, Ganapati
  0 siblings, 0 replies; 17+ messages in thread
From: Kundapura, Ganapati @ 2022-02-07 10:32 UTC (permalink / raw)
  To: Gujjar, Abhinandan S, jerinjacobk, Jayatheerthan, Jay, dev

Hi Abhi,

> -----Original Message-----
> From: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> Sent: 13 January 2022 16:36
> To: Kundapura, Ganapati <ganapati.kundapura@intel.com>;
> jerinjacobk@gmail.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> dev@dpdk.org
> Subject: RE: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
> 
> Hi Ganapati,
> 
> 
> > -----Original Message-----
> > From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> > Sent: Tuesday, January 11, 2022 4:07 PM
> > To: jerinjacobk@gmail.com; Jayatheerthan, Jay
> > <jay.jayatheerthan@intel.com>; dev@dpdk.org
> > Cc: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> > Subject: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to
> > circular buffer
> >
> > Move crypto ops to circular buffer to retain crypto ops when
> > cryptodev/eventdev are temporarily full
> >
> > ---
> > v3:
> > * update eca_ops_buffer_flush() to flush out all the crypto
> >   ops out of circular buffer.
> > * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
> >   and add to circular buffer for later processing.
> >
> > v2:
> > * reset cryptp adapter next cdev id before dequeueing from the
> Cryptp -> crypto
Updated in v4
> >   next cdev
> > ---
> >
> > Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> I don't see sign off after applying the patch. Could you take a look?
> commit 18d9c3b1728f325dba5fe5dbb51df2366b70527a
Moved Signed-off-by before the version history

> Author: Ganapati Kundapura <ganapati.kundapura@intel.com>
> Date:   Tue Jan 11 04:36:30 2022 -0600
> 
>     eventdev/crypto_adapter: move crypto ops to circular buffer
> 
>     Move crypto ops to circular buffer to retain crypto
>     ops when cryptodev/eventdev are temporarily full
> 
> 
> >
> > diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> > b/lib/eventdev/rte_event_crypto_adapter.c
> > index d840803..9086368 100644
> > --- a/lib/eventdev/rte_event_crypto_adapter.c
> > +++ b/lib/eventdev/rte_event_crypto_adapter.c
> > @@ -25,11 +25,27 @@
> >  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32  #define
> > CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> >
> > +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> > #define
> > +CRYPTO_ADAPTER_BUFFER_SZ 1024
> > +
> >  /* Flush an instance's enqueue buffers every
> CRYPTO_ENQ_FLUSH_THRESHOLD
> >   * iterations of eca_crypto_adapter_enq_run()
> >   */
> >  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> >
> > +struct crypto_ops_circular_buffer {
> > +	/* index of head element in circular buffer */
> > +	uint16_t head;
> > +	/* index of tail element in circular buffer */
> > +	uint16_t tail;
> > +	/* number elements in buffer */
> number elements -> number of elements
updated
> > +	uint16_t count;
> > +	/* size of circular buffer */
> > +	uint16_t size;
> > +	/* Pointer to hold rte_crypto_ops for batching */
> > +	struct rte_crypto_op **op_buffer;
> > +} __rte_cache_aligned;
> > +
> >  struct event_crypto_adapter {
> >  	/* Event device identifier */
> >  	uint8_t eventdev_id;
> > @@ -47,6 +63,8 @@ struct event_crypto_adapter {
> >  	struct crypto_device_info *cdevs;
> >  	/* Loop counter to flush crypto ops */
> >  	uint16_t transmit_loop_count;
> > +	/* Circular buffer for batching crypto ops to eventdev */
> > +	struct crypto_ops_circular_buffer ebuf;
> >  	/* Per instance stats structure */
> >  	struct rte_event_crypto_adapter_stats crypto_stats;
> >  	/* Configuration callback for rte_service configuration */ @@ -93,8
> > +111,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
> >  	/* Set to indicate queue pair is enabled */
> >  	bool qp_enabled;
> > -	/* Pointer to hold rte_crypto_ops for batching */
> > -	struct rte_crypto_op **op_buffer;
> > +	/* Circular buffer for batching crypto ops to cdev */
> > +	struct crypto_ops_circular_buffer cbuf;
> >  	/* No of crypto ops accumulated */
> >  	uint8_t len;
> >  } __rte_cache_aligned;
> > @@ -141,6 +159,77 @@ eca_init(void)
> >  	return 0;
> >  }
> >
> > +static inline bool
> > +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> > +*bufp) {
> > +	return bufp->count >= BATCH_SIZE;
> > +}
> > +
> > +static inline void
> > +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> > +	rte_free(bufp->op_buffer);
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_init(const char *name,
> > +			 struct crypto_ops_circular_buffer *bufp,
> > +			 uint16_t sz)
> > +{
> > +	bufp->op_buffer = rte_zmalloc(name,
> > +				      sizeof(struct rte_crypto_op *) * sz,
> > +				      0);
> > +	if (bufp->op_buffer == NULL)
> > +		return -ENOMEM;
> > +
> > +	bufp->size = sz;
> > +	return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> > +			struct rte_crypto_op *op)
> > +{
> > +	uint16_t *tailp = &bufp->tail;
> > +
> > +	bufp->op_buffer[*tailp] = op;
> > +	*tailp = (*tailp + 1) % bufp->size;
> Provide a comment saying you are taking care of buffer rollover condition
> > +	bufp->count++;
> > +
> > +	return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer
> *bufp,
> > +				  uint8_t cdev_id, uint16_t qp_id,
> > +				  uint16_t *nb_ops_flushed)
> This function is returning a value but caller does not check!
> > +{
> > +	uint16_t n = 0;
> > +	uint16_t *headp = &bufp->head;
> > +	uint16_t *tailp = &bufp->tail;
> > +	struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > +	if (*tailp > *headp)
> > +		n = *tailp - *headp;
> > +	else if (*tailp < *headp)
> > +		n = bufp->size - *headp;
> > +	else {
> > +		*nb_ops_flushed = 0;
> > +		return 0;  /* buffer empty */
> > +	}
> > +
> > +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> > +						      &ops[*headp], n);
> > +	bufp->count -= *nb_ops_flushed;
> > +	if (!bufp->count) {
> > +		*headp = 0;
> > +		*tailp = 0;
> > +	} else
> > +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> > +
> > +	return *nb_ops_flushed == n ? 0 : -1; }
> > +
> >  static inline struct event_crypto_adapter *
> > eca_id_to_adapter(uint8_t id)  { @@ -237,10 +326,19 @@
> > rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> >  		return -ENOMEM;
> >  	}
> >
> > +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> > +				     &adapter->ebuf,
> > +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> > +		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
> Mem -> memory, edev -> eventdev.
Updated in v4

> > +		rte_free(adapter);
> > +		return -ENOMEM;
> > +	}
> > +
> >  	ret = rte_event_dev_info_get(dev_id, &dev_info);
> >  	if (ret < 0) {
> >  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d:
> %s!",
> >  				 dev_id, dev_info.driver_name);
> > +		eca_circular_buffer_free(&adapter->ebuf);
> >  		rte_free(adapter);
> >  		return ret;
> >  	}
> > @@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> > uint8_t dev_id,
> >  					socket_id);
> >  	if (adapter->cdevs == NULL) {
> >  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto
> devices\n");
> > +		eca_circular_buffer_free(&adapter->ebuf);
> >  		rte_free(adapter);
> >  		return -ENOMEM;
> >  	}
> > @@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >  	struct crypto_queue_pair_info *qp_info = NULL;
> >  	struct rte_crypto_op *crypto_op;
> >  	unsigned int i, n;
> > -	uint16_t qp_id, len, ret;
> > +	uint16_t qp_id, len, nb_enqueued = 0;
> >  	uint8_t cdev_id;
> >
> >  	len = 0;
> > -	ret = 0;
> >  	n = 0;
> >  	stats->event_deq_count += cnt;
> >
> > @@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >  				continue;
> >  			}
> >  			len = qp_info->len;
> > -			qp_info->op_buffer[len] = crypto_op;
> > +			eca_circular_buffer_add(&qp_info->cbuf,
> crypto_op);
> >  			len++;
> >  		} else if (crypto_op->sess_type ==
> > RTE_CRYPTO_OP_SESSIONLESS &&
> >  				crypto_op->private_data_offset) { @@ -
> 383,7 +481,7 @@
> > eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct
> > rte_event *ev,
> >  				continue;
> >  			}
> >  			len = qp_info->len;
> > -			qp_info->op_buffer[len] = crypto_op;
> > +			eca_circular_buffer_add(&qp_info->cbuf,
> crypto_op);
> >  			len++;
> >  		} else {
> >  			rte_pktmbuf_free(crypto_op->sym->m_src);
> > @@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >  			continue;
> >  		}
> >
> > -		if (len == BATCH_SIZE) {
> > -			struct rte_crypto_op **op_buffer = qp_info-
> > >op_buffer;
> > -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> > +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> > +			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> > +							  cdev_id,
> >  							  qp_id,
> > -							  op_buffer,
> > -							  BATCH_SIZE);
> > +							  &nb_enqueued);
> > +			stats->crypto_enq_count += nb_enqueued;
> > +			n += nb_enqueued;
> >
> > -			stats->crypto_enq_count += ret;
> > -
> > -			while (ret < len) {
> > +			while (nb_enqueued < len) {
> >  				struct rte_crypto_op *op;
> > -				op = op_buffer[ret++];
> > +				op = qp_info-
> > >cbuf.op_buffer[nb_enqueued++];
> >  				stats->crypto_enq_fail++;
> >  				rte_pktmbuf_free(op->sym->m_src);
> >  				rte_crypto_op_free(op);
> Not sure why you are freeing the ops that were not enqueued to the cryptodev.
> Isn't the goal of the patch to retain those non-enqueued crypto ops
> temporarily and retry later?
Retained failed ops
> 
> > @@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >
> >  		if (qp_info)
> >  			qp_info->len = len;
> > -		n += ret;
> >  	}
> >
> >  	return n;
> > @@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct
> event_crypto_adapter
> > *adapter)
> >  	struct rte_event_crypto_adapter_stats *stats = &adapter-
> >crypto_stats;
> >  	struct crypto_device_info *curr_dev;
> >  	struct crypto_queue_pair_info *curr_queue;
> > -	struct rte_crypto_op **op_buffer;
> >  	struct rte_cryptodev *dev;
> >  	uint8_t cdev_id;
> >  	uint16_t qp;
> > -	uint16_t ret;
> > +	uint16_t nb_enqueued = 0, nb = 0;
> >  	uint16_t num_cdev = rte_cryptodev_count();
> >
> > -	ret = 0;
> >  	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> >  		curr_dev = &adapter->cdevs[cdev_id];
> >  		dev = curr_dev->dev;
> > @@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct
> event_crypto_adapter
> > *adapter)
> >  			if (!curr_queue->qp_enabled)
> >  				continue;
> >
> > -			op_buffer = curr_queue->op_buffer;
> > -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> > +			eca_circular_buffer_flush_to_cdev(&curr_queue-
> >cbuf,
> > +							  cdev_id,
> >  							  qp,
> > -							  op_buffer,
> > -							  curr_queue->len);
> > -			stats->crypto_enq_count += ret;
> > +							  &nb_enqueued);
> >
> > -			while (ret < curr_queue->len) {
> > +			stats->crypto_enq_count += nb_enqueued;
> > +			nb += nb_enqueued;
> > +
> > +			while (nb_enqueued < curr_queue->len) {
> >  				struct rte_crypto_op *op;
> > -				op = op_buffer[ret++];
> > +				op = curr_queue-
> > >cbuf.op_buffer[nb_enqueued++];
> >  				stats->crypto_enq_fail++;
> >  				rte_pktmbuf_free(op->sym->m_src);
> >  				rte_crypto_op_free(op);
> > @@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct
> event_crypto_adapter
> > *adapter)
> >  		}
> >  	}
> >
> > -	return ret;
> > +	return nb;
> >  }
> >
> >  static int
> > @@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct
> > event_crypto_adapter *adapter,
> >  	return nb_enqueued;
> >  }
> >
> > -static inline void
> > +static inline uint16_t
> >  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> > -		      struct rte_crypto_op **ops, uint16_t num)
> > +		  struct rte_crypto_op **ops, uint16_t num)
> >  {
> >  	struct rte_event_crypto_adapter_stats *stats = &adapter-
> >crypto_stats;
> >  	union rte_event_crypto_metadata *m_data = NULL; @@ -518,6
> +613,8 @@
> > eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> >  	num = RTE_MIN(num, BATCH_SIZE);
> >  	for (i = 0; i < num; i++) {
> >  		struct rte_event *ev = &events[nb_ev++];
> > +
> > +		m_data = NULL;
> >  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
> >  			m_data =
> rte_cryptodev_sym_session_get_user_data(
> >  					ops[i]->sym->session);
> > @@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct
> > event_crypto_adapter *adapter,
> >  						  event_port_id,
> >  						  &events[nb_enqueued],
> >  						  nb_ev - nb_enqueued);
> > +
> >  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
> >  		 nb_enqueued < nb_ev);
> >
> > -	/* Free mbufs and rte_crypto_ops for failed events */
> > -	for (i = nb_enqueued; i < nb_ev; i++) {
> > -		struct rte_crypto_op *op = events[i].event_ptr;
> > -		rte_pktmbuf_free(op->sym->m_src);
> > -		rte_crypto_op_free(op);
> > -	}
> > -
> >  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
> >  	stats->event_enq_count += nb_enqueued;
> >  	stats->event_enq_retry_count += retry - 1;
> > +
> > +	return nb_enqueued;
> >  }
> >
> > +static int
> > +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter
> *adapter,
> > +				   struct crypto_ops_circular_buffer *bufp) {
> > +	uint16_t n = 0, nb_ops_flushed;
> > +	uint16_t *headp = &bufp->head;
> > +	uint16_t *tailp = &bufp->tail;
> > +	struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > +	if (*tailp > *headp)
> > +		n = *tailp - *headp;
> > +	else if (*tailp < *headp)
> > +		n = bufp->size - *headp;
> > +	else
> > +		return 0;  /* buffer empty */
> > +
> > +	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> > +	bufp->count -= nb_ops_flushed;
> > +	if (!bufp->count) {
> > +		*headp = 0;
> > +		*tailp = 0;
> > +
> Extra line not required
removed
> > +		return 0;  /* buffer empty */
> > +	}
> > +
> > +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> > +
> Extra line not required
removed
> > +	return 1;
> > +}
> > +
> > +
> > +static void
> > +eca_ops_buffer_flush(struct event_crypto_adapter *adapter) {
> > +	if (adapter->ebuf.count == 0)
> > +		return;
> > +
> > +	while (eca_circular_buffer_flush_to_evdev(adapter,
> > +						  &adapter->ebuf))
> > +		;
> > +}
> >  static inline unsigned int
> >  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> >  			   unsigned int max_deq)
> > @@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  	struct crypto_device_info *curr_dev;
> >  	struct crypto_queue_pair_info *curr_queue;
> >  	struct rte_crypto_op *ops[BATCH_SIZE];
> > -	uint16_t n, nb_deq;
> > +	uint16_t n, nb_deq, nb_enqueued, i;
> >  	struct rte_cryptodev *dev;
> >  	uint8_t cdev_id;
> >  	uint16_t qp, dev_qps;
> > @@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  	uint16_t num_cdev = rte_cryptodev_count();
> >
> >  	nb_deq = 0;
> > +	eca_ops_buffer_flush(adapter);
> > +
> >  	do {
> > -		uint16_t queues = 0;
> >  		done = true;
> >
> >  		for (cdev_id = adapter->next_cdev_id;
> >  			cdev_id < num_cdev; cdev_id++) {
> > +			uint16_t queues = 0;
> > +
> >  			curr_dev = &adapter->cdevs[cdev_id];
> >  			dev = curr_dev->dev;
> >  			if (dev == NULL)
> >  				continue;
> > +
> >  			dev_qps = dev->data->nb_queue_pairs;
> >
> >  			for (qp = curr_dev->next_queue_pair_id; @@ -596,7
> > +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter
> > *adapter,
> >  				queues++) {
> >
> >  				curr_queue = &curr_dev->qpairs[qp];
> > -				if (!curr_queue->qp_enabled)
> > +				if (curr_queue == NULL ||
> > +				    !curr_queue->qp_enabled)
> >  					continue;
> >
> >  				n = rte_cryptodev_dequeue_burst(cdev_id,
> qp, @@ -605,11 +744,27
> > @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> >  					continue;
> >
> >  				done = false;
> > +				nb_enqueued = 0;
> > +
> >  				stats->crypto_deq_count += n;
> > -				eca_ops_enqueue_burst(adapter, ops, n);
> > +
> > +				if (unlikely(!adapter->ebuf.count))
> > +					nb_enqueued =
> > eca_ops_enqueue_burst(
> > +							adapter, ops, n);
> > +
> > +				if (nb_enqueued == n)
> > +					goto check;
> > +
> > +				/* Failed to enqueue events case */
> > +				for (i = nb_enqueued; i < n; i++)
> > +					eca_circular_buffer_add(
> > +						&adapter->ebuf,
> > +						ops[nb_enqueued]);
> > +
> > +check:
> >  				nb_deq += n;
> >
> > -				if (nb_deq > max_deq) {
> > +				if (nb_deq >= max_deq) {
> >  					if ((qp + 1) == dev_qps) {
> >  						adapter->next_cdev_id =
> >  							(cdev_id + 1)
> > @@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  				}
> >  			}
> >  		}
> > +		adapter->next_cdev_id = 0;
> >  	} while (done == false);
> >  	return nb_deq;
> >  }
> > @@ -751,11 +907,10 @@ eca_add_queue_pair(struct
> event_crypto_adapter
> > *adapter, uint8_t cdev_id,
> >  			return -ENOMEM;
> >
> >  		qpairs = dev_info->qpairs;
> > -		qpairs->op_buffer = rte_zmalloc_socket(adapter-
> >mem_name,
> > -					BATCH_SIZE *
> > -					sizeof(struct rte_crypto_op *),
> > -					0, adapter->socket_id);
> > -		if (!qpairs->op_buffer) {
> > +
> > +		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> > +					     &qpairs->cbuf,
> > +
> > CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> >  			rte_free(qpairs);
> >  			return -ENOMEM;
> >  		}
> > --
> > 2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
@ 2022-02-07 10:50     ` Ganapati Kundapura
  2022-02-07 10:50       ` [PATCH v4 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
                         ` (2 more replies)
  2 siblings, 3 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-02-07 10:50 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, abhinandan.gujjar, dev

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

---
v4:
* Retain the non-enqueued crypto ops in the circular buffer to
  process later and stop dequeuing from the eventdev till
  all the crypto ops are enqueued to the cryptodev

  Check space in the circular buffer and stop dequeuing from the
  eventdev if some ops failed to flush to the cdev and no space
  for another batch is available in the circular buffer

  Enable dequeuing from the eventdev after all the ops are flushed

v3:
* update eca_ops_buffer_flush() to flush out all the crypto
  ops out of circular buffer.
* remove freeing of failed crypto ops from eca_ops_enqueue_burst()
  and add to circular buffer for later processing.

v2:
* reset crypto adapter next cdev id before dequeueing from the
  next cdev
---
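
For reference, a minimal standalone sketch of the head/tail arithmetic used by
the flush helpers below: only the contiguous region up to the wrap point is
offered to a single enqueue call, and a following pass picks up the wrapped
part. The ring struct and enqueue callback here are stand-ins, not the DPDK
API:

#include <stdint.h>

struct ring {
	uint16_t head, tail, count, size;
	void **ops;
};

/* Flush one contiguous region; enqueue() may accept fewer ops than offered,
 * mimicking a temporarily full cryptodev/eventdev.
 */
static uint16_t
flush_once(struct ring *r, uint16_t (*enqueue)(void **ops, uint16_t n))
{
	uint16_t n, done;

	if (r->count == 0)
		return 0;			/* nothing buffered */

	if (r->tail > r->head)			/* single contiguous region */
		n = r->tail - r->head;
	else					/* wrapped: flush up to the end first */
		n = r->size - r->head;

	done = enqueue(&r->ops[r->head], n);
	r->count -= done;
	if (r->count == 0)
		r->head = r->tail = 0;		/* reset indices when drained */
	else
		r->head = (r->head + done) % r->size;

	return done;
}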

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..0faac36 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number of elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -37,6 +53,10 @@ struct event_crypto_adapter {
 	uint8_t event_port_id;
 	/* Store event device's implicit release capability */
 	uint8_t implicit_release_disabled;
+	/* Flag to indicate backpressure at cryptodev
+	 * Stop further dequeuing events from eventdev
+	 */
+	bool stop_enq_to_cryptodev;
 	/* Max crypto ops processed in any service function invocation */
 	uint32_t max_nb;
 	/* Lock to serialize config updates with service function */
@@ -47,6 +67,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,10 +115,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
-	/* No of crypto ops accumulated */
-	uint8_t len;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 } __rte_cache_aligned;
 
 static struct event_crypto_adapter **event_crypto_adapter;
@@ -141,6 +161,84 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline bool
+eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
+{
+	return (bufp->size - bufp->count) >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	/* circular buffer, go round */
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, nb_enqueued = 0;
 	uint8_t cdev_id;
+	int ret;
 
-	len = 0;
 	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
@@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				rte_crypto_op_free(crypto_op);
 				continue;
 			}
-			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
-			len++;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
 			m_data = (union rte_event_crypto_metadata *)
@@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				rte_crypto_op_free(crypto_op);
 				continue;
 			}
-			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
-			len++;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
 			rte_crypto_op_free(crypto_op);
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
-							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
-
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
-				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
-				stats->crypto_enq_fail++;
-				rte_pktmbuf_free(op->sym->m_src);
-				rte_crypto_op_free(op);
-			}
-
-			len = 0;
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+								cdev_id,
+								qp_id,
+								&nb_enqueued);
+			/**
+			 * If some crypto ops failed to flush to cdev and
+			 * space for another batch is not available, stop
+			 * dequeue from eventdev momentarily
+			 */
+			if (unlikely(ret < 0 &&
+				!eca_circular_buffer_space_for_batch(
+							&qp_info->cbuf)))
+				adapter->stop_enq_to_cryptodev = true;
 		}
 
-		if (qp_info)
-			qp_info->len = len;
-		n += ret;
+		stats->crypto_enq_count += nb_enqueued;
+		n += nb_enqueued;
 	}
 
 	return n;
 }
 
 static unsigned int
-eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
+		      uint8_t cdev_id, uint16_t *nb_ops_flushed)
 {
-	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
-	uint8_t cdev_id;
+	uint16_t nb = 0, nb_enqueued = 0;
 	uint16_t qp;
-	uint16_t ret;
-	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
-	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
-		curr_dev = &adapter->cdevs[cdev_id];
-		dev = curr_dev->dev;
-		if (dev == NULL)
-			continue;
-		for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
+	curr_dev = &adapter->cdevs[cdev_id];
+	if (unlikely(curr_dev == NULL))
+		return 0;
 
-			curr_queue = &curr_dev->qpairs[qp];
-			if (!curr_queue->qp_enabled)
-				continue;
+	dev = rte_cryptodev_pmd_get_dev(cdev_id);
+	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
-							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
-
-			while (ret < curr_queue->len) {
-				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
-				stats->crypto_enq_fail++;
-				rte_pktmbuf_free(op->sym->m_src);
-				rte_crypto_op_free(op);
-			}
-			curr_queue->len = 0;
-		}
+		curr_queue = &curr_dev->qpairs[qp];
+		if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+			continue;
+
+		eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+						  cdev_id,
+						  qp,
+						  &nb_enqueued);
+		*nb_ops_flushed += curr_queue->cbuf.count;
+		nb += nb_enqueued;
 	}
 
-	return ret;
+	return nb;
+}
+
+static unsigned int
+eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+{
+	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
+	uint8_t cdev_id;
+	uint16_t nb_enqueued = 0;
+	uint16_t nb_ops_flushed = 0;
+	uint16_t num_cdev = rte_cryptodev_count();
+
+	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
+		nb_enqueued += eca_crypto_cdev_flush(adapter,
+						    cdev_id,
+						    &nb_ops_flushed);
+	/**
+	 * Enable dequeue from eventdev if all ops from circular
+	 * buffer flushed to cdev
+	 */
+	if (!nb_ops_flushed)
+		adapter->stop_enq_to_cryptodev = false;
+
+	stats->crypto_enq_count += nb_enqueued;
+
+	return nb_enqueued;
 }
 
 static int
@@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
 		return 0;
 
+	if (adapter->stop_enq_to_cryptodev) {
+		nb_enqueued += eca_crypto_enq_flush(adapter);
+
+		if (adapter->stop_enq_to_cryptodev)
+			goto skip_event_dequeue_burst;
+	}
+
 	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
 		stats->event_poll_count++;
 		n = rte_event_dequeue_burst(event_dev_id,
@@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 		nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
 	}
 
+skip_event_dequeue_burst:
+
 	if ((++adapter->transmit_loop_count &
 		(CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
 		nb_enqueued += eca_crypto_enq_flush(adapter);
@@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
-	/* Free mbufs and rte_crypto_ops for failed events */
-	for (i = nb_enqueued; i < nb_ev; i++) {
-		struct rte_crypto_op *op = events[i].event_ptr;
-		rte_pktmbuf_free(op->sym->m_src);
-		rte_crypto_op_free(op);
-	}
-
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
+}
+
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*headp = (*headp + nb_ops_flushed) % bufp->size;
+	return 1;
 }
 
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				queues++) {
 
 				curr_queue = &curr_dev->qpairs[qp];
-				if (!curr_queue->qp_enabled)
+				if (curr_queue == NULL ||
+				    !curr_queue->qp_enabled)
 					continue;
 
 				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
-				if (nb_deq > max_deq) {
+				if (nb_deq >= max_deq) {
 					if ((qp + 1) == dev_qps) {
 						adapter->next_cdev_id =
 							(cdev_id + 1)
@@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				}
 			}
 		}
+		adapter->next_cdev_id = 0;
 	} while (done == false);
 	return nb_deq;
 }
@@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
+			RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
+					 "buffer");
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4
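
As a usage-level illustration (not part of the patch), an application can read
the existing adapter statistics to observe the new behaviour: when the
cryptodev or eventdev is temporarily full, ops now stay in the circular
buffers and are retried instead of being dropped, so the enqueue counters
catch up once the backlog drains. A rough sketch, with the adapter id assumed
to be the one the application created:

#include <stdio.h>
#include <inttypes.h>
#include <rte_event_crypto_adapter.h>

static void
log_adapter_counters(uint8_t adapter_id)
{
	struct rte_event_crypto_adapter_stats stats;

	if (rte_event_crypto_adapter_stats_get(adapter_id, &stats) != 0)
		return;

	printf("evdev deq: %" PRIu64 ", cdev enq: %" PRIu64
	       ", evdev enq: %" PRIu64 "\n",
	       stats.event_deq_count, stats.crypto_enq_count,
	       stats.event_enq_count);
}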


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v4 2/2] eventdev: update crypto caps get to return SW cap
  2022-02-07 10:50     ` [PATCH v4 " Ganapati Kundapura
@ 2022-02-07 10:50       ` Ganapati Kundapura
  2022-02-10 15:07       ` [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
  2022-02-10 17:41       ` [PATCH v5 " Ganapati Kundapura
  2 siblings, 0 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-02-07 10:50 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, abhinandan.gujjar, dev

update rte_event_crypto_adapter_caps_get() to return
SW_CAP if PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4
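
For context (not part of the patch), a rough sketch of how an application
consumes this: with the change, a device/cryptodev combination without a PMD
callback reports the SW capabilities instead of failing with -ENOTSUP, so the
usual capability check keeps working and can fall back to the SW-assisted
path. One possible mode-selection policy, with illustrative device ids:

#include <rte_eventdev.h>
#include <rte_event_crypto_adapter.h>

static int
select_adapter_mode(uint8_t evdev_id, uint8_t cdev_id,
		    enum rte_event_crypto_adapter_mode *mode)
{
	uint32_t caps = 0;
	int ret = rte_event_crypto_adapter_caps_get(evdev_id, cdev_id, &caps);

	if (ret < 0)
		return ret;

	/* Prefer forward mode when the PMD forwards via an internal port;
	 * otherwise fall back to OP_NEW mode.
	 */
	if (caps & RTE_EVENT_CRYPTO_ADAPTER_CAP_INTERNAL_PORT_OP_FWD)
		*mode = RTE_EVENT_CRYPTO_ADAPTER_OP_FORWARD;
	else
		*mode = RTE_EVENT_CRYPTO_ADAPTER_OP_NEW;

	return 0;
}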


^ permalink raw reply	[flat|nested] 17+ messages in thread

* RE: [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-02-07 10:50     ` [PATCH v4 " Ganapati Kundapura
  2022-02-07 10:50       ` [PATCH v4 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-02-10 15:07       ` Gujjar, Abhinandan S
  2022-02-10 17:46         ` Kundapura, Ganapati
  2022-02-10 17:41       ` [PATCH v5 " Ganapati Kundapura
  2 siblings, 1 reply; 17+ messages in thread
From: Gujjar, Abhinandan S @ 2022-02-10 15:07 UTC (permalink / raw)
  To: Kundapura, Ganapati, Jayatheerthan, Jay, jerinjacobk, dev

Hi Ganapati,

It looks good to me. Some minor comments inline.

> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Sent: Monday, February 7, 2022 4:21 PM
> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> jerinjacobk@gmail.com; Gujjar, Abhinandan S
> <abhinandan.gujjar@intel.com>; dev@dpdk.org
> Subject: [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
> 
> Move crypto ops to circular buffer to retain crypto ops when
> cryptodev/eventdev are temporarily full
> 
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> 
> ---
> v4:
> * Retain the non-enqueued crypto ops in the circular buffer to
>   process later and stop dequeuing from the eventdev till
>   all the crypto ops are enqueued to the cryptodev
> 
>   Check space in the circular buffer and stop dequeuing from the
>   eventdev if some ops failed to flush to the cdev and no space
>   for another batch is available in the circular buffer
> 
>   Enable dequeuing from the eventdev after all the ops are flushed
> 
> v3:
> * update eca_ops_buffer_flush() to flush out all the crypto
>   ops out of circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
>   and add to circular buffer for later processing.
> 
> v2:
> * reset crypto adapter next cdev id before dequeueing from the
>   next cdev
> ---
> 
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..0faac36 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
>  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
>  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> 
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> #define
> +CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
>  /* Flush an instance's enqueue buffers every
> CRYPTO_ENQ_FLUSH_THRESHOLD
>   * iterations of eca_crypto_adapter_enq_run()
>   */
>  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> 
> +struct crypto_ops_circular_buffer {
> +	/* index of head element in circular buffer */
> +	uint16_t head;
> +	/* index of tail element in circular buffer */
> +	uint16_t tail;
> +	/* number of elements in buffer */
> +	uint16_t count;
> +	/* size of circular buffer */
> +	uint16_t size;
> +	/* Pointer to hold rte_crypto_ops for batching */
> +	struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
>  struct event_crypto_adapter {
>  	/* Event device identifier */
>  	uint8_t eventdev_id;
> @@ -37,6 +53,10 @@ struct event_crypto_adapter {
>  	uint8_t event_port_id;
>  	/* Store event device's implicit release capability */
>  	uint8_t implicit_release_disabled;
> +	/* Flag to indicate backpressure at cryptodev
> +	 * Stop further dequeuing events from eventdev
> +	 */
> +	bool stop_enq_to_cryptodev;
>  	/* Max crypto ops processed in any service function invocation */
>  	uint32_t max_nb;
>  	/* Lock to serialize config updates with service function */ @@ -47,6
> +67,8 @@ struct event_crypto_adapter {
>  	struct crypto_device_info *cdevs;
>  	/* Loop counter to flush crypto ops */
>  	uint16_t transmit_loop_count;
> +	/* Circular buffer for batching crypto ops to eventdev */
> +	struct crypto_ops_circular_buffer ebuf;
>  	/* Per instance stats structure */
>  	struct rte_event_crypto_adapter_stats crypto_stats;
>  	/* Configuration callback for rte_service configuration */ @@ -93,10
> +115,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
>  	/* Set to indicate queue pair is enabled */
>  	bool qp_enabled;
> -	/* Pointer to hold rte_crypto_ops for batching */
> -	struct rte_crypto_op **op_buffer;
> -	/* No of crypto ops accumulated */
> -	uint8_t len;
> +	/* Circular buffer for batching crypto ops to cdev */
> +	struct crypto_ops_circular_buffer cbuf;
>  } __rte_cache_aligned;
> 
>  static struct event_crypto_adapter **event_crypto_adapter; @@ -141,6
> +161,84 @@ eca_init(void)
>  	return 0;
>  }
> 
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> +*bufp) {
> +	return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline bool
> +eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer
> +*bufp) {
> +	return (bufp->size - bufp->count) >= BATCH_SIZE; }
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> +	rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> +			 struct crypto_ops_circular_buffer *bufp,
> +			 uint16_t sz)
> +{
> +	bufp->op_buffer = rte_zmalloc(name,
> +				      sizeof(struct rte_crypto_op *) * sz,
> +				      0);
> +	if (bufp->op_buffer == NULL)
> +		return -ENOMEM;
> +
> +	bufp->size = sz;
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> +			struct rte_crypto_op *op)
> +{
> +	uint16_t *tailp = &bufp->tail;
> +
> +	bufp->op_buffer[*tailp] = op;
> +	/* circular buffer, go round */
> +	*tailp = (*tailp + 1) % bufp->size;
> +	bufp->count++;
> +
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer
> *bufp,
> +				  uint8_t cdev_id, uint16_t qp_id,
> +				  uint16_t *nb_ops_flushed)
> +{
> +	uint16_t n = 0;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else {
> +		*nb_ops_flushed = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> +						      &ops[*headp], n);
> +	bufp->count -= *nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +	} else
> +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> +	return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
>  static inline struct event_crypto_adapter *  eca_id_to_adapter(uint8_t id)  {
> @@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  		return -ENOMEM;
>  	}
> 
> +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> +				     &adapter->ebuf,
> +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> +		RTE_EDEV_LOG_ERR("Failed to get memory for eventdev
> buffer");
> +		rte_free(adapter);
> +		return -ENOMEM;
> +	}
> +
>  	ret = rte_event_dev_info_get(dev_id, &dev_info);
>  	if (ret < 0) {
>  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d:
> %s!",
>  				 dev_id, dev_info.driver_name);
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return ret;
>  	}
> @@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  					socket_id);
>  	if (adapter->cdevs == NULL) {
>  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto
> devices\n");
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return -ENOMEM;
>  	}
> @@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter *adapter, struct rte_event *ev,
>  	struct crypto_queue_pair_info *qp_info = NULL;
>  	struct rte_crypto_op *crypto_op;
>  	unsigned int i, n;
> -	uint16_t qp_id, len, ret;
> +	uint16_t qp_id, nb_enqueued = 0;
>  	uint8_t cdev_id;
> +	int ret;
> 
> -	len = 0;
>  	ret = 0;
>  	n = 0;
>  	stats->event_deq_count += cnt;
> @@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				rte_crypto_op_free(crypto_op);
>  				continue;
>  			}
> -			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> -			len++;
> +			eca_circular_buffer_add(&qp_info->cbuf,
> crypto_op);
>  		} else if (crypto_op->sess_type ==
> RTE_CRYPTO_OP_SESSIONLESS &&
>  				crypto_op->private_data_offset) {
>  			m_data = (union rte_event_crypto_metadata *) @@
> -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				rte_crypto_op_free(crypto_op);
>  				continue;
>  			}
> -			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> -			len++;
> +			eca_circular_buffer_add(&qp_info->cbuf,
> crypto_op);
>  		} else {
>  			rte_pktmbuf_free(crypto_op->sym->m_src);
>  			rte_crypto_op_free(crypto_op);
>  			continue;
>  		}
> 
> -		if (len == BATCH_SIZE) {
> -			struct rte_crypto_op **op_buffer = qp_info-
> >op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> -							  qp_id,
> -							  op_buffer,
> -							  BATCH_SIZE);
> -
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < len) {
> -				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> -				stats->crypto_enq_fail++;
> -				rte_pktmbuf_free(op->sym->m_src);
> -				rte_crypto_op_free(op);
> -			}
> -
> -			len = 0;
> +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> +			ret = eca_circular_buffer_flush_to_cdev(&qp_info-
> >cbuf,
> +								cdev_id,
> +								qp_id,
> +
> 	&nb_enqueued);
> +			/**
> +			 * If some crypto ops failed to flush to cdev and
> +			 * space for another batch is not available, stop
> +			 * dequeue from eventdev momentarily
> +			 */
> +			if (unlikely(ret < 0 &&
> +				!eca_circular_buffer_space_for_batch(
> +							&qp_info->cbuf)))
> +				adapter->stop_enq_to_cryptodev = true;
>  		}
> 
> -		if (qp_info)
> -			qp_info->len = len;
> -		n += ret;
> +		stats->crypto_enq_count += nb_enqueued;
> +		n += nb_enqueued;
>  	}
> 
>  	return n;
>  }
> 
>  static unsigned int
> -eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
> +		      uint8_t cdev_id, uint16_t *nb_ops_flushed)
>  {
> -	struct rte_event_crypto_adapter_stats *stats = &adapter-
> >crypto_stats;
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
> -	struct rte_crypto_op **op_buffer;
>  	struct rte_cryptodev *dev;
> -	uint8_t cdev_id;
> +	uint16_t nb = 0, nb_enqueued = 0;
>  	uint16_t qp;
> -	uint16_t ret;
> -	uint16_t num_cdev = rte_cryptodev_count();
> 
> -	ret = 0;
> -	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> -		curr_dev = &adapter->cdevs[cdev_id];
> -		dev = curr_dev->dev;
> -		if (dev == NULL)
> -			continue;
> -		for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> +	curr_dev = &adapter->cdevs[cdev_id];
> +	if (unlikely(curr_dev == NULL))
> +		return 0;
> 
> -			curr_queue = &curr_dev->qpairs[qp];
> -			if (!curr_queue->qp_enabled)
> -				continue;
> +	dev = rte_cryptodev_pmd_get_dev(cdev_id);
> +	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> 
> -			op_buffer = curr_queue->op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> -							  qp,
> -							  op_buffer,
> -							  curr_queue->len);
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < curr_queue->len) {
> -				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> -				stats->crypto_enq_fail++;
> -				rte_pktmbuf_free(op->sym->m_src);
> -				rte_crypto_op_free(op);
> -			}
> -			curr_queue->len = 0;
> -		}
> +		curr_queue = &curr_dev->qpairs[qp];
> +		if (unlikely(curr_queue == NULL || !curr_queue-
> >qp_enabled))
> +			continue;
> +
> +		eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> +						  cdev_id,
> +						  qp,
> +						  &nb_enqueued);
> +		*nb_ops_flushed += curr_queue->cbuf.count;
> +		nb += nb_enqueued;
>  	}
> 
> -	return ret;
> +	return nb;
> +}
> +
> +static unsigned int
> +eca_crypto_enq_flush(struct event_crypto_adapter *adapter) {
> +	struct rte_event_crypto_adapter_stats *stats = &adapter-
> >crypto_stats;
> +	uint8_t cdev_id;
> +	uint16_t nb_enqueued = 0;
> +	uint16_t nb_ops_flushed = 0;
> +	uint16_t num_cdev = rte_cryptodev_count();
> +
> +	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
> +		nb_enqueued += eca_crypto_cdev_flush(adapter,
> +						    cdev_id,
> +						    &nb_ops_flushed);
> +	/**
> +	 * Enable dequeue from eventdev if all ops from circular
> +	 * buffer flushed to cdev
> +	 */
> +	if (!nb_ops_flushed)
> +		adapter->stop_enq_to_cryptodev = false;
> +
> +	stats->crypto_enq_count += nb_enqueued;
> +
> +	return nb_enqueued;
>  }
> 
>  static int
> @@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  	if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
>  		return 0;
> 
> +	if (adapter->stop_enq_to_cryptodev) {
Add unlikely here
> +		nb_enqueued += eca_crypto_enq_flush(adapter);
> +
> +		if (adapter->stop_enq_to_cryptodev)
Add unlikely here
> +			goto skip_event_dequeue_burst;
> +	}
> +
>  	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
>  		stats->event_poll_count++;
>  		n = rte_event_dequeue_burst(event_dev_id,
> @@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  		nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
>  	}
> 
> +skip_event_dequeue_burst:
> +
>  	if ((++adapter->transmit_loop_count &
>  		(CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
>  		nb_enqueued += eca_crypto_enq_flush(adapter); @@ -
> 499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  	return nb_enqueued;
>  }
> 
> -static inline void
> +static inline uint16_t
>  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> -		      struct rte_crypto_op **ops, uint16_t num)
> +		  struct rte_crypto_op **ops, uint16_t num)
>  {
>  	struct rte_event_crypto_adapter_stats *stats = &adapter-
> >crypto_stats;
>  	union rte_event_crypto_metadata *m_data = NULL; @@ -518,6
> +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  	num = RTE_MIN(num, BATCH_SIZE);
>  	for (i = 0; i < num; i++) {
>  		struct rte_event *ev = &events[nb_ev++];
> +
> +		m_data = NULL;
>  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
>  			m_data =
> rte_cryptodev_sym_session_get_user_data(
>  					ops[i]->sym->session);
> @@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct
> event_crypto_adapter *adapter,
>  						  event_port_id,
>  						  &events[nb_enqueued],
>  						  nb_ev - nb_enqueued);
> +
>  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
>  		 nb_enqueued < nb_ev);
> 
> -	/* Free mbufs and rte_crypto_ops for failed events */
> -	for (i = nb_enqueued; i < nb_ev; i++) {
> -		struct rte_crypto_op *op = events[i].event_ptr;
> -		rte_pktmbuf_free(op->sym->m_src);
> -		rte_crypto_op_free(op);
> -	}
> -
>  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
>  	stats->event_enq_count += nb_enqueued;
>  	stats->event_enq_retry_count += retry - 1;
> +
> +	return nb_enqueued;
> +}
> +
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter
> *adapter,
> +				   struct crypto_ops_circular_buffer *bufp) {
> +	uint16_t n = 0, nb_ops_flushed;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else
> +		return 0;  /* buffer empty */
> +
> +	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> +	bufp->count -= nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> +	return 1;
>  }
> 
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter) {
> +	if (adapter->ebuf.count == 0)
Add likely here
> +		return;
> +
> +	while (eca_circular_buffer_flush_to_evdev(adapter,
> +						  &adapter->ebuf))
> +		;
> +}
>  static inline unsigned int
>  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  			   unsigned int max_deq)
> @@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
>  	struct rte_crypto_op *ops[BATCH_SIZE];
> -	uint16_t n, nb_deq;
> +	uint16_t n, nb_deq, nb_enqueued, i;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp, dev_qps;
> @@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  	uint16_t num_cdev = rte_cryptodev_count();
> 
>  	nb_deq = 0;
> +	eca_ops_buffer_flush(adapter);
> +
>  	do {
> -		uint16_t queues = 0;
>  		done = true;
> 
>  		for (cdev_id = adapter->next_cdev_id;
>  			cdev_id < num_cdev; cdev_id++) {
> +			uint16_t queues = 0;
> +
>  			curr_dev = &adapter->cdevs[cdev_id];
>  			dev = curr_dev->dev;
>  			if (dev == NULL)
Add unlikely here
>  				continue;
> +
>  			dev_qps = dev->data->nb_queue_pairs;
> 
>  			for (qp = curr_dev->next_queue_pair_id; @@ -596,7
> +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter
> *adapter,
>  				queues++) {
> 
>  				curr_queue = &curr_dev->qpairs[qp];
> -				if (!curr_queue->qp_enabled)
> +				if (curr_queue == NULL ||
> +				    !curr_queue->qp_enabled)
Add unlikely here
>  					continue;
> 
>  				n = rte_cryptodev_dequeue_burst(cdev_id,
> qp, @@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  					continue;
> 
>  				done = false;
> +				nb_enqueued = 0;
> +
>  				stats->crypto_deq_count += n;
> -				eca_ops_enqueue_burst(adapter, ops, n);
> +
> +				if (unlikely(!adapter->ebuf.count))
> +					nb_enqueued =
> eca_ops_enqueue_burst(
> +							adapter, ops, n);
> +
> +				if (nb_enqueued == n)
Add likely here
> +					goto check;
> +
> +				/* Failed to enqueue events case */
> +				for (i = nb_enqueued; i < n; i++)
> +					eca_circular_buffer_add(
> +						&adapter->ebuf,
> +						ops[nb_enqueued]);
> +
> +check:
>  				nb_deq += n;
> 
> -				if (nb_deq > max_deq) {
> +				if (nb_deq >= max_deq) {
>  					if ((qp + 1) == dev_qps) {
>  						adapter->next_cdev_id =
>  							(cdev_id + 1)
> @@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  				}
>  			}
>  		}
> +		adapter->next_cdev_id = 0;
>  	} while (done == false);
>  	return nb_deq;
>  }
> @@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter
> *adapter, uint8_t cdev_id,
>  			return -ENOMEM;
> 
>  		qpairs = dev_info->qpairs;
> -		qpairs->op_buffer = rte_zmalloc_socket(adapter-
> >mem_name,
> -					BATCH_SIZE *
> -					sizeof(struct rte_crypto_op *),
> -					0, adapter->socket_id);
> -		if (!qpairs->op_buffer) {
> +
> +		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> +					     &qpairs->cbuf,
> +
> CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> +			RTE_EDEV_LOG_ERR("Failed to get memory for
> cryptodev "
> +					 "buffer");
>  			rte_free(qpairs);
>  			return -ENOMEM;
>  		}
> --
> 2.6.4
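
Note: the likely()/unlikely() hints requested above are DPDK's usual
branch-prediction macros from rte_branch_prediction.h, thin wrappers
around the compiler's __builtin_expect(). A minimal sketch of how such
a hint reads on a hot-path check is below; the struct and field names
are placeholders for illustration, not the adapter's real ones.

#include <stdbool.h>
#include <stdint.h>
#include <rte_branch_prediction.h>	/* DPDK's likely()/unlikely() */

/* Placeholder state, not the real adapter struct */
struct adapter_state {
	bool stop_enq;		/* backpressure flag, rarely set */
	uint16_t buffered;	/* ops parked in a software buffer */
};

/* The flag is false almost always, so mark that branch unlikely and
 * keep the common path fall-through; the empty-buffer check is the
 * normal case, so it gets likely().
 */
static inline int
service_iteration(struct adapter_state *st)
{
	if (unlikely(st->stop_enq))
		return 0;	/* skip dequeue while backpressured */

	if (likely(st->buffered == 0))
		return 1;	/* nothing parked, fast path */

	return 2;		/* flush the parked ops first */
}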


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-02-07 10:50     ` [PATCH v4 " Ganapati Kundapura
  2022-02-07 10:50       ` [PATCH v4 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-02-10 15:07       ` [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
@ 2022-02-10 17:41       ` Ganapati Kundapura
  2022-02-10 17:41         ` [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-02-11  4:43         ` [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
  2 siblings, 2 replies; 17+ messages in thread
From: Ganapati Kundapura @ 2022-02-10 17:41 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, abhinandan.gujjar, dev

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

---
v5:
* Add branch prediction to if conditions

v4:
* Retain the non-enqueued crypto ops in the circular buffer to
  process later and stop dequeuing from the eventdev till
  all the crypto ops are enqueued to the cryptodev

  Check space in the circular buffer and stop dequeuing from the
  eventdev if some ops failed to flush to the cdev and no space
  for another batch is available in the circular buffer

  Enable dequeue from the eventdev after all the ops are flushed

v3:
* update eca_ops_buffer_flush() to flush all the crypto
  ops out of the circular buffer.
* remove freeing of failed crypto ops from eca_ops_enqueue_burst()
  and add them to the circular buffer for later processing.

v2:
* reset crypto adapter next cdev id before dequeuing from the
  next cdev
---
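
Note for readers following the batching logic in the diff below: a small
self-contained sketch of the circular buffer scheme is included here. Ops
are appended at the tail, and each flush drains only the contiguous run
that starts at head, so a wrapped buffer needs a second flush call and a
short enqueue signals backpressure. The cb_* names, BUF_SZ and the stubbed
fake_enqueue_burst() are illustrative stand-ins only; the real code stores
struct rte_crypto_op pointers and flushes them with
rte_cryptodev_enqueue_burst().

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

#define BUF_SZ 8	/* stand-in for CRYPTO_ADAPTER_OPS_BUFFER_SZ */

struct circ_buf {
	uint16_t head, tail, count, size;
	int *ops;	/* stand-in for struct rte_crypto_op ** */
};

static void cb_init(struct circ_buf *b, uint16_t sz)
{
	b->ops = calloc(sz, sizeof(*b->ops));
	b->head = b->tail = b->count = 0;
	b->size = sz;
}

static void cb_add(struct circ_buf *b, int op)
{
	b->ops[b->tail] = op;
	b->tail = (b->tail + 1) % b->size;	/* circular buffer, go round */
	b->count++;
}

/* Pretend the device accepted at most 'room' ops of the burst */
static uint16_t fake_enqueue_burst(int *ops, uint16_t n, uint16_t room)
{
	(void)ops;
	return n < room ? n : room;
}

/* Drain only the contiguous chunk starting at head; a wrapped buffer
 * therefore needs a second call for the part that starts at index 0.
 * Returns 0 when the chunk went out in full, -1 on backpressure.
 */
static int cb_flush(struct circ_buf *b, uint16_t room, uint16_t *flushed)
{
	uint16_t n;

	if (b->tail > b->head)
		n = b->tail - b->head;
	else if (b->tail < b->head)
		n = b->size - b->head;
	else {
		*flushed = 0;
		return 0;	/* buffer empty */
	}

	*flushed = fake_enqueue_burst(&b->ops[b->head], n, room);
	b->count -= *flushed;
	if (!b->count)
		b->head = b->tail = 0;
	else
		b->head = (b->head + *flushed) % b->size;

	return *flushed == n ? 0 : -1;
}

int main(void)
{
	struct circ_buf b;
	uint16_t flushed;
	int i, ret;

	cb_init(&b, BUF_SZ);
	for (i = 0; i < 6; i++)
		cb_add(&b, i);

	/* device accepts only 4 of 6 ops: flush reports backpressure */
	ret = cb_flush(&b, 4, &flushed);
	printf("ret=%d flushed=%u count=%u head=%u\n",
	       ret, (unsigned)flushed, (unsigned)b.count, (unsigned)b.head);

	/* wrap around: refill, then flush drains head..end first */
	for (i = 6; i < 11; i++)
		cb_add(&b, i);
	ret = cb_flush(&b, 16, &flushed);
	printf("ret=%d flushed=%u count=%u head=%u\n",
	       ret, (unsigned)flushed, (unsigned)b.count, (unsigned)b.head);

	free(b.ops);
	return 0;
}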

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..0b484f3 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number of elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -37,6 +53,10 @@ struct event_crypto_adapter {
 	uint8_t event_port_id;
 	/* Store event device's implicit release capability */
 	uint8_t implicit_release_disabled;
+	/* Flag to indicate backpressure at cryptodev
+	 * Stop further dequeuing events from eventdev
+	 */
+	bool stop_enq_to_cryptodev;
 	/* Max crypto ops processed in any service function invocation */
 	uint32_t max_nb;
 	/* Lock to serialize config updates with service function */
@@ -47,6 +67,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,10 +115,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
-	/* No of crypto ops accumulated */
-	uint8_t len;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 } __rte_cache_aligned;
 
 static struct event_crypto_adapter **event_crypto_adapter;
@@ -141,6 +161,84 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline bool
+eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
+{
+	return (bufp->size - bufp->count) >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	/* circular buffer, go round */
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, nb_enqueued = 0;
 	uint8_t cdev_id;
+	int ret;
 
-	len = 0;
 	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
@@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				rte_crypto_op_free(crypto_op);
 				continue;
 			}
-			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
-			len++;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
 			m_data = (union rte_event_crypto_metadata *)
@@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				rte_crypto_op_free(crypto_op);
 				continue;
 			}
-			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
-			len++;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
 			rte_crypto_op_free(crypto_op);
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
-							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
-
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
-				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
-				stats->crypto_enq_fail++;
-				rte_pktmbuf_free(op->sym->m_src);
-				rte_crypto_op_free(op);
-			}
-
-			len = 0;
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+								cdev_id,
+								qp_id,
+								&nb_enqueued);
+			/**
+			 * If some crypto ops failed to flush to cdev and
+			 * space for another batch is not available, stop
+			 * dequeue from eventdev momentarily
+			 */
+			if (unlikely(ret < 0 &&
+				!eca_circular_buffer_space_for_batch(
+							&qp_info->cbuf)))
+				adapter->stop_enq_to_cryptodev = true;
 		}
 
-		if (qp_info)
-			qp_info->len = len;
-		n += ret;
+		stats->crypto_enq_count += nb_enqueued;
+		n += nb_enqueued;
 	}
 
 	return n;
 }
 
 static unsigned int
-eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
+		      uint8_t cdev_id, uint16_t *nb_ops_flushed)
 {
-	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
-	uint8_t cdev_id;
+	uint16_t nb = 0, nb_enqueued = 0;
 	uint16_t qp;
-	uint16_t ret;
-	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
-	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
-		curr_dev = &adapter->cdevs[cdev_id];
-		dev = curr_dev->dev;
-		if (dev == NULL)
-			continue;
-		for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
+	curr_dev = &adapter->cdevs[cdev_id];
+	if (unlikely(curr_dev == NULL))
+		return 0;
 
-			curr_queue = &curr_dev->qpairs[qp];
-			if (!curr_queue->qp_enabled)
-				continue;
+	dev = rte_cryptodev_pmd_get_dev(cdev_id);
+	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
-							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
-
-			while (ret < curr_queue->len) {
-				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
-				stats->crypto_enq_fail++;
-				rte_pktmbuf_free(op->sym->m_src);
-				rte_crypto_op_free(op);
-			}
-			curr_queue->len = 0;
-		}
+		curr_queue = &curr_dev->qpairs[qp];
+		if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+			continue;
+
+		eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+						  cdev_id,
+						  qp,
+						  &nb_enqueued);
+		*nb_ops_flushed += curr_queue->cbuf.count;
+		nb += nb_enqueued;
 	}
 
-	return ret;
+	return nb;
+}
+
+static unsigned int
+eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+{
+	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
+	uint8_t cdev_id;
+	uint16_t nb_enqueued = 0;
+	uint16_t nb_ops_flushed = 0;
+	uint16_t num_cdev = rte_cryptodev_count();
+
+	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
+		nb_enqueued += eca_crypto_cdev_flush(adapter,
+						    cdev_id,
+						    &nb_ops_flushed);
+	/**
+	 * Enable dequeue from eventdev if all ops from circular
+	 * buffer flushed to cdev
+	 */
+	if (!nb_ops_flushed)
+		adapter->stop_enq_to_cryptodev = false;
+
+	stats->crypto_enq_count += nb_enqueued;
+
+	return nb_enqueued;
 }
 
 static int
@@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
 		return 0;
 
+	if (unlikely(adapter->stop_enq_to_cryptodev)) {
+		nb_enqueued += eca_crypto_enq_flush(adapter);
+
+		if (unlikely(adapter->stop_enq_to_cryptodev))
+			goto skip_event_dequeue_burst;
+	}
+
 	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
 		stats->event_poll_count++;
 		n = rte_event_dequeue_burst(event_dev_id,
@@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 		nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
 	}
 
+skip_event_dequeue_burst:
+
 	if ((++adapter->transmit_loop_count &
 		(CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
 		nb_enqueued += eca_crypto_enq_flush(adapter);
@@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
-	/* Free mbufs and rte_crypto_ops for failed events */
-	for (i = nb_enqueued; i < nb_ev; i++) {
-		struct rte_crypto_op *op = events[i].event_ptr;
-		rte_pktmbuf_free(op->sym->m_src);
-		rte_crypto_op_free(op);
-	}
-
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
+}
+
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*headp = (*headp + nb_ops_flushed) % bufp->size;
+	return 1;
 }
 
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (likely(adapter->ebuf.count == 0))
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
-			if (dev == NULL)
+			if (unlikely(dev == NULL))
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				queues++) {
 
 				curr_queue = &curr_dev->qpairs[qp];
-				if (!curr_queue->qp_enabled)
+				if (unlikely(curr_queue == NULL ||
+				    !curr_queue->qp_enabled))
 					continue;
 
 				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (likely(nb_enqueued == n))
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[i]);
+
+check:
 				nb_deq += n;
 
-				if (nb_deq > max_deq) {
+				if (nb_deq >= max_deq) {
 					if ((qp + 1) == dev_qps) {
 						adapter->next_cdev_id =
 							(cdev_id + 1)
@@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				}
 			}
 		}
+		adapter->next_cdev_id = 0;
 	} while (done == false);
 	return nb_deq;
 }
@@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
+			RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
+					 "buffer");
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap
  2022-02-10 17:41       ` [PATCH v5 " Ganapati Kundapura
@ 2022-02-10 17:41         ` Ganapati Kundapura
  2022-02-11  4:43           ` Gujjar, Abhinandan S
  2022-02-11  4:43         ` [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
  1 sibling, 1 reply; 17+ messages in thread
From: Ganapati Kundapura @ 2022-02-10 17:41 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, abhinandan.gujjar, dev

Update rte_event_crypto_adapter_caps_get() to return
SW_CAP if the PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
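
---
Note: the change below boils down to a fallback in the capability query:
when the event PMD does not implement crypto_adapter_caps_get, report the
generic SW adapter capability and return success instead of -ENOTSUP. A
minimal sketch of that dispatch pattern follows; SW_CAP, struct dev_ops
and pmd_caps() are stand-ins for illustration, not the eventdev API.

#include <errno.h>
#include <stdio.h>

#define SW_CAP 0x1u	/* stand-in for RTE_EVENT_CRYPTO_ADAPTER_SW_CAP */

/* Stand-in for the eventdev driver ops table */
struct dev_ops {
	int (*crypto_caps_get)(unsigned int cdev_id, unsigned int *caps);
};

/* Mirrors the new logic: with no PMD callback, advertise the SW
 * capability and succeed; with a callback, defer to the PMD.
 */
static int
caps_get(const struct dev_ops *ops, unsigned int cdev_id, unsigned int *caps)
{
	if (caps == NULL)
		return -EINVAL;

	if (ops->crypto_caps_get == NULL)
		*caps = SW_CAP;
	else
		*caps = 0;

	return ops->crypto_caps_get ?
		ops->crypto_caps_get(cdev_id, caps) : 0;
}

static int pmd_caps(unsigned int cdev_id, unsigned int *caps)
{
	(void)cdev_id;
	*caps = 0xffu;	/* pretend the PMD reports its own capabilities */
	return 0;
}

int main(void)
{
	struct dev_ops sw_only = { .crypto_caps_get = NULL };
	struct dev_ops with_pmd = { .crypto_caps_get = pmd_caps };
	unsigned int caps = 0;
	int ret;

	ret = caps_get(&sw_only, 0, &caps);
	printf("sw only : ret=%d caps=0x%x\n", ret, caps);

	ret = caps_get(&with_pmd, 0, &caps);
	printf("with pmd: ret=%d caps=0x%x\n", ret, caps);
	return 0;
}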

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* RE: [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-02-10 15:07       ` [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
@ 2022-02-10 17:46         ` Kundapura, Ganapati
  0 siblings, 0 replies; 17+ messages in thread
From: Kundapura, Ganapati @ 2022-02-10 17:46 UTC (permalink / raw)
  To: Gujjar, Abhinandan S, Jayatheerthan, Jay, jerinjacobk, dev

Hi Abhi,

> -----Original Message-----
> From: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> Sent: 10 February 2022 20:37
> To: Kundapura, Ganapati <ganapati.kundapura@intel.com>; Jayatheerthan,
> Jay <jay.jayatheerthan@intel.com>; jerinjacobk@gmail.com; dev@dpdk.org
> Subject: RE: [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
> 
> Hi Ganapati,
> 
> It looks good to me. Some minor comments inline.
> 
> > -----Original Message-----
> > From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> > Sent: Monday, February 7, 2022 4:21 PM
> > To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> > jerinjacobk@gmail.com; Gujjar, Abhinandan S
> > <abhinandan.gujjar@intel.com>; dev@dpdk.org
> > Subject: [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to
> > circular buffer
> >
> > Move crypto ops to circular buffer to retain crypto ops when
> > cryptodev/eventdev are temporarily full
> >
> > Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> >
> > ---
> > v4:
> > * Retain the non enqueued crypto ops in circular buffer to
> >   process later and stop the dequeue from eventdev till
> >   all the crypto ops are enqueued to cryptodev
> >
> >   check space in circular buffer and stop dequeue from
> >   eventdev if some ops failed to flush to cdev
> >   and no space for another batch is available in circular buffer
> >
> >   Enable dequeue from eventdev after all the ops are flushed
> >
> > v3:
> > * update eca_ops_buffer_flush() to flush out all the crypto
> >   ops out of circular buffer.
> > * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
> >   and add to circular buffer for later processing.
> >
> > v2:
> > * reset crypto adapter next cdev id before dequeueing from the
> >   next cdev
> > ---
> >
> > diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> > b/lib/eventdev/rte_event_crypto_adapter.c
> > index d840803..0faac36 100644
> > --- a/lib/eventdev/rte_event_crypto_adapter.c
> > +++ b/lib/eventdev/rte_event_crypto_adapter.c
> > @@ -25,11 +25,27 @@
> >  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32  #define
> > CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> >
> > +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> > #define
> > +CRYPTO_ADAPTER_BUFFER_SZ 1024
> > +
> >  /* Flush an instance's enqueue buffers every
> > CRYPTO_ENQ_FLUSH_THRESHOLD
> >   * iterations of eca_crypto_adapter_enq_run()
> >   */
> >  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> >
> > +struct crypto_ops_circular_buffer {
> > +	/* index of head element in circular buffer */
> > +	uint16_t head;
> > +	/* index of tail element in circular buffer */
> > +	uint16_t tail;
> > +	/* number of elements in buffer */
> > +	uint16_t count;
> > +	/* size of circular buffer */
> > +	uint16_t size;
> > +	/* Pointer to hold rte_crypto_ops for batching */
> > +	struct rte_crypto_op **op_buffer;
> > +} __rte_cache_aligned;
> > +
> >  struct event_crypto_adapter {
> >  	/* Event device identifier */
> >  	uint8_t eventdev_id;
> > @@ -37,6 +53,10 @@ struct event_crypto_adapter {
> >  	uint8_t event_port_id;
> >  	/* Store event device's implicit release capability */
> >  	uint8_t implicit_release_disabled;
> > +	/* Flag to indicate backpressure at cryptodev
> > +	 * Stop further dequeuing events from eventdev
> > +	 */
> > +	bool stop_enq_to_cryptodev;
> >  	/* Max crypto ops processed in any service function invocation */
> >  	uint32_t max_nb;
> >  	/* Lock to serialize config updates with service function */ @@
> > -47,6
> > +67,8 @@ struct event_crypto_adapter {
> >  	struct crypto_device_info *cdevs;
> >  	/* Loop counter to flush crypto ops */
> >  	uint16_t transmit_loop_count;
> > +	/* Circular buffer for batching crypto ops to eventdev */
> > +	struct crypto_ops_circular_buffer ebuf;
> >  	/* Per instance stats structure */
> >  	struct rte_event_crypto_adapter_stats crypto_stats;
> >  	/* Configuration callback for rte_service configuration */ @@ -93,10
> > +115,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
> >  	/* Set to indicate queue pair is enabled */
> >  	bool qp_enabled;
> > -	/* Pointer to hold rte_crypto_ops for batching */
> > -	struct rte_crypto_op **op_buffer;
> > -	/* No of crypto ops accumulated */
> > -	uint8_t len;
> > +	/* Circular buffer for batching crypto ops to cdev */
> > +	struct crypto_ops_circular_buffer cbuf;
> >  } __rte_cache_aligned;
> >
> >  static struct event_crypto_adapter **event_crypto_adapter; @@ -141,6
> > +161,84 @@ eca_init(void)
> >  	return 0;
> >  }
> >
> > +static inline bool
> > +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> > +*bufp) {
> > +	return bufp->count >= BATCH_SIZE;
> > +}
> > +
> > +static inline bool
> > +eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer
> > +*bufp) {
> > +	return (bufp->size - bufp->count) >= BATCH_SIZE; }
> > +
> > +static inline void
> > +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> > +	rte_free(bufp->op_buffer);
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_init(const char *name,
> > +			 struct crypto_ops_circular_buffer *bufp,
> > +			 uint16_t sz)
> > +{
> > +	bufp->op_buffer = rte_zmalloc(name,
> > +				      sizeof(struct rte_crypto_op *) * sz,
> > +				      0);
> > +	if (bufp->op_buffer == NULL)
> > +		return -ENOMEM;
> > +
> > +	bufp->size = sz;
> > +	return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> > +			struct rte_crypto_op *op)
> > +{
> > +	uint16_t *tailp = &bufp->tail;
> > +
> > +	bufp->op_buffer[*tailp] = op;
> > +	/* circular buffer, go round */
> > +	*tailp = (*tailp + 1) % bufp->size;
> > +	bufp->count++;
> > +
> > +	return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer
> > *bufp,
> > +				  uint8_t cdev_id, uint16_t qp_id,
> > +				  uint16_t *nb_ops_flushed)
> > +{
> > +	uint16_t n = 0;
> > +	uint16_t *headp = &bufp->head;
> > +	uint16_t *tailp = &bufp->tail;
> > +	struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > +	if (*tailp > *headp)
> > +		n = *tailp - *headp;
> > +	else if (*tailp < *headp)
> > +		n = bufp->size - *headp;
> > +	else {
> > +		*nb_ops_flushed = 0;
> > +		return 0;  /* buffer empty */
> > +	}
> > +
> > +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> > +						      &ops[*headp], n);
> > +	bufp->count -= *nb_ops_flushed;
> > +	if (!bufp->count) {
> > +		*headp = 0;
> > +		*tailp = 0;
> > +	} else
> > +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> > +
> > +	return *nb_ops_flushed == n ? 0 : -1; }
> > +
> >  static inline struct event_crypto_adapter *
> > eca_id_to_adapter(uint8_t id)  { @@ -237,10 +335,19 @@
> > rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> >  		return -ENOMEM;
> >  	}
> >
> > +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> > +				     &adapter->ebuf,
> > +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> > +		RTE_EDEV_LOG_ERR("Failed to get memory for eventdev
> > buffer");
> > +		rte_free(adapter);
> > +		return -ENOMEM;
> > +	}
> > +
> >  	ret = rte_event_dev_info_get(dev_id, &dev_info);
> >  	if (ret < 0) {
> >  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d:
> > %s!",
> >  				 dev_id, dev_info.driver_name);
> > +		eca_circular_buffer_free(&adapter->ebuf);
> >  		rte_free(adapter);
> >  		return ret;
> >  	}
> > @@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> > uint8_t dev_id,
> >  					socket_id);
> >  	if (adapter->cdevs == NULL) {
> >  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto
> devices\n");
> > +		eca_circular_buffer_free(&adapter->ebuf);
> >  		rte_free(adapter);
> >  		return -ENOMEM;
> >  	}
> > @@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >  	struct crypto_queue_pair_info *qp_info = NULL;
> >  	struct rte_crypto_op *crypto_op;
> >  	unsigned int i, n;
> > -	uint16_t qp_id, len, ret;
> > +	uint16_t qp_id, nb_enqueued = 0;
> >  	uint8_t cdev_id;
> > +	int ret;
> >
> > -	len = 0;
> >  	ret = 0;
> >  	n = 0;
> >  	stats->event_deq_count += cnt;
> > @@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >  				rte_crypto_op_free(crypto_op);
> >  				continue;
> >  			}
> > -			len = qp_info->len;
> > -			qp_info->op_buffer[len] = crypto_op;
> > -			len++;
> > +			eca_circular_buffer_add(&qp_info->cbuf,
> > crypto_op);
> >  		} else if (crypto_op->sess_type ==
> > RTE_CRYPTO_OP_SESSIONLESS &&
> >  				crypto_op->private_data_offset) {
> >  			m_data = (union rte_event_crypto_metadata *) @@
> > -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >  				rte_crypto_op_free(crypto_op);
> >  				continue;
> >  			}
> > -			len = qp_info->len;
> > -			qp_info->op_buffer[len] = crypto_op;
> > -			len++;
> > +			eca_circular_buffer_add(&qp_info->cbuf,
> > crypto_op);
> >  		} else {
> >  			rte_pktmbuf_free(crypto_op->sym->m_src);
> >  			rte_crypto_op_free(crypto_op);
> >  			continue;
> >  		}
> >
> > -		if (len == BATCH_SIZE) {
> > -			struct rte_crypto_op **op_buffer = qp_info-
> > >op_buffer;
> > -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> > -							  qp_id,
> > -							  op_buffer,
> > -							  BATCH_SIZE);
> > -
> > -			stats->crypto_enq_count += ret;
> > -
> > -			while (ret < len) {
> > -				struct rte_crypto_op *op;
> > -				op = op_buffer[ret++];
> > -				stats->crypto_enq_fail++;
> > -				rte_pktmbuf_free(op->sym->m_src);
> > -				rte_crypto_op_free(op);
> > -			}
> > -
> > -			len = 0;
> > +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> > +			ret = eca_circular_buffer_flush_to_cdev(&qp_info-
> > >cbuf,
> > +								cdev_id,
> > +								qp_id,
> > +
> > 	&nb_enqueued);
> > +			/**
> > +			 * If some crypto ops failed to flush to cdev and
> > +			 * space for another batch is not available, stop
> > +			 * dequeue from eventdev momentarily
> > +			 */
> > +			if (unlikely(ret < 0 &&
> > +				!eca_circular_buffer_space_for_batch(
> > +							&qp_info->cbuf)))
> > +				adapter->stop_enq_to_cryptodev = true;
> >  		}
> >
> > -		if (qp_info)
> > -			qp_info->len = len;
> > -		n += ret;
> > +		stats->crypto_enq_count += nb_enqueued;
> > +		n += nb_enqueued;
> >  	}
> >
> >  	return n;
> >  }
> >
> >  static unsigned int
> > -eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> > +eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
> > +		      uint8_t cdev_id, uint16_t *nb_ops_flushed)
> >  {
> > -	struct rte_event_crypto_adapter_stats *stats = &adapter-
> > >crypto_stats;
> >  	struct crypto_device_info *curr_dev;
> >  	struct crypto_queue_pair_info *curr_queue;
> > -	struct rte_crypto_op **op_buffer;
> >  	struct rte_cryptodev *dev;
> > -	uint8_t cdev_id;
> > +	uint16_t nb = 0, nb_enqueued = 0;
> >  	uint16_t qp;
> > -	uint16_t ret;
> > -	uint16_t num_cdev = rte_cryptodev_count();
> >
> > -	ret = 0;
> > -	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> > -		curr_dev = &adapter->cdevs[cdev_id];
> > -		dev = curr_dev->dev;
> > -		if (dev == NULL)
> > -			continue;
> > -		for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> > +	curr_dev = &adapter->cdevs[cdev_id];
> > +	if (unlikely(curr_dev == NULL))
> > +		return 0;
> >
> > -			curr_queue = &curr_dev->qpairs[qp];
> > -			if (!curr_queue->qp_enabled)
> > -				continue;
> > +	dev = rte_cryptodev_pmd_get_dev(cdev_id);
> > +	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> >
> > -			op_buffer = curr_queue->op_buffer;
> > -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> > -							  qp,
> > -							  op_buffer,
> > -							  curr_queue->len);
> > -			stats->crypto_enq_count += ret;
> > -
> > -			while (ret < curr_queue->len) {
> > -				struct rte_crypto_op *op;
> > -				op = op_buffer[ret++];
> > -				stats->crypto_enq_fail++;
> > -				rte_pktmbuf_free(op->sym->m_src);
> > -				rte_crypto_op_free(op);
> > -			}
> > -			curr_queue->len = 0;
> > -		}
> > +		curr_queue = &curr_dev->qpairs[qp];
> > +		if (unlikely(curr_queue == NULL || !curr_queue-
> > >qp_enabled))
> > +			continue;
> > +
> > +		eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> > +						  cdev_id,
> > +						  qp,
> > +						  &nb_enqueued);
> > +		*nb_ops_flushed += curr_queue->cbuf.count;
> > +		nb += nb_enqueued;
> >  	}
> >
> > -	return ret;
> > +	return nb;
> > +}
> > +
> > +static unsigned int
> > +eca_crypto_enq_flush(struct event_crypto_adapter *adapter) {
> > +	struct rte_event_crypto_adapter_stats *stats = &adapter-
> > >crypto_stats;
> > +	uint8_t cdev_id;
> > +	uint16_t nb_enqueued = 0;
> > +	uint16_t nb_ops_flushed = 0;
> > +	uint16_t num_cdev = rte_cryptodev_count();
> > +
> > +	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
> > +		nb_enqueued += eca_crypto_cdev_flush(adapter,
> > +						    cdev_id,
> > +						    &nb_ops_flushed);
> > +	/**
> > +	 * Enable dequeue from eventdev if all ops from circular
> > +	 * buffer flushed to cdev
> > +	 */
> > +	if (!nb_ops_flushed)
> > +		adapter->stop_enq_to_cryptodev = false;
> > +
> > +	stats->crypto_enq_count += nb_enqueued;
> > +
> > +	return nb_enqueued;
> >  }
> >
> >  static int
> > @@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct
> > event_crypto_adapter *adapter,
> >  	if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
> >  		return 0;
> >
> > +	if (adapter->stop_enq_to_cryptodev) {
> Add unlikely here
Added in v5
> > +		nb_enqueued += eca_crypto_enq_flush(adapter);
> > +
> > +		if (adapter->stop_enq_to_cryptodev)
> Add unlikely here
Added in v5
> > +			goto skip_event_dequeue_burst;
> > +	}
> > +
> >  	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
> >  		stats->event_poll_count++;
> >  		n = rte_event_dequeue_burst(event_dev_id,
> > @@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct
> > event_crypto_adapter *adapter,
> >  		nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
> >  	}
> >
> > +skip_event_dequeue_burst:
> > +
> >  	if ((++adapter->transmit_loop_count &
> >  		(CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
> >  		nb_enqueued += eca_crypto_enq_flush(adapter); @@ -
> > 499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter
> > *adapter,
> >  	return nb_enqueued;
> >  }
> >
> > -static inline void
> > +static inline uint16_t
> >  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> > -		      struct rte_crypto_op **ops, uint16_t num)
> > +		  struct rte_crypto_op **ops, uint16_t num)
> >  {
> >  	struct rte_event_crypto_adapter_stats *stats = &adapter-
> > >crypto_stats;
> >  	union rte_event_crypto_metadata *m_data = NULL; @@ -518,6
> > +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter
> *adapter,
> >  	num = RTE_MIN(num, BATCH_SIZE);
> >  	for (i = 0; i < num; i++) {
> >  		struct rte_event *ev = &events[nb_ev++];
> > +
> > +		m_data = NULL;
> >  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
> >  			m_data =
> > rte_cryptodev_sym_session_get_user_data(
> >  					ops[i]->sym->session);
> > @@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct
> > event_crypto_adapter *adapter,
> >  						  event_port_id,
> >  						  &events[nb_enqueued],
> >  						  nb_ev - nb_enqueued);
> > +
> >  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
> >  		 nb_enqueued < nb_ev);
> >
> > -	/* Free mbufs and rte_crypto_ops for failed events */
> > -	for (i = nb_enqueued; i < nb_ev; i++) {
> > -		struct rte_crypto_op *op = events[i].event_ptr;
> > -		rte_pktmbuf_free(op->sym->m_src);
> > -		rte_crypto_op_free(op);
> > -	}
> > -
> >  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
> >  	stats->event_enq_count += nb_enqueued;
> >  	stats->event_enq_retry_count += retry - 1;
> > +
> > +	return nb_enqueued;
> > +}
> > +
> > +static int
> > +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter
> > *adapter,
> > +				   struct crypto_ops_circular_buffer *bufp) {
> > +	uint16_t n = 0, nb_ops_flushed;
> > +	uint16_t *headp = &bufp->head;
> > +	uint16_t *tailp = &bufp->tail;
> > +	struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > +	if (*tailp > *headp)
> > +		n = *tailp - *headp;
> > +	else if (*tailp < *headp)
> > +		n = bufp->size - *headp;
> > +	else
> > +		return 0;  /* buffer empty */
> > +
> > +	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> > +	bufp->count -= nb_ops_flushed;
> > +	if (!bufp->count) {
> > +		*headp = 0;
> > +		*tailp = 0;
> > +		return 0;  /* buffer empty */
> > +	}
> > +
> > +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> > +	return 1;
> >  }
> >
> > +
> > +static void
> > +eca_ops_buffer_flush(struct event_crypto_adapter *adapter) {
> > +	if (adapter->ebuf.count == 0)
> Add likely here
Added in v5
> > +		return;
> > +
> > +	while (eca_circular_buffer_flush_to_evdev(adapter,
> > +						  &adapter->ebuf))
> > +		;
> > +}
> >  static inline unsigned int
> >  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> >  			   unsigned int max_deq)
> > @@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  	struct crypto_device_info *curr_dev;
> >  	struct crypto_queue_pair_info *curr_queue;
> >  	struct rte_crypto_op *ops[BATCH_SIZE];
> > -	uint16_t n, nb_deq;
> > +	uint16_t n, nb_deq, nb_enqueued, i;
> >  	struct rte_cryptodev *dev;
> >  	uint8_t cdev_id;
> >  	uint16_t qp, dev_qps;
> > @@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  	uint16_t num_cdev = rte_cryptodev_count();
> >
> >  	nb_deq = 0;
> > +	eca_ops_buffer_flush(adapter);
> > +
> >  	do {
> > -		uint16_t queues = 0;
> >  		done = true;
> >
> >  		for (cdev_id = adapter->next_cdev_id;
> >  			cdev_id < num_cdev; cdev_id++) {
> > +			uint16_t queues = 0;
> > +
> >  			curr_dev = &adapter->cdevs[cdev_id];
> >  			dev = curr_dev->dev;
> >  			if (dev == NULL)
> Add unlikely here
Added in v5
> >  				continue;
> > +
> >  			dev_qps = dev->data->nb_queue_pairs;
> >
> >  			for (qp = curr_dev->next_queue_pair_id; @@ -596,7
> > +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter
> > *adapter,
> >  				queues++) {
> >
> >  				curr_queue = &curr_dev->qpairs[qp];
> > -				if (!curr_queue->qp_enabled)
> > +				if (curr_queue == NULL ||
> > +				    !curr_queue->qp_enabled)
> Add unlikely here
Added in v5
> >  					continue;
> >
> >  				n = rte_cryptodev_dequeue_burst(cdev_id,
> > qp, @@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  					continue;
> >
> >  				done = false;
> > +				nb_enqueued = 0;
> > +
> >  				stats->crypto_deq_count += n;
> > -				eca_ops_enqueue_burst(adapter, ops, n);
> > +
> > +				if (unlikely(!adapter->ebuf.count))
> > +					nb_enqueued =
> > eca_ops_enqueue_burst(
> > +							adapter, ops, n);
> > +
> > +				if (nb_enqueued == n)
> Add likely here
Added in v5
> > +					goto check;
> > +
> > +				/* Failed to enqueue events case */
> > +				for (i = nb_enqueued; i < n; i++)
> > +					eca_circular_buffer_add(
> > +						&adapter->ebuf,
> > +						ops[nb_enqueued]);
> > +
> > +check:
> >  				nb_deq += n;
> >
> > -				if (nb_deq > max_deq) {
> > +				if (nb_deq >= max_deq) {
> >  					if ((qp + 1) == dev_qps) {
> >  						adapter->next_cdev_id =
> >  							(cdev_id + 1)
> > @@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >  				}
> >  			}
> >  		}
> > +		adapter->next_cdev_id = 0;
> >  	} while (done == false);
> >  	return nb_deq;
> >  }
> > @@ -751,11 +929,12 @@ eca_add_queue_pair(struct
> event_crypto_adapter
> > *adapter, uint8_t cdev_id,
> >  			return -ENOMEM;
> >
> >  		qpairs = dev_info->qpairs;
> > -		qpairs->op_buffer = rte_zmalloc_socket(adapter-
> > >mem_name,
> > -					BATCH_SIZE *
> > -					sizeof(struct rte_crypto_op *),
> > -					0, adapter->socket_id);
> > -		if (!qpairs->op_buffer) {
> > +
> > +		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> > +					     &qpairs->cbuf,
> > +
> > CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> > +			RTE_EDEV_LOG_ERR("Failed to get memory for
> > cryptodev "
> > +					 "buffer");
> >  			rte_free(qpairs);
> >  			return -ENOMEM;
> >  		}
> > --
> > 2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* RE: [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap
  2022-02-10 17:41         ` [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-02-11  4:43           ` Gujjar, Abhinandan S
  0 siblings, 0 replies; 17+ messages in thread
From: Gujjar, Abhinandan S @ 2022-02-11  4:43 UTC (permalink / raw)
  To: Kundapura, Ganapati, Jayatheerthan, Jay, jerinjacobk, dev

Acked-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>

> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Sent: Thursday, February 10, 2022 11:11 PM
> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> jerinjacobk@gmail.com; Gujjar, Abhinandan S
> <abhinandan.gujjar@intel.com>; dev@dpdk.org
> Subject: [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap
> 
> update rte_event_crypto_adapter_caps_get() to return SW_CAP if PMD
> callback is not registered.
> 
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> 
> diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
> index 79b9ea3..6988bf1 100644
> --- a/lib/eventdev/rte_eventdev.c
> +++ b/lib/eventdev/rte_eventdev.c
> @@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t
> dev_id, uint8_t cdev_id,
> 
>  	if (caps == NULL)
>  		return -EINVAL;
> -	*caps = 0;
> +
> +	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
> +		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
> +	else
> +		*caps = 0;
> 
>  	return dev->dev_ops->crypto_adapter_caps_get ?
>  		(*dev->dev_ops->crypto_adapter_caps_get)
> -		(dev, cdev, caps) : -ENOTSUP;
> +		(dev, cdev, caps) : 0;
>  }
> 
>  int
> --
> 2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread

* RE: [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-02-10 17:41       ` [PATCH v5 " Ganapati Kundapura
  2022-02-10 17:41         ` [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-02-11  4:43         ` Gujjar, Abhinandan S
  2022-02-14 13:15           ` Jerin Jacob
  1 sibling, 1 reply; 17+ messages in thread
From: Gujjar, Abhinandan S @ 2022-02-11  4:43 UTC (permalink / raw)
  To: Kundapura, Ganapati, Jayatheerthan, Jay, jerinjacobk, dev

Acked-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>

> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Sent: Thursday, February 10, 2022 11:11 PM
> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> jerinjacobk@gmail.com; Gujjar, Abhinandan S
> <abhinandan.gujjar@intel.com>; dev@dpdk.org
> Subject: [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
> 
> Move crypto ops to circular buffer to retain crypto ops when
> cryptodev/eventdev are temporarily full
> 
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> 
> ---
> v5:
> * Add branch prediction to if conditions
> 
> v4:
> * Retain the non enqueued crypto ops in circular buffer to
>   process later and stop the dequeue from eventdev till
>   all the crypto ops are enqueued to cryptodev
> 
>   check space in circular buffer and stop dequeue from
>   eventdev if some ops failed to flush to cdev
>   and no space for another batch is available in circular buffer
> 
>   Enable dequeue from eventdev after all the ops are flushed
> 
> v3:
> * update eca_ops_buffer_flush() to flush out all the crypto
>   ops out of circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
>   and add to circular buffer for later processing.
> 
> v2:
> * reset crypto adapter next cdev id before dequeueing from the
>   next cdev
> ---
> 
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..0b484f3 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
>  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
>  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> 
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> #define
> +CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
>  /* Flush an instance's enqueue buffers every
> CRYPTO_ENQ_FLUSH_THRESHOLD
>   * iterations of eca_crypto_adapter_enq_run()
>   */
>  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> 
> +struct crypto_ops_circular_buffer {
> +	/* index of head element in circular buffer */
> +	uint16_t head;
> +	/* index of tail element in circular buffer */
> +	uint16_t tail;
> +	/* number of elements in buffer */
> +	uint16_t count;
> +	/* size of circular buffer */
> +	uint16_t size;
> +	/* Pointer to hold rte_crypto_ops for batching */
> +	struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
>  struct event_crypto_adapter {
>  	/* Event device identifier */
>  	uint8_t eventdev_id;
> @@ -37,6 +53,10 @@ struct event_crypto_adapter {
>  	uint8_t event_port_id;
>  	/* Store event device's implicit release capability */
>  	uint8_t implicit_release_disabled;
> +	/* Flag to indicate backpressure at cryptodev
> +	 * Stop further dequeuing events from eventdev
> +	 */
> +	bool stop_enq_to_cryptodev;
>  	/* Max crypto ops processed in any service function invocation */
>  	uint32_t max_nb;
>  	/* Lock to serialize config updates with service function */ @@ -47,6
> +67,8 @@ struct event_crypto_adapter {
>  	struct crypto_device_info *cdevs;
>  	/* Loop counter to flush crypto ops */
>  	uint16_t transmit_loop_count;
> +	/* Circular buffer for batching crypto ops to eventdev */
> +	struct crypto_ops_circular_buffer ebuf;
>  	/* Per instance stats structure */
>  	struct rte_event_crypto_adapter_stats crypto_stats;
>  	/* Configuration callback for rte_service configuration */ @@ -93,10
> +115,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
>  	/* Set to indicate queue pair is enabled */
>  	bool qp_enabled;
> -	/* Pointer to hold rte_crypto_ops for batching */
> -	struct rte_crypto_op **op_buffer;
> -	/* No of crypto ops accumulated */
> -	uint8_t len;
> +	/* Circular buffer for batching crypto ops to cdev */
> +	struct crypto_ops_circular_buffer cbuf;
>  } __rte_cache_aligned;
> 
>  static struct event_crypto_adapter **event_crypto_adapter; @@ -141,6
> +161,84 @@ eca_init(void)
>  	return 0;
>  }
> 
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> +*bufp) {
> +	return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline bool
> +eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer
> +*bufp) {
> +	return (bufp->size - bufp->count) >= BATCH_SIZE; }
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> +	rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> +			 struct crypto_ops_circular_buffer *bufp,
> +			 uint16_t sz)
> +{
> +	bufp->op_buffer = rte_zmalloc(name,
> +				      sizeof(struct rte_crypto_op *) * sz,
> +				      0);
> +	if (bufp->op_buffer == NULL)
> +		return -ENOMEM;
> +
> +	bufp->size = sz;
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> +			struct rte_crypto_op *op)
> +{
> +	uint16_t *tailp = &bufp->tail;
> +
> +	bufp->op_buffer[*tailp] = op;
> +	/* circular buffer, go round */
> +	*tailp = (*tailp + 1) % bufp->size;
> +	bufp->count++;
> +
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer
> *bufp,
> +				  uint8_t cdev_id, uint16_t qp_id,
> +				  uint16_t *nb_ops_flushed)
> +{
> +	uint16_t n = 0;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else {
> +		*nb_ops_flushed = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> +						      &ops[*headp], n);
> +	bufp->count -= *nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +	} else
> +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> +	return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
>  static inline struct event_crypto_adapter *  eca_id_to_adapter(uint8_t id)  {
> @@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  		return -ENOMEM;
>  	}
> 
> +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> +				     &adapter->ebuf,
> +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> +		RTE_EDEV_LOG_ERR("Failed to get memory for eventdev
> buffer");
> +		rte_free(adapter);
> +		return -ENOMEM;
> +	}
> +
>  	ret = rte_event_dev_info_get(dev_id, &dev_info);
>  	if (ret < 0) {
>  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d:
> %s!",
>  				 dev_id, dev_info.driver_name);
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return ret;
>  	}
> @@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  					socket_id);
>  	if (adapter->cdevs == NULL) {
>  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto
> devices\n");
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return -ENOMEM;
>  	}
> @@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct
> event_crypto_adapter *adapter, struct rte_event *ev,
>  	struct crypto_queue_pair_info *qp_info = NULL;
>  	struct rte_crypto_op *crypto_op;
>  	unsigned int i, n;
> -	uint16_t qp_id, len, ret;
> +	uint16_t qp_id, nb_enqueued = 0;
>  	uint8_t cdev_id;
> +	int ret;
> 
> -	len = 0;
>  	ret = 0;
>  	n = 0;
>  	stats->event_deq_count += cnt;
> @@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				rte_crypto_op_free(crypto_op);
>  				continue;
>  			}
> -			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> -			len++;
> +			eca_circular_buffer_add(&qp_info->cbuf,
> crypto_op);
>  		} else if (crypto_op->sess_type ==
> RTE_CRYPTO_OP_SESSIONLESS &&
>  				crypto_op->private_data_offset) {
>  			m_data = (union rte_event_crypto_metadata *) @@
> -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				rte_crypto_op_free(crypto_op);
>  				continue;
>  			}
> -			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> -			len++;
> +			eca_circular_buffer_add(&qp_info->cbuf,
> crypto_op);
>  		} else {
>  			rte_pktmbuf_free(crypto_op->sym->m_src);
>  			rte_crypto_op_free(crypto_op);
>  			continue;
>  		}
> 
> -		if (len == BATCH_SIZE) {
> -			struct rte_crypto_op **op_buffer = qp_info-
> >op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> -							  qp_id,
> -							  op_buffer,
> -							  BATCH_SIZE);
> -
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < len) {
> -				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> -				stats->crypto_enq_fail++;
> -				rte_pktmbuf_free(op->sym->m_src);
> -				rte_crypto_op_free(op);
> -			}
> -
> -			len = 0;
> +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> +			ret = eca_circular_buffer_flush_to_cdev(&qp_info-
> >cbuf,
> +								cdev_id,
> +								qp_id,
> +
> 	&nb_enqueued);
> +			/**
> +			 * If some crypto ops failed to flush to cdev and
> +			 * space for another batch is not available, stop
> +			 * dequeue from eventdev momentarily
> +			 */
> +			if (unlikely(ret < 0 &&
> +				!eca_circular_buffer_space_for_batch(
> +							&qp_info->cbuf)))
> +				adapter->stop_enq_to_cryptodev = true;
>  		}
> 
> -		if (qp_info)
> -			qp_info->len = len;
> -		n += ret;
> +		stats->crypto_enq_count += nb_enqueued;
> +		n += nb_enqueued;
>  	}
> 
>  	return n;
>  }
> 
>  static unsigned int
> -eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
> +		      uint8_t cdev_id, uint16_t *nb_ops_flushed)
>  {
> -	struct rte_event_crypto_adapter_stats *stats = &adapter-
> >crypto_stats;
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
> -	struct rte_crypto_op **op_buffer;
>  	struct rte_cryptodev *dev;
> -	uint8_t cdev_id;
> +	uint16_t nb = 0, nb_enqueued = 0;
>  	uint16_t qp;
> -	uint16_t ret;
> -	uint16_t num_cdev = rte_cryptodev_count();
> 
> -	ret = 0;
> -	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> -		curr_dev = &adapter->cdevs[cdev_id];
> -		dev = curr_dev->dev;
> -		if (dev == NULL)
> -			continue;
> -		for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> +	curr_dev = &adapter->cdevs[cdev_id];
> +	if (unlikely(curr_dev == NULL))
> +		return 0;
> 
> -			curr_queue = &curr_dev->qpairs[qp];
> -			if (!curr_queue->qp_enabled)
> -				continue;
> +	dev = rte_cryptodev_pmd_get_dev(cdev_id);
> +	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> 
> -			op_buffer = curr_queue->op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> -							  qp,
> -							  op_buffer,
> -							  curr_queue->len);
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < curr_queue->len) {
> -				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> -				stats->crypto_enq_fail++;
> -				rte_pktmbuf_free(op->sym->m_src);
> -				rte_crypto_op_free(op);
> -			}
> -			curr_queue->len = 0;
> -		}
> +		curr_queue = &curr_dev->qpairs[qp];
> +		if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
> +			continue;
> +
> +		eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> +						  cdev_id,
> +						  qp,
> +						  &nb_enqueued);
> +		*nb_ops_flushed += curr_queue->cbuf.count;
> +		nb += nb_enqueued;
>  	}
> 
> -	return ret;
> +	return nb;
> +}
> +
> +static unsigned int
> +eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +{
> +	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> +	uint8_t cdev_id;
> +	uint16_t nb_enqueued = 0;
> +	uint16_t nb_ops_flushed = 0;
> +	uint16_t num_cdev = rte_cryptodev_count();
> +
> +	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
> +		nb_enqueued += eca_crypto_cdev_flush(adapter,
> +						    cdev_id,
> +						    &nb_ops_flushed);
> +	/**
> +	 * Enable dequeue from eventdev if all ops from circular
> +	 * buffer flushed to cdev
> +	 */
> +	if (!nb_ops_flushed)
> +		adapter->stop_enq_to_cryptodev = false;
> +
> +	stats->crypto_enq_count += nb_enqueued;
> +
> +	return nb_enqueued;
>  }
> 
>  static int
> @@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  	if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
>  		return 0;
> 
> +	if (unlikely(adapter->stop_enq_to_cryptodev)) {
> +		nb_enqueued += eca_crypto_enq_flush(adapter);
> +
> +		if (unlikely(adapter->stop_enq_to_cryptodev))
> +			goto skip_event_dequeue_burst;
> +	}
> +
>  	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
>  		stats->event_poll_count++;
>  		n = rte_event_dequeue_burst(event_dev_id,
> @@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  		nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
>  	}
> 
> +skip_event_dequeue_burst:
> +
>  	if ((++adapter->transmit_loop_count &
>  		(CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
>  		nb_enqueued += eca_crypto_enq_flush(adapter);
> @@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
>  	return nb_enqueued;
>  }
> 
> -static inline void
> +static inline uint16_t
>  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> -		      struct rte_crypto_op **ops, uint16_t num)
> +		  struct rte_crypto_op **ops, uint16_t num)
>  {
>  	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	union rte_event_crypto_metadata *m_data = NULL;
> @@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  	num = RTE_MIN(num, BATCH_SIZE);
>  	for (i = 0; i < num; i++) {
>  		struct rte_event *ev = &events[nb_ev++];
> +
> +		m_data = NULL;
>  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
>  			m_data = rte_cryptodev_sym_session_get_user_data(
>  					ops[i]->sym->session);
> @@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct
> event_crypto_adapter *adapter,
>  						  event_port_id,
>  						  &events[nb_enqueued],
>  						  nb_ev - nb_enqueued);
> +
>  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
>  		 nb_enqueued < nb_ev);
> 
> -	/* Free mbufs and rte_crypto_ops for failed events */
> -	for (i = nb_enqueued; i < nb_ev; i++) {
> -		struct rte_crypto_op *op = events[i].event_ptr;
> -		rte_pktmbuf_free(op->sym->m_src);
> -		rte_crypto_op_free(op);
> -	}
> -
>  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
>  	stats->event_enq_count += nb_enqueued;
>  	stats->event_enq_retry_count += retry - 1;
> +
> +	return nb_enqueued;
> +}
> +
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> +				   struct crypto_ops_circular_buffer *bufp)
> +{
> +	uint16_t n = 0, nb_ops_flushed;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else
> +		return 0;  /* buffer empty */
> +
> +	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> +	bufp->count -= nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> +	return 1;
>  }
> 
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> +{
> +	if (likely(adapter->ebuf.count == 0))
> +		return;
> +
> +	while (eca_circular_buffer_flush_to_evdev(adapter,
> +						  &adapter->ebuf))
> +		;
> +}
>  static inline unsigned int
>  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  			   unsigned int max_deq)
> @@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
>  	struct rte_crypto_op *ops[BATCH_SIZE];
> -	uint16_t n, nb_deq;
> +	uint16_t n, nb_deq, nb_enqueued, i;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp, dev_qps;
> @@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  	uint16_t num_cdev = rte_cryptodev_count();
> 
>  	nb_deq = 0;
> +	eca_ops_buffer_flush(adapter);
> +
>  	do {
> -		uint16_t queues = 0;
>  		done = true;
> 
>  		for (cdev_id = adapter->next_cdev_id;
>  			cdev_id < num_cdev; cdev_id++) {
> +			uint16_t queues = 0;
> +
>  			curr_dev = &adapter->cdevs[cdev_id];
>  			dev = curr_dev->dev;
> -			if (dev == NULL)
> +			if (unlikely(dev == NULL))
>  				continue;
> +
>  			dev_qps = dev->data->nb_queue_pairs;
> 
>  			for (qp = curr_dev->next_queue_pair_id;
> @@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  				queues++) {
> 
>  				curr_queue = &curr_dev->qpairs[qp];
> -				if (!curr_queue->qp_enabled)
> +				if (unlikely(curr_queue == NULL ||
> +				    !curr_queue->qp_enabled))
>  					continue;
> 
>  				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  					continue;
> 
>  				done = false;
> +				nb_enqueued = 0;
> +
>  				stats->crypto_deq_count += n;
> -				eca_ops_enqueue_burst(adapter, ops, n);
> +
> +				if (unlikely(!adapter->ebuf.count))
> +					nb_enqueued = eca_ops_enqueue_burst(
> +							adapter, ops, n);
> +
> +				if (likely(nb_enqueued == n))
> +					goto check;
> +
> +				/* Failed to enqueue events case */
> +				for (i = nb_enqueued; i < n; i++)
> +					eca_circular_buffer_add(
> +						&adapter->ebuf,
> +						ops[nb_enqueued]);
> +
> +check:
>  				nb_deq += n;
> 
> -				if (nb_deq > max_deq) {
> +				if (nb_deq >= max_deq) {
>  					if ((qp + 1) == dev_qps) {
>  						adapter->next_cdev_id =
>  							(cdev_id + 1)
> @@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  				}
>  			}
>  		}
> +		adapter->next_cdev_id = 0;
>  	} while (done == false);
>  	return nb_deq;
>  }
> @@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter
> *adapter, uint8_t cdev_id,
>  			return -ENOMEM;
> 
>  		qpairs = dev_info->qpairs;
> -		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> -					BATCH_SIZE *
> -					sizeof(struct rte_crypto_op *),
> -					0, adapter->socket_id);
> -		if (!qpairs->op_buffer) {
> +
> +		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> +					     &qpairs->cbuf,
> +					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> +			RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
> +					 "buffer");
>  			rte_free(qpairs);
>  			return -ENOMEM;
>  		}
> --
> 2.6.4


^ permalink raw reply	[flat|nested] 17+ messages in thread
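
The flush helpers in the patch above hand the cryptodev only the contiguous region between head and the end of the array, so a buffer that has wrapped around is drained in two bursts, and a partial enqueue simply advances head. The standalone C sketch below walks through that head/tail bookkeeping under those assumptions; it is not DPDK code, and fake_enqueue_burst() is a hypothetical stand-in for rte_cryptodev_enqueue_burst() that accepts a capped number of ops so a partial flush can be observed.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct circ_buf {
	uint16_t head, tail, count, size;
	void **ops;
};

static int circ_init(struct circ_buf *b, uint16_t sz)
{
	b->ops = calloc(sz, sizeof(*b->ops));
	if (b->ops == NULL)
		return -1;
	b->head = b->tail = b->count = 0;
	b->size = sz;
	return 0;
}

static void circ_add(struct circ_buf *b, void *op)
{
	b->ops[b->tail] = op;
	b->tail = (b->tail + 1) % b->size;	/* wrap around */
	b->count++;
}

/* Hypothetical device queue that accepts at most 'room' ops per call. */
static uint16_t fake_enqueue_burst(void **ops, uint16_t n, uint16_t room)
{
	(void)ops;
	return n < room ? n : room;
}

/* Same shape as the flush-to-cdev helper: only the contiguous region
 * from head to the end of the array is submitted in one call; a later
 * call picks up the wrapped remainder.  Returns -1 on back-pressure. */
static int circ_flush(struct circ_buf *b, uint16_t room, uint16_t *flushed)
{
	uint16_t n;

	if (b->tail > b->head)
		n = b->tail - b->head;
	else if (b->tail < b->head)
		n = b->size - b->head;
	else {
		*flushed = 0;
		return 0;		/* buffer empty */
	}

	*flushed = fake_enqueue_burst(&b->ops[b->head], n, room);
	b->count -= *flushed;
	if (b->count == 0)
		b->head = b->tail = 0;	/* reset to a linear buffer */
	else
		b->head = (b->head + *flushed) % b->size;

	return *flushed == n ? 0 : -1;
}

int main(void)
{
	struct circ_buf b;
	uint16_t flushed;
	int dummy, ret, i;

	if (circ_init(&b, 8))
		return 1;
	for (i = 0; i < 6; i++)
		circ_add(&b, &dummy);

	ret = circ_flush(&b, 4, &flushed);	/* device takes only 4 ops */
	printf("ret=%d flushed=%d left=%d\n", ret, flushed, b.count);

	ret = circ_flush(&b, 8, &flushed);	/* drains the remaining 2 */
	printf("ret=%d flushed=%d left=%d\n", ret, flushed, b.count);

	free(b.ops);
	return 0;
}

With the sizes used in main(), the first flush reports -1 with two ops retained, and the second flush empties the buffer and resets head and tail, which is the same pattern the adapter relies on.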

* Re: [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-02-11  4:43         ` [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
@ 2022-02-14 13:15           ` Jerin Jacob
  0 siblings, 0 replies; 17+ messages in thread
From: Jerin Jacob @ 2022-02-14 13:15 UTC (permalink / raw)
  To: Gujjar, Abhinandan S; +Cc: Kundapura, Ganapati, Jayatheerthan, Jay, dev

On Fri, Feb 11, 2022 at 10:13 AM Gujjar, Abhinandan S
<abhinandan.gujjar@intel.com> wrote:
>
> Acked-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>

Series applied to dpdk-next-net-eventdev/for-main. Thanks


>
> > -----Original Message-----
> > From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> > Sent: Thursday, February 10, 2022 11:11 PM
> > To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> > jerinjacobk@gmail.com; Gujjar, Abhinandan S
> > <abhinandan.gujjar@intel.com>; dev@dpdk.org
> > Subject: [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to
> > circular buffer
> >
> > Move crypto ops to circular buffer to retain crypto ops when
> > cryptodev/eventdev are temporarily full
> >
> > Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> >
> > ---
> > v5:
> > * Add branch prediction to if conditions
> >
> > v4:
> > * Retain the non-enqueued crypto ops in the circular buffer for later
> >   processing and stop dequeuing from the eventdev till all the
> >   crypto ops are enqueued to the cryptodev
> >
> >   Check for space in the circular buffer and stop dequeuing from the
> >   eventdev if some ops failed to flush to the cdev and no space for
> >   another batch is available in the circular buffer
> >
> >   Re-enable dequeuing from the eventdev after all the ops are flushed
> >
> > v3:
> > * update eca_ops_buffer_flush() to flush all the crypto ops
> >   out of the circular buffer.
> > * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
> >   and add them to the circular buffer for later processing.
> >
> > v2:
> > * reset crypto adapter next cdev id before dequeuing from the
> >   next cdev
> > ---
> >
> > diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> > b/lib/eventdev/rte_event_crypto_adapter.c
> > index d840803..0b484f3 100644
> > --- a/lib/eventdev/rte_event_crypto_adapter.c
> > +++ b/lib/eventdev/rte_event_crypto_adapter.c
> > @@ -25,11 +25,27 @@
> >  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
> >  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> >
> > +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> > +#define CRYPTO_ADAPTER_BUFFER_SZ 1024
> > +
> >  /* Flush an instance's enqueue buffers every
> > CRYPTO_ENQ_FLUSH_THRESHOLD
> >   * iterations of eca_crypto_adapter_enq_run()
> >   */
> >  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> >
> > +struct crypto_ops_circular_buffer {
> > +     /* index of head element in circular buffer */
> > +     uint16_t head;
> > +     /* index of tail element in circular buffer */
> > +     uint16_t tail;
> > +     /* number of elements in buffer */
> > +     uint16_t count;
> > +     /* size of circular buffer */
> > +     uint16_t size;
> > +     /* Pointer to hold rte_crypto_ops for batching */
> > +     struct rte_crypto_op **op_buffer;
> > +} __rte_cache_aligned;
> > +
> >  struct event_crypto_adapter {
> >       /* Event device identifier */
> >       uint8_t eventdev_id;
> > @@ -37,6 +53,10 @@ struct event_crypto_adapter {
> >       uint8_t event_port_id;
> >       /* Store event device's implicit release capability */
> >       uint8_t implicit_release_disabled;
> > +     /* Flag to indicate backpressure at cryptodev
> > +      * Stop further dequeuing events from eventdev
> > +      */
> > +     bool stop_enq_to_cryptodev;
> >       /* Max crypto ops processed in any service function invocation */
> >       uint32_t max_nb;
> >       /* Lock to serialize config updates with service function */
> > @@ -47,6 +67,8 @@ struct event_crypto_adapter {
> >       struct crypto_device_info *cdevs;
> >       /* Loop counter to flush crypto ops */
> >       uint16_t transmit_loop_count;
> > +     /* Circular buffer for batching crypto ops to eventdev */
> > +     struct crypto_ops_circular_buffer ebuf;
> >       /* Per instance stats structure */
> >       struct rte_event_crypto_adapter_stats crypto_stats;
> >       /* Configuration callback for rte_service configuration */
> > @@ -93,10 +115,8 @@ struct crypto_device_info {
> >  struct crypto_queue_pair_info {
> >       /* Set to indicate queue pair is enabled */
> >       bool qp_enabled;
> > -     /* Pointer to hold rte_crypto_ops for batching */
> > -     struct rte_crypto_op **op_buffer;
> > -     /* No of crypto ops accumulated */
> > -     uint8_t len;
> > +     /* Circular buffer for batching crypto ops to cdev */
> > +     struct crypto_ops_circular_buffer cbuf;
> >  } __rte_cache_aligned;
> >
> >  static struct event_crypto_adapter **event_crypto_adapter;
> > @@ -141,6 +161,84 @@ eca_init(void)
> >       return 0;
> >  }
> >
> > +static inline bool
> > +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
> > +{
> > +     return bufp->count >= BATCH_SIZE;
> > +}
> > +
> > +static inline bool
> > +eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
> > +{
> > +     return (bufp->size - bufp->count) >= BATCH_SIZE;
> > +}
> > +
> > +static inline void
> > +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
> > +{
> > +     rte_free(bufp->op_buffer);
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_init(const char *name,
> > +                      struct crypto_ops_circular_buffer *bufp,
> > +                      uint16_t sz)
> > +{
> > +     bufp->op_buffer = rte_zmalloc(name,
> > +                                   sizeof(struct rte_crypto_op *) * sz,
> > +                                   0);
> > +     if (bufp->op_buffer == NULL)
> > +             return -ENOMEM;
> > +
> > +     bufp->size = sz;
> > +     return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> > +                     struct rte_crypto_op *op)
> > +{
> > +     uint16_t *tailp = &bufp->tail;
> > +
> > +     bufp->op_buffer[*tailp] = op;
> > +     /* circular buffer, go round */
> > +     *tailp = (*tailp + 1) % bufp->size;
> > +     bufp->count++;
> > +
> > +     return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> > +                               uint8_t cdev_id, uint16_t qp_id,
> > +                               uint16_t *nb_ops_flushed)
> > +{
> > +     uint16_t n = 0;
> > +     uint16_t *headp = &bufp->head;
> > +     uint16_t *tailp = &bufp->tail;
> > +     struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > +     if (*tailp > *headp)
> > +             n = *tailp - *headp;
> > +     else if (*tailp < *headp)
> > +             n = bufp->size - *headp;
> > +     else {
> > +             *nb_ops_flushed = 0;
> > +             return 0;  /* buffer empty */
> > +     }
> > +
> > +     *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> > +                                                   &ops[*headp], n);
> > +     bufp->count -= *nb_ops_flushed;
> > +     if (!bufp->count) {
> > +             *headp = 0;
> > +             *tailp = 0;
> > +     } else
> > +             *headp = (*headp + *nb_ops_flushed) % bufp->size;
> > +
> > +     return *nb_ops_flushed == n ? 0 : -1;
> > +}
> > +
> >  static inline struct event_crypto_adapter *
> >  eca_id_to_adapter(uint8_t id)
> >  {
> > @@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> > uint8_t dev_id,
> >               return -ENOMEM;
> >       }
> >
> > +     if (eca_circular_buffer_init("eca_edev_circular_buffer",
> > +                                  &adapter->ebuf,
> > +                                  CRYPTO_ADAPTER_BUFFER_SZ)) {
> > +             RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
> > +             rte_free(adapter);
> > +             return -ENOMEM;
> > +     }
> > +
> >       ret = rte_event_dev_info_get(dev_id, &dev_info);
> >       if (ret < 0) {
> >               RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
> >                                dev_id, dev_info.driver_name);
> > +             eca_circular_buffer_free(&adapter->ebuf);
> >               rte_free(adapter);
> >               return ret;
> >       }
> > @@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> > uint8_t dev_id,
> >                                       socket_id);
> >       if (adapter->cdevs == NULL) {
> >               RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
> > +             eca_circular_buffer_free(&adapter->ebuf);
> >               rte_free(adapter);
> >               return -ENOMEM;
> >       }
> > @@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct
> > event_crypto_adapter *adapter, struct rte_event *ev,
> >       struct crypto_queue_pair_info *qp_info = NULL;
> >       struct rte_crypto_op *crypto_op;
> >       unsigned int i, n;
> > -     uint16_t qp_id, len, ret;
> > +     uint16_t qp_id, nb_enqueued = 0;
> >       uint8_t cdev_id;
> > +     int ret;
> >
> > -     len = 0;
> >       ret = 0;
> >       n = 0;
> >       stats->event_deq_count += cnt;
> > @@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> > *adapter, struct rte_event *ev,
> >                               rte_crypto_op_free(crypto_op);
> >                               continue;
> >                       }
> > -                     len = qp_info->len;
> > -                     qp_info->op_buffer[len] = crypto_op;
> > -                     len++;
> > +                     eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> >               } else if (crypto_op->sess_type ==
> > RTE_CRYPTO_OP_SESSIONLESS &&
> >                               crypto_op->private_data_offset) {
> >                       m_data = (union rte_event_crypto_metadata *)
> > @@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> >                               rte_crypto_op_free(crypto_op);
> >                               continue;
> >                       }
> > -                     len = qp_info->len;
> > -                     qp_info->op_buffer[len] = crypto_op;
> > -                     len++;
> > +                     eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> >               } else {
> >                       rte_pktmbuf_free(crypto_op->sym->m_src);
> >                       rte_crypto_op_free(crypto_op);
> >                       continue;
> >               }
> >
> > -             if (len == BATCH_SIZE) {
> > -                     struct rte_crypto_op **op_buffer = qp_info->op_buffer;
> > -                     ret = rte_cryptodev_enqueue_burst(cdev_id,
> > -                                                       qp_id,
> > -                                                       op_buffer,
> > -                                                       BATCH_SIZE);
> > -
> > -                     stats->crypto_enq_count += ret;
> > -
> > -                     while (ret < len) {
> > -                             struct rte_crypto_op *op;
> > -                             op = op_buffer[ret++];
> > -                             stats->crypto_enq_fail++;
> > -                             rte_pktmbuf_free(op->sym->m_src);
> > -                             rte_crypto_op_free(op);
> > -                     }
> > -
> > -                     len = 0;
> > +             if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> > +                     ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> > +                                                             cdev_id,
> > +                                                             qp_id,
> > +                                                             &nb_enqueued);
> > +                     /**
> > +                      * If some crypto ops failed to flush to cdev and
> > +                      * space for another batch is not available, stop
> > +                      * dequeue from eventdev momentarily
> > +                      */
> > +                     if (unlikely(ret < 0 &&
> > +                             !eca_circular_buffer_space_for_batch(
> > +                                                     &qp_info->cbuf)))
> > +                             adapter->stop_enq_to_cryptodev = true;
> >               }
> >
> > -             if (qp_info)
> > -                     qp_info->len = len;
> > -             n += ret;
> > +             stats->crypto_enq_count += nb_enqueued;
> > +             n += nb_enqueued;
> >       }
> >
> >       return n;
> >  }
> >
> >  static unsigned int
> > -eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> > +eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
> > +                   uint8_t cdev_id, uint16_t *nb_ops_flushed)
> >  {
> > -     struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> >       struct crypto_device_info *curr_dev;
> >       struct crypto_queue_pair_info *curr_queue;
> > -     struct rte_crypto_op **op_buffer;
> >       struct rte_cryptodev *dev;
> > -     uint8_t cdev_id;
> > +     uint16_t nb = 0, nb_enqueued = 0;
> >       uint16_t qp;
> > -     uint16_t ret;
> > -     uint16_t num_cdev = rte_cryptodev_count();
> >
> > -     ret = 0;
> > -     for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> > -             curr_dev = &adapter->cdevs[cdev_id];
> > -             dev = curr_dev->dev;
> > -             if (dev == NULL)
> > -                     continue;
> > -             for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> > +     curr_dev = &adapter->cdevs[cdev_id];
> > +     if (unlikely(curr_dev == NULL))
> > +             return 0;
> >
> > -                     curr_queue = &curr_dev->qpairs[qp];
> > -                     if (!curr_queue->qp_enabled)
> > -                             continue;
> > +     dev = rte_cryptodev_pmd_get_dev(cdev_id);
> > +     for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> >
> > -                     op_buffer = curr_queue->op_buffer;
> > -                     ret = rte_cryptodev_enqueue_burst(cdev_id,
> > -                                                       qp,
> > -                                                       op_buffer,
> > -                                                       curr_queue->len);
> > -                     stats->crypto_enq_count += ret;
> > -
> > -                     while (ret < curr_queue->len) {
> > -                             struct rte_crypto_op *op;
> > -                             op = op_buffer[ret++];
> > -                             stats->crypto_enq_fail++;
> > -                             rte_pktmbuf_free(op->sym->m_src);
> > -                             rte_crypto_op_free(op);
> > -                     }
> > -                     curr_queue->len = 0;
> > -             }
> > +             curr_queue = &curr_dev->qpairs[qp];
> > +             if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
> > +                     continue;
> > +
> > +             eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> > +                                               cdev_id,
> > +                                               qp,
> > +                                               &nb_enqueued);
> > +             *nb_ops_flushed += curr_queue->cbuf.count;
> > +             nb += nb_enqueued;
> >       }
> >
> > -     return ret;
> > +     return nb;
> > +}
> > +
> > +static unsigned int
> > +eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> > +{
> > +     struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> > +     uint8_t cdev_id;
> > +     uint16_t nb_enqueued = 0;
> > +     uint16_t nb_ops_flushed = 0;
> > +     uint16_t num_cdev = rte_cryptodev_count();
> > +
> > +     for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
> > +             nb_enqueued += eca_crypto_cdev_flush(adapter,
> > +                                                 cdev_id,
> > +                                                 &nb_ops_flushed);
> > +     /**
> > +      * Enable dequeue from eventdev if all ops from circular
> > +      * buffer flushed to cdev
> > +      */
> > +     if (!nb_ops_flushed)
> > +             adapter->stop_enq_to_cryptodev = false;
> > +
> > +     stats->crypto_enq_count += nb_enqueued;
> > +
> > +     return nb_enqueued;
> >  }
> >
> >  static int
> > @@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct
> > event_crypto_adapter *adapter,
> >       if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
> >               return 0;
> >
> > +     if (unlikely(adapter->stop_enq_to_cryptodev)) {
> > +             nb_enqueued += eca_crypto_enq_flush(adapter);
> > +
> > +             if (unlikely(adapter->stop_enq_to_cryptodev))
> > +                     goto skip_event_dequeue_burst;
> > +     }
> > +
> >       for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
> >               stats->event_poll_count++;
> >               n = rte_event_dequeue_burst(event_dev_id,
> > @@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct
> > event_crypto_adapter *adapter,
> >               nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
> >       }
> >
> > +skip_event_dequeue_burst:
> > +
> >       if ((++adapter->transmit_loop_count &
> >               (CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
> >               nb_enqueued += eca_crypto_enq_flush(adapter);
> > @@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
> >       return nb_enqueued;
> >  }
> >
> > -static inline void
> > +static inline uint16_t
> >  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> > -                   struct rte_crypto_op **ops, uint16_t num)
> > +               struct rte_crypto_op **ops, uint16_t num)
> >  {
> >       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> >       union rte_event_crypto_metadata *m_data = NULL;
> > @@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> >       num = RTE_MIN(num, BATCH_SIZE);
> >       for (i = 0; i < num; i++) {
> >               struct rte_event *ev = &events[nb_ev++];
> > +
> > +             m_data = NULL;
> >               if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
> >                       m_data = rte_cryptodev_sym_session_get_user_data(
> >                                       ops[i]->sym->session);
> > @@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct
> > event_crypto_adapter *adapter,
> >                                                 event_port_id,
> >                                                 &events[nb_enqueued],
> >                                                 nb_ev - nb_enqueued);
> > +
> >       } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
> >                nb_enqueued < nb_ev);
> >
> > -     /* Free mbufs and rte_crypto_ops for failed events */
> > -     for (i = nb_enqueued; i < nb_ev; i++) {
> > -             struct rte_crypto_op *op = events[i].event_ptr;
> > -             rte_pktmbuf_free(op->sym->m_src);
> > -             rte_crypto_op_free(op);
> > -     }
> > -
> >       stats->event_enq_fail_count += nb_ev - nb_enqueued;
> >       stats->event_enq_count += nb_enqueued;
> >       stats->event_enq_retry_count += retry - 1;
> > +
> > +     return nb_enqueued;
> > +}
> > +
> > +static int
> > +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> > +                                struct crypto_ops_circular_buffer *bufp)
> > +{
> > +     uint16_t n = 0, nb_ops_flushed;
> > +     uint16_t *headp = &bufp->head;
> > +     uint16_t *tailp = &bufp->tail;
> > +     struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > +     if (*tailp > *headp)
> > +             n = *tailp - *headp;
> > +     else if (*tailp < *headp)
> > +             n = bufp->size - *headp;
> > +     else
> > +             return 0;  /* buffer empty */
> > +
> > +     nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> > +     bufp->count -= nb_ops_flushed;
> > +     if (!bufp->count) {
> > +             *headp = 0;
> > +             *tailp = 0;
> > +             return 0;  /* buffer empty */
> > +     }
> > +
> > +     *headp = (*headp + nb_ops_flushed) % bufp->size;
> > +     return 1;
> >  }
> >
> > +
> > +static void
> > +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> > +{
> > +     if (likely(adapter->ebuf.count == 0))
> > +             return;
> > +
> > +     while (eca_circular_buffer_flush_to_evdev(adapter,
> > +                                               &adapter->ebuf))
> > +             ;
> > +}
> >  static inline unsigned int
> >  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> >                          unsigned int max_deq)
> > @@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >       struct crypto_device_info *curr_dev;
> >       struct crypto_queue_pair_info *curr_queue;
> >       struct rte_crypto_op *ops[BATCH_SIZE];
> > -     uint16_t n, nb_deq;
> > +     uint16_t n, nb_deq, nb_enqueued, i;
> >       struct rte_cryptodev *dev;
> >       uint8_t cdev_id;
> >       uint16_t qp, dev_qps;
> > @@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >       uint16_t num_cdev = rte_cryptodev_count();
> >
> >       nb_deq = 0;
> > +     eca_ops_buffer_flush(adapter);
> > +
> >       do {
> > -             uint16_t queues = 0;
> >               done = true;
> >
> >               for (cdev_id = adapter->next_cdev_id;
> >                       cdev_id < num_cdev; cdev_id++) {
> > +                     uint16_t queues = 0;
> > +
> >                       curr_dev = &adapter->cdevs[cdev_id];
> >                       dev = curr_dev->dev;
> > -                     if (dev == NULL)
> > +                     if (unlikely(dev == NULL))
> >                               continue;
> > +
> >                       dev_qps = dev->data->nb_queue_pairs;
> >
> >                       for (qp = curr_dev->next_queue_pair_id;
> > @@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> >                               queues++) {
> >
> >                               curr_queue = &curr_dev->qpairs[qp];
> > -                             if (!curr_queue->qp_enabled)
> > +                             if (unlikely(curr_queue == NULL ||
> > +                                 !curr_queue->qp_enabled))
> >                                       continue;
> >
> >                               n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> > @@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> >                                       continue;
> >
> >                               done = false;
> > +                             nb_enqueued = 0;
> > +
> >                               stats->crypto_deq_count += n;
> > -                             eca_ops_enqueue_burst(adapter, ops, n);
> > +
> > +                             if (unlikely(!adapter->ebuf.count))
> > +                                     nb_enqueued = eca_ops_enqueue_burst(
> > +                                                     adapter, ops, n);
> > +
> > +                             if (likely(nb_enqueued == n))
> > +                                     goto check;
> > +
> > +                             /* Failed to enqueue events case */
> > +                             for (i = nb_enqueued; i < n; i++)
> > +                                     eca_circular_buffer_add(
> > +                                             &adapter->ebuf,
> > +                                             ops[nb_enqueued]);
> > +
> > +check:
> >                               nb_deq += n;
> >
> > -                             if (nb_deq > max_deq) {
> > +                             if (nb_deq >= max_deq) {
> >                                       if ((qp + 1) == dev_qps) {
> >                                               adapter->next_cdev_id =
> >                                                       (cdev_id + 1)
> > @@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct
> > event_crypto_adapter *adapter,
> >                               }
> >                       }
> >               }
> > +             adapter->next_cdev_id = 0;
> >       } while (done == false);
> >       return nb_deq;
> >  }
> > @@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter
> > *adapter, uint8_t cdev_id,
> >                       return -ENOMEM;
> >
> >               qpairs = dev_info->qpairs;
> > -             qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> > -                                     BATCH_SIZE *
> > -                                     sizeof(struct rte_crypto_op *),
> > -                                     0, adapter->socket_id);
> > -             if (!qpairs->op_buffer) {
> > +
> > +             if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> > +                                          &qpairs->cbuf,
> > +                                          CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> > +                     RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
> > +                                      "buffer");
> >                       rte_free(qpairs);
> >                       return -ENOMEM;
> >               }
> > --
> > 2.6.4
>

^ permalink raw reply	[flat|nested] 17+ messages in thread
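
The v4 notes quoted above describe the back-pressure handling in prose: when a flush cannot free a full batch worth of space, the adapter sets stop_enq_to_cryptodev and the service function only runs the flush path until the buffer drains. The sketch below is one way to model that on/off behaviour outside DPDK; adapter_sim, service_iteration() and drain_one_batch() are invented names used purely for illustration, not adapter APIs.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define BATCH 32
#define BUF_SZ (2 * BATCH)	/* mirrors CRYPTO_ADAPTER_OPS_BUFFER_SZ */

struct adapter_sim {
	uint16_t buffered;	/* ops sitting in the circular buffer */
	bool stop_enq;		/* plays the role of stop_enq_to_cryptodev */
};

/* Hypothetical cdev that drains at most 'accept' ops per iteration. */
static void drain_one_batch(struct adapter_sim *a, uint16_t accept)
{
	uint16_t n = a->buffered < accept ? a->buffered : accept;

	a->buffered -= n;
	/* Re-enable event dequeue only once everything is flushed. */
	if (a->buffered == 0)
		a->stop_enq = false;
}

static void service_iteration(struct adapter_sim *a, uint16_t accept)
{
	if (a->stop_enq) {
		drain_one_batch(a, accept);
		if (a->stop_enq)
			return;	/* still back-pressured: skip event dequeue */
	}

	/* Normal path: pull one batch of events and buffer it. */
	a->buffered += BATCH;
	drain_one_batch(a, accept);

	/* No room left for another full batch: pause event dequeue. */
	if (BUF_SZ - a->buffered < BATCH)
		a->stop_enq = true;
}

int main(void)
{
	struct adapter_sim a = { 0, false };
	int i;

	for (i = 0; i < 6; i++) {
		service_iteration(&a, 8);	/* deliberately slow cdev */
		printf("iter %d: buffered=%d stop_enq=%d\n",
		       i, a.buffered, a.stop_enq);
	}
	return 0;
}

Run for a few iterations, the simulation shows stop_enq latching on once the buffer can no longer take another full batch and clearing only after the backlog drains, which is the behaviour the changelog describes.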

end of thread, other threads:[~2022-02-14 13:16 UTC | newest]

Thread overview: 17+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-12-21 11:31 [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
2021-12-21 11:31 ` [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
2022-01-04 12:31   ` [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
2022-02-07 10:32       ` Kundapura, Ganapati
2022-02-07 10:50     ` [PATCH v4 " Ganapati Kundapura
2022-02-07 10:50       ` [PATCH v4 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-02-10 15:07       ` [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
2022-02-10 17:46         ` Kundapura, Ganapati
2022-02-10 17:41       ` [PATCH v5 " Ganapati Kundapura
2022-02-10 17:41         ` [PATCH v5 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-02-11  4:43           ` Gujjar, Abhinandan S
2022-02-11  4:43         ` [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
2022-02-14 13:15           ` Jerin Jacob

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).