DPDK patches and discussions
 help / color / mirror / Atom feed
* [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
@ 2021-12-21 11:31 Ganapati Kundapura
  2021-12-21 11:31 ` [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  0 siblings, 2 replies; 7+ messages in thread
From: Ganapati Kundapura @ 2021-12-21 11:31 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, dev; +Cc: abhinandan.gujjar

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..4469a89 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,16 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +509,8 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
+
+		n += nb_enqueued;
 	}
 
 	return n;
@@ -425,14 +522,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,25 +539,26 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
+
+			stats->crypto_enq_count += nb_enqueued;
 
-			while (ret < curr_queue->len) {
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
 			}
 			curr_queue->len = 0;
+			nb += nb_enqueued;
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +595,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +614,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,6 +646,7 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
@@ -561,8 +660,52 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	/* Stop further enqueue to eventdev, if nb_ops_flushed < n */
+	if (nb_ops_flushed < n)
+		return 0;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +714,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +722,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -605,8 +752,24 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
 				if (nb_deq > max_deq) {
@@ -751,11 +914,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap
  2021-12-21 11:31 [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
@ 2021-12-21 11:31 ` Ganapati Kundapura
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  1 sibling, 0 replies; 7+ messages in thread
From: Ganapati Kundapura @ 2021-12-21 11:31 UTC (permalink / raw)
  To: jay.jayatheerthan, jerinjacobk, dev; +Cc: abhinandan.gujjar

update rte_event_crypto_adapter_caps_get() to return
SW_CAP if PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2021-12-21 11:31 [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2021-12-21 11:31 ` [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-01-04 12:31 ` Ganapati Kundapura
  2022-01-04 12:31   ` [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  1 sibling, 2 replies; 7+ messages in thread
From: Ganapati Kundapura @ 2022-01-04 12:31 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

---
v2:
* reset cryptp adapter next cdev id before dequeueing from the
  next cdev
---

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..981db6e 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,16 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +509,8 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
+
+		n += nb_enqueued;
 	}
 
 	return n;
@@ -425,14 +522,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,25 +539,26 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
+
+			stats->crypto_enq_count += nb_enqueued;
 
-			while (ret < curr_queue->len) {
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
 			}
 			curr_queue->len = 0;
+			nb += nb_enqueued;
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +595,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +614,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,6 +646,7 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
@@ -561,8 +660,52 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	/* Stop further enqueue to eventdev, if nb_ops_flushed < n */
+	if (nb_ops_flushed < n)
+		return 0;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +714,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +722,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +743,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				queues++) {
 
 				curr_queue = &curr_dev->qpairs[qp];
-				if (!curr_queue->qp_enabled)
+				if (curr_queue == NULL ||
+				    !curr_queue->qp_enabled)
 					continue;
 
 				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,8 +753,24 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
 				if (nb_deq > max_deq) {
@@ -622,6 +786,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				}
 			}
 		}
+		adapter->next_cdev_id = 0;
 	} while (done == false);
 	return nb_deq;
 }
@@ -751,11 +916,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
@ 2022-01-04 12:31   ` Ganapati Kundapura
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  1 sibling, 0 replies; 7+ messages in thread
From: Ganapati Kundapura @ 2022-01-04 12:31 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

update rte_event_crypto_adapter_caps_get() to return
SW_CAP if PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2022-01-04 12:31   ` [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-01-11 10:36   ` Ganapati Kundapura
  2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
  2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
  1 sibling, 2 replies; 7+ messages in thread
From: Ganapati Kundapura @ 2022-01-11 10:36 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

---
v3:
* update eca_ops_buffer_flush() to flush out all the crypto
  ops out of circular buffer.
* remove freeing of failed crypto ops from eca_ops_enqueue_burst()
  and add to cirular buffer for later processing.

v2:
* reset cryptp adapter next cdev id before dequeueing from the
  next cdev
---

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..9086368 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
+			n += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
 	}
 
 	return n;
@@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
 
-			while (ret < curr_queue->len) {
+			stats->crypto_enq_count += nb_enqueued;
+			nb += nb_enqueued;
+
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +613,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
-	/* Free mbufs and rte_crypto_ops for failed events */
-	for (i = nb_enqueued; i < nb_ev; i++) {
-		struct rte_crypto_op *op = events[i].event_ptr;
-		rte_pktmbuf_free(op->sym->m_src);
-		rte_crypto_op_free(op);
-	}
-
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+
+		return 0;  /* buffer empty */
+	}
+
+	*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				queues++) {
 
 				curr_queue = &curr_dev->qpairs[qp];
-				if (!curr_queue->qp_enabled)
+				if (curr_queue == NULL ||
+				    !curr_queue->qp_enabled)
 					continue;
 
 				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
-				if (nb_deq > max_deq) {
+				if (nb_deq >= max_deq) {
 					if ((qp + 1) == dev_qps) {
 						adapter->next_cdev_id =
 							(cdev_id + 1)
@@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 				}
 			}
 		}
+		adapter->next_cdev_id = 0;
 	} while (done == false);
 	return nb_deq;
 }
@@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}
-- 
2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
@ 2022-01-11 10:36     ` Ganapati Kundapura
  2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S
  1 sibling, 0 replies; 7+ messages in thread
From: Ganapati Kundapura @ 2022-01-11 10:36 UTC (permalink / raw)
  To: jerinjacobk, jay.jayatheerthan, dev; +Cc: abhinandan.gujjar

update rte_event_crypto_adapter_caps_get() to return
SW_CAP if PMD callback is not registered.

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>

diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
index 79b9ea3..6988bf1 100644
--- a/lib/eventdev/rte_eventdev.c
+++ b/lib/eventdev/rte_eventdev.c
@@ -176,11 +176,15 @@ rte_event_crypto_adapter_caps_get(uint8_t dev_id, uint8_t cdev_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->crypto_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->crypto_adapter_caps_get ?
 		(*dev->dev_ops->crypto_adapter_caps_get)
-		(dev, cdev, caps) : -ENOTSUP;
+		(dev, cdev, caps) : 0;
 }
 
 int
-- 
2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

* RE: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
  2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
  2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
@ 2022-01-13 11:05     ` Gujjar, Abhinandan S
  1 sibling, 0 replies; 7+ messages in thread
From: Gujjar, Abhinandan S @ 2022-01-13 11:05 UTC (permalink / raw)
  To: Kundapura, Ganapati, jerinjacobk, Jayatheerthan, Jay, dev

Hi Ganapati,


> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Sent: Tuesday, January 11, 2022 4:07 PM
> To: jerinjacobk@gmail.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> dev@dpdk.org
> Cc: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> Subject: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular
> buffer
> 
> Move crypto ops to circular buffer to retain crypto ops when
> cryptodev/eventdev are temporarily full
> 
> ---
> v3:
> * update eca_ops_buffer_flush() to flush out all the crypto
>   ops out of circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
>   and add to cirular buffer for later processing.
> 
> v2:
> * reset cryptp adapter next cdev id before dequeueing from the
Cryptp -> crypto
>   next cdev
> ---
> 
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
I don't see sign off after applying the patch. Could you take a look?
commit 18d9c3b1728f325dba5fe5dbb51df2366b70527a
Author: Ganapati Kundapura <ganapati.kundapura@intel.com>
Date:   Tue Jan 11 04:36:30 2022 -0600

    eventdev/crypto_adapter: move crypto ops to circular buffer
    
    Move crypto ops to circular buffer to retain crypto
    ops when cryptodev/eventdev are temporarily full


> 
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..9086368 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
>  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
>  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> 
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> #define
> +CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
>  /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
>   * iterations of eca_crypto_adapter_enq_run()
>   */
>  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> 
> +struct crypto_ops_circular_buffer {
> +	/* index of head element in circular buffer */
> +	uint16_t head;
> +	/* index of tail element in circular buffer */
> +	uint16_t tail;
> +	/* number elements in buffer */
number elements -> number of elements 
> +	uint16_t count;
> +	/* size of circular buffer */
> +	uint16_t size;
> +	/* Pointer to hold rte_crypto_ops for batching */
> +	struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
>  struct event_crypto_adapter {
>  	/* Event device identifier */
>  	uint8_t eventdev_id;
> @@ -47,6 +63,8 @@ struct event_crypto_adapter {
>  	struct crypto_device_info *cdevs;
>  	/* Loop counter to flush crypto ops */
>  	uint16_t transmit_loop_count;
> +	/* Circular buffer for batching crypto ops to eventdev */
> +	struct crypto_ops_circular_buffer ebuf;
>  	/* Per instance stats structure */
>  	struct rte_event_crypto_adapter_stats crypto_stats;
>  	/* Configuration callback for rte_service configuration */ @@ -93,8
> +111,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
>  	/* Set to indicate queue pair is enabled */
>  	bool qp_enabled;
> -	/* Pointer to hold rte_crypto_ops for batching */
> -	struct rte_crypto_op **op_buffer;
> +	/* Circular buffer for batching crypto ops to cdev */
> +	struct crypto_ops_circular_buffer cbuf;
>  	/* No of crypto ops accumulated */
>  	uint8_t len;
>  } __rte_cache_aligned;
> @@ -141,6 +159,77 @@ eca_init(void)
>  	return 0;
>  }
> 
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> +*bufp) {
> +	return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> +	rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> +			 struct crypto_ops_circular_buffer *bufp,
> +			 uint16_t sz)
> +{
> +	bufp->op_buffer = rte_zmalloc(name,
> +				      sizeof(struct rte_crypto_op *) * sz,
> +				      0);
> +	if (bufp->op_buffer == NULL)
> +		return -ENOMEM;
> +
> +	bufp->size = sz;
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> +			struct rte_crypto_op *op)
> +{
> +	uint16_t *tailp = &bufp->tail;
> +
> +	bufp->op_buffer[*tailp] = op;
> +	*tailp = (*tailp + 1) % bufp->size;
Provide a comment saying you are taking care of buffer rollover condition
> +	bufp->count++;
> +
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> +				  uint8_t cdev_id, uint16_t qp_id,
> +				  uint16_t *nb_ops_flushed)
This function is returning a value but caller does not check!
> +{
> +	uint16_t n = 0;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else {
> +		*nb_ops_flushed = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> +						      &ops[*headp], n);
> +	bufp->count -= *nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +	} else
> +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> +	return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
>  static inline struct event_crypto_adapter *  eca_id_to_adapter(uint8_t id)  {
> @@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  		return -ENOMEM;
>  	}
> 
> +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> +				     &adapter->ebuf,
> +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> +		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
Mem -> memory, edev -> eventdev. 
> +		rte_free(adapter);
> +		return -ENOMEM;
> +	}
> +
>  	ret = rte_event_dev_info_get(dev_id, &dev_info);
>  	if (ret < 0) {
>  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
>  				 dev_id, dev_info.driver_name);
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return ret;
>  	}
> @@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>  					socket_id);
>  	if (adapter->cdevs == NULL) {
>  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return -ENOMEM;
>  	}
> @@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  	struct crypto_queue_pair_info *qp_info = NULL;
>  	struct rte_crypto_op *crypto_op;
>  	unsigned int i, n;
> -	uint16_t qp_id, len, ret;
> +	uint16_t qp_id, len, nb_enqueued = 0;
>  	uint8_t cdev_id;
> 
>  	len = 0;
> -	ret = 0;
>  	n = 0;
>  	stats->event_deq_count += cnt;
> 
> @@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				continue;
>  			}
>  			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> +			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>  			len++;
>  		} else if (crypto_op->sess_type ==
> RTE_CRYPTO_OP_SESSIONLESS &&
>  				crypto_op->private_data_offset) {
> @@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  				continue;
>  			}
>  			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> +			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>  			len++;
>  		} else {
>  			rte_pktmbuf_free(crypto_op->sym->m_src);
> @@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>  			continue;
>  		}
> 
> -		if (len == BATCH_SIZE) {
> -			struct rte_crypto_op **op_buffer = qp_info-
> >op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> +			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> +							  cdev_id,
>  							  qp_id,
> -							  op_buffer,
> -							  BATCH_SIZE);
> +							  &nb_enqueued);
> +			stats->crypto_enq_count += nb_enqueued;
> +			n += nb_enqueued;
> 
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < len) {
> +			while (nb_enqueued < len) {
>  				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> +				op = qp_info-
> >cbuf.op_buffer[nb_enqueued++];
>  				stats->crypto_enq_fail++;
>  				rte_pktmbuf_free(op->sym->m_src);
>  				rte_crypto_op_free(op);
Not sure, why are you free the ops which are not enqueued to cryptodev.
Isn't it the goal of the patch is to retain those non-enqueued crypto_ops temporarily and retry later?

> @@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
> 
>  		if (qp_info)
>  			qp_info->len = len;
> -		n += ret;
>  	}
> 
>  	return n;
> @@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>  	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
> -	struct rte_crypto_op **op_buffer;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp;
> -	uint16_t ret;
> +	uint16_t nb_enqueued = 0, nb = 0;
>  	uint16_t num_cdev = rte_cryptodev_count();
> 
> -	ret = 0;
>  	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
>  		curr_dev = &adapter->cdevs[cdev_id];
>  		dev = curr_dev->dev;
> @@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>  			if (!curr_queue->qp_enabled)
>  				continue;
> 
> -			op_buffer = curr_queue->op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> +			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> +							  cdev_id,
>  							  qp,
> -							  op_buffer,
> -							  curr_queue->len);
> -			stats->crypto_enq_count += ret;
> +							  &nb_enqueued);
> 
> -			while (ret < curr_queue->len) {
> +			stats->crypto_enq_count += nb_enqueued;
> +			nb += nb_enqueued;
> +
> +			while (nb_enqueued < curr_queue->len) {
>  				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> +				op = curr_queue-
> >cbuf.op_buffer[nb_enqueued++];
>  				stats->crypto_enq_fail++;
>  				rte_pktmbuf_free(op->sym->m_src);
>  				rte_crypto_op_free(op);
> @@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>  		}
>  	}
> 
> -	return ret;
> +	return nb;
>  }
> 
>  static int
> @@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>  	return nb_enqueued;
>  }
> 
> -static inline void
> +static inline uint16_t
>  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> -		      struct rte_crypto_op **ops, uint16_t num)
> +		  struct rte_crypto_op **ops, uint16_t num)
>  {
>  	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	union rte_event_crypto_metadata *m_data = NULL; @@ -518,6 +613,8
> @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  	num = RTE_MIN(num, BATCH_SIZE);
>  	for (i = 0; i < num; i++) {
>  		struct rte_event *ev = &events[nb_ev++];
> +
> +		m_data = NULL;
>  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
>  			m_data = rte_cryptodev_sym_session_get_user_data(
>  					ops[i]->sym->session);
> @@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter
> *adapter,
>  						  event_port_id,
>  						  &events[nb_enqueued],
>  						  nb_ev - nb_enqueued);
> +
>  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
>  		 nb_enqueued < nb_ev);
> 
> -	/* Free mbufs and rte_crypto_ops for failed events */
> -	for (i = nb_enqueued; i < nb_ev; i++) {
> -		struct rte_crypto_op *op = events[i].event_ptr;
> -		rte_pktmbuf_free(op->sym->m_src);
> -		rte_crypto_op_free(op);
> -	}
> -
>  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
>  	stats->event_enq_count += nb_enqueued;
>  	stats->event_enq_retry_count += retry - 1;
> +
> +	return nb_enqueued;
>  }
> 
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> +				   struct crypto_ops_circular_buffer *bufp) {
> +	uint16_t n = 0, nb_ops_flushed;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else
> +		return 0;  /* buffer empty */
> +
> +	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> +	bufp->count -= nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +
Extra line not required
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> +
Extra line not required
> +	return 1;
> +}
> +
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter) {
> +	if (adapter->ebuf.count == 0)
> +		return;
> +
> +	while (eca_circular_buffer_flush_to_evdev(adapter,
> +						  &adapter->ebuf))
> +		;
> +}
>  static inline unsigned int
>  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  			   unsigned int max_deq)
> @@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
>  	struct rte_crypto_op *ops[BATCH_SIZE];
> -	uint16_t n, nb_deq;
> +	uint16_t n, nb_deq, nb_enqueued, i;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp, dev_qps;
> @@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  	uint16_t num_cdev = rte_cryptodev_count();
> 
>  	nb_deq = 0;
> +	eca_ops_buffer_flush(adapter);
> +
>  	do {
> -		uint16_t queues = 0;
>  		done = true;
> 
>  		for (cdev_id = adapter->next_cdev_id;
>  			cdev_id < num_cdev; cdev_id++) {
> +			uint16_t queues = 0;
> +
>  			curr_dev = &adapter->cdevs[cdev_id];
>  			dev = curr_dev->dev;
>  			if (dev == NULL)
>  				continue;
> +
>  			dev_qps = dev->data->nb_queue_pairs;
> 
>  			for (qp = curr_dev->next_queue_pair_id; @@ -596,7
> +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter
> *adapter,
>  				queues++) {
> 
>  				curr_queue = &curr_dev->qpairs[qp];
> -				if (!curr_queue->qp_enabled)
> +				if (curr_queue == NULL ||
> +				    !curr_queue->qp_enabled)
>  					continue;
> 
>  				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  					continue;
> 
>  				done = false;
> +				nb_enqueued = 0;
> +
>  				stats->crypto_deq_count += n;
> -				eca_ops_enqueue_burst(adapter, ops, n);
> +
> +				if (unlikely(!adapter->ebuf.count))
> +					nb_enqueued =
> eca_ops_enqueue_burst(
> +							adapter, ops, n);
> +
> +				if (nb_enqueued == n)
> +					goto check;
> +
> +				/* Failed to enqueue events case */
> +				for (i = nb_enqueued; i < n; i++)
> +					eca_circular_buffer_add(
> +						&adapter->ebuf,
> +						ops[nb_enqueued]);
> +
> +check:
>  				nb_deq += n;
> 
> -				if (nb_deq > max_deq) {
> +				if (nb_deq >= max_deq) {
>  					if ((qp + 1) == dev_qps) {
>  						adapter->next_cdev_id =
>  							(cdev_id + 1)
> @@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>  				}
>  			}
>  		}
> +		adapter->next_cdev_id = 0;
>  	} while (done == false);
>  	return nb_deq;
>  }
> @@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter
> *adapter, uint8_t cdev_id,
>  			return -ENOMEM;
> 
>  		qpairs = dev_info->qpairs;
> -		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> -					BATCH_SIZE *
> -					sizeof(struct rte_crypto_op *),
> -					0, adapter->socket_id);
> -		if (!qpairs->op_buffer) {
> +
> +		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> +					     &qpairs->cbuf,
> +
> CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
>  			rte_free(qpairs);
>  			return -ENOMEM;
>  		}
> --
> 2.6.4


^ permalink raw reply	[flat|nested] 7+ messages in thread

end of thread, other threads:[~2022-01-13 11:05 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-12-21 11:31 [PATCH v1 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
2021-12-21 11:31 ` [PATCH v1 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-01-04 12:31 ` [PATCH v2 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
2022-01-04 12:31   ` [PATCH v2 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-01-11 10:36   ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Ganapati Kundapura
2022-01-11 10:36     ` [PATCH v3 2/2] eventdev: update crypto caps get to return SW cap Ganapati Kundapura
2022-01-13 11:05     ` [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular buffer Gujjar, Abhinandan S

DPDK patches and discussions

This inbox may be cloned and mirrored by anyone:

	git clone --mirror http://inbox.dpdk.org/dev/0 dev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 dev dev/ http://inbox.dpdk.org/dev \
		dev@dpdk.org
	public-inbox-index dev

Example config snippet for mirrors.
Newsgroup available over NNTP:
	nntp://inbox.dpdk.org/inbox.dpdk.dev


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git