From: Fan Zhang
To: dev@dpdk.org
Cc: pablo.de.lara.guarch@intel.com, sergio.gonzalez.monroy@intel.com, declan.doherty@intel.com
Date: Thu, 2 Mar 2017 14:18:34 +0000
Message-Id: <1488464314-119897-1-git-send-email-roy.fan.zhang@intel.com>
X-Mailer: git-send-email 2.7.4
Subject: [dpdk-dev] [PATCH] crypto/scheduler: optimize crypto op ordering

This patch optimizes crypto op ordering by replacing the rte_reorder
library based ordering method with an rte_ring based one, which avoids
the unnecessary cost of storing and recovering crypto ops.

Signed-off-by: Fan Zhang
Signed-off-by: Sergio Gonzalez Monroy
---
 drivers/crypto/scheduler/scheduler_pmd_ops.c     |  42 +++---
 drivers/crypto/scheduler/scheduler_pmd_private.h |  49 +++++-
 drivers/crypto/scheduler/scheduler_roundrobin.c  | 181 ++---------------------
 3 files changed, 79 insertions(+), 193 deletions(-)

diff --git a/drivers/crypto/scheduler/scheduler_pmd_ops.c b/drivers/crypto/scheduler/scheduler_pmd_ops.c
index 56624c7..287b2fb 100644
--- a/drivers/crypto/scheduler/scheduler_pmd_ops.c
+++ b/drivers/crypto/scheduler/scheduler_pmd_ops.c
@@ -63,24 +63,25 @@ scheduler_pmd_config(struct rte_cryptodev *dev)
 }
 
 static int
-update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
+update_order_ring(struct rte_cryptodev *dev, uint16_t qp_id)
 {
 	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
 	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
 
 	if (sched_ctx->reordering_enabled) {
-		char reorder_buff_name[RTE_CRYPTODEV_NAME_MAX_LEN];
-		uint32_t buff_size = sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE;
+		char order_ring_name[RTE_CRYPTODEV_NAME_MAX_LEN];
+		uint32_t buff_size = rte_align32pow2(
+			sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE);
 
-		if (qp_ctx->reorder_buf) {
-			rte_reorder_free(qp_ctx->reorder_buf);
-			qp_ctx->reorder_buf = NULL;
+		if (qp_ctx->order_ring) {
+			rte_ring_free(qp_ctx->order_ring);
+			qp_ctx->order_ring = NULL;
 		}
 
 		if (!buff_size)
 			return 0;
 
-		if (snprintf(reorder_buff_name, RTE_CRYPTODEV_NAME_MAX_LEN,
+		if (snprintf(order_ring_name, RTE_CRYPTODEV_NAME_MAX_LEN,
 			"%s_rb_%u_%u", RTE_STR(CRYPTODEV_NAME_SCHEDULER_PMD),
 			dev->data->dev_id, qp_id) < 0) {
 			CS_LOG_ERR("failed to create unique reorder buffer "
@@ -88,16 +89,17 @@ update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
 			return -ENOMEM;
 		}
 
-		qp_ctx->reorder_buf = rte_reorder_create(reorder_buff_name,
-				rte_socket_id(), buff_size);
-		if (!qp_ctx->reorder_buf) {
-			CS_LOG_ERR("failed to create reorder buffer");
+		qp_ctx->order_ring = rte_ring_create(order_ring_name,
+				buff_size, rte_socket_id(),
+				RING_F_SP_ENQ | RING_F_SC_DEQ);
+		if (!qp_ctx->order_ring) {
+			CS_LOG_ERR("failed to create order ring");
 			return -ENOMEM;
 		}
 	} else {
-		if (qp_ctx->reorder_buf) {
-			rte_reorder_free(qp_ctx->reorder_buf);
-			qp_ctx->reorder_buf = NULL;
+		if (qp_ctx->order_ring) {
+			rte_ring_free(qp_ctx->order_ring);
+			qp_ctx->order_ring = NULL;
 		}
 	}
 
@@ -116,7 +118,7 @@ scheduler_pmd_start(struct rte_cryptodev *dev)
 		return 0;
 
 	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
-		ret = update_reorder_buff(dev, i);
+		ret = update_order_ring(dev, i);
 		if (ret < 0) {
 			CS_LOG_ERR("Failed to update reorder buffer");
 			return ret;
@@ -224,9 +226,9 @@ scheduler_pmd_close(struct rte_cryptodev *dev)
 	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
 		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
 
-		if (qp_ctx->reorder_buf) {
-			rte_reorder_free(qp_ctx->reorder_buf);
-			qp_ctx->reorder_buf = NULL;
+		if (qp_ctx->order_ring) {
+			rte_ring_free(qp_ctx->order_ring);
+			qp_ctx->order_ring = NULL;
 		}
 
 		if (qp_ctx->private_qp_ctx) {
@@ -324,8 +326,8 @@ scheduler_pmd_qp_release(struct rte_cryptodev *dev, uint16_t qp_id)
 	if (!qp_ctx)
 		return 0;
 
-	if (qp_ctx->reorder_buf)
-		rte_reorder_free(qp_ctx->reorder_buf);
+	if (qp_ctx->order_ring)
+		rte_ring_free(qp_ctx->order_ring);
 
 	if (qp_ctx->private_qp_ctx)
 		rte_free(qp_ctx->private_qp_ctx);
diff --git a/drivers/crypto/scheduler/scheduler_pmd_private.h b/drivers/crypto/scheduler/scheduler_pmd_private.h
index ac4690e..f5348f6 100644
--- a/drivers/crypto/scheduler/scheduler_pmd_private.h
+++ b/drivers/crypto/scheduler/scheduler_pmd_private.h
@@ -34,9 +34,7 @@
 #ifndef _SCHEDULER_PMD_PRIVATE_H
 #define _SCHEDULER_PMD_PRIVATE_H
 
-#include
-#include
-#include
+#include "rte_cryptodev_scheduler.h"
 
 /**< Maximum number of bonded devices per devices */
 #ifndef MAX_SLAVES_NUM
@@ -101,7 +99,7 @@ struct scheduler_qp_ctx {
 	rte_cryptodev_scheduler_burst_enqueue_t schedule_enqueue;
 	rte_cryptodev_scheduler_burst_dequeue_t schedule_dequeue;
 
-	struct rte_reorder_buffer *reorder_buf;
+	struct rte_ring *order_ring;
 	uint32_t seqn;
 } __rte_cache_aligned;
 
@@ -109,6 +107,49 @@ struct scheduler_session {
 	struct rte_cryptodev_sym_session *sessions[MAX_SLAVES_NUM];
 };
 
+static inline uint16_t __attribute__((always_inline))
+get_max_enqueue_order_count(struct rte_ring *order_ring, uint16_t nb_ops)
+{
+	uint32_t count = rte_ring_free_count(order_ring);
+
+	return count > nb_ops ? nb_ops : count;
+}
+
+static inline void __attribute__((always_inline))
+scheduler_order_insert(struct rte_ring *order_ring,
+		struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+	rte_ring_sp_enqueue_burst(order_ring, (void **)ops, nb_ops);
+}
+
+#define SCHEDULER_GET_RING_OBJ(order_ring, pos) \
+	order_ring->ring[(order_ring->cons.head + pos) & order_ring->prod.mask]
+
+static inline uint16_t __attribute__((always_inline))
+scheduler_order_drain(struct rte_ring *order_ring,
+		struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+	struct rte_crypto_op *op;
+	uint32_t nb_objs = rte_ring_count(order_ring);
+	uint32_t nb_ops_to_deq = 0;
+	int status = -1;
+
+	if (nb_objs > nb_ops)
+		nb_objs = nb_ops;
+
+	while (nb_ops_to_deq < nb_objs) {
+		op = SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq);
+		if (op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED)
+			break;
+		nb_ops_to_deq++;
+	}
+
+	if (nb_ops_to_deq)
+		status = rte_ring_sc_dequeue_bulk(order_ring, (void **)ops,
+				nb_ops_to_deq);
+
+	return (status == 0) ? nb_ops_to_deq : 0;
+}

 /** device specific operations function pointer structure */
 extern struct rte_cryptodev_ops *rte_crypto_scheduler_pmd_ops;
 
diff --git a/drivers/crypto/scheduler/scheduler_roundrobin.c b/drivers/crypto/scheduler/scheduler_roundrobin.c
index 9545aa9..52f8c5e 100644
--- a/drivers/crypto/scheduler/scheduler_roundrobin.c
+++ b/drivers/crypto/scheduler/scheduler_roundrobin.c
@@ -115,80 +115,16 @@ static uint16_t
 schedule_enqueue_ordering(void *qp_ctx, struct rte_crypto_op **ops,
 		uint16_t nb_ops)
 {
-	struct scheduler_qp_ctx *gen_qp_ctx = qp_ctx;
-	struct rr_scheduler_qp_ctx *rr_qp_ctx =
-			gen_qp_ctx->private_qp_ctx;
-	uint32_t slave_idx = rr_qp_ctx->last_enq_slave_idx;
-	struct scheduler_slave *slave = &rr_qp_ctx->slaves[slave_idx];
-	uint16_t i, processed_ops;
-	struct rte_cryptodev_sym_session *sessions[nb_ops];
-	struct scheduler_session *sess0, *sess1, *sess2, *sess3;
-
-	if (unlikely(nb_ops == 0))
-		return 0;
-
-	for (i = 0; i < nb_ops && i < 4; i++) {
-		rte_prefetch0(ops[i]->sym->session);
-		rte_prefetch0(ops[i]->sym->m_src);
-	}
-
-	for (i = 0; (i < (nb_ops - 8)) && (nb_ops > 8); i += 4) {
-		sess0 = (struct scheduler_session *)
-				ops[i]->sym->session->_private;
-		sess1 = (struct scheduler_session *)
-				ops[i+1]->sym->session->_private;
-		sess2 = (struct scheduler_session *)
-				ops[i+2]->sym->session->_private;
-		sess3 = (struct scheduler_session *)
-				ops[i+3]->sym->session->_private;
-
-		sessions[i] = ops[i]->sym->session;
-		sessions[i + 1] = ops[i + 1]->sym->session;
-		sessions[i + 2] = ops[i + 2]->sym->session;
-		sessions[i + 3] = ops[i + 3]->sym->session;
-
-		ops[i]->sym->session = sess0->sessions[slave_idx];
-		ops[i]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-		ops[i + 1]->sym->session = sess1->sessions[slave_idx];
-		ops[i + 1]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-		ops[i + 2]->sym->session = sess2->sessions[slave_idx];
-		ops[i + 2]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-		ops[i + 3]->sym->session = sess3->sessions[slave_idx];
-		ops[i + 3]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-
-		rte_prefetch0(ops[i + 4]->sym->session);
-		rte_prefetch0(ops[i + 4]->sym->m_src);
-		rte_prefetch0(ops[i + 5]->sym->session);
-		rte_prefetch0(ops[i + 5]->sym->m_src);
-		rte_prefetch0(ops[i + 6]->sym->session);
-		rte_prefetch0(ops[i + 6]->sym->m_src);
-		rte_prefetch0(ops[i + 7]->sym->session);
-		rte_prefetch0(ops[i + 7]->sym->m_src);
-	}
-
-	for (; i < nb_ops; i++) {
-		sess0 = (struct scheduler_session *)
-				ops[i]->sym->session->_private;
-		sessions[i] = ops[i]->sym->session;
-		ops[i]->sym->session = sess0->sessions[slave_idx];
-		ops[i]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-	}
-
-	processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
-			slave->qp_id, ops, nb_ops);
-
-	slave->nb_inflight_cops += processed_ops;
+	struct rte_ring *order_ring =
+			((struct scheduler_qp_ctx *)qp_ctx)->order_ring;
+	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
+			nb_ops);
+	uint16_t nb_ops_enqd = schedule_enqueue(qp_ctx, ops,
+			nb_ops_to_enq);
-	rr_qp_ctx->last_enq_slave_idx += 1;
-	rr_qp_ctx->last_enq_slave_idx %= rr_qp_ctx->nb_slaves;
+	scheduler_order_insert(order_ring, ops, nb_ops_enqd);
-	/* recover session if enqueue is failed */
-	if (unlikely(processed_ops < nb_ops)) {
-		for (i = processed_ops; i < nb_ops; i++)
-			ops[i]->sym->session = sessions[i];
-	}
-
-	return processed_ops;
+	return nb_ops_enqd;
 }
 
@@ -233,105 +169,12 @@ static uint16_t
 schedule_dequeue_ordering(void *qp_ctx, struct rte_crypto_op **ops,
 		uint16_t nb_ops)
 {
-	struct scheduler_qp_ctx *gen_qp_ctx = (struct scheduler_qp_ctx *)qp_ctx;
-	struct rr_scheduler_qp_ctx *rr_qp_ctx = (gen_qp_ctx->private_qp_ctx);
-	struct scheduler_slave *slave;
-	struct rte_reorder_buffer *reorder_buff = gen_qp_ctx->reorder_buf;
-	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3;
-	uint16_t nb_deq_ops, nb_drained_mbufs;
-	const uint16_t nb_op_ops = nb_ops;
-	struct rte_crypto_op *op_ops[nb_op_ops];
-	struct rte_mbuf *reorder_mbufs[nb_op_ops];
-	uint32_t last_slave_idx = rr_qp_ctx->last_deq_slave_idx;
-	uint16_t i;
+	struct rte_ring *order_ring =
+			((struct scheduler_qp_ctx *)qp_ctx)->order_ring;
-	if (unlikely(rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops == 0)) {
-		do {
-			last_slave_idx += 1;
-
-			if (unlikely(last_slave_idx >= rr_qp_ctx->nb_slaves))
-				last_slave_idx = 0;
-			/* looped back, means no inflight cops in the queue */
-			if (last_slave_idx == rr_qp_ctx->last_deq_slave_idx)
-				return 0;
-		} while (rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops
-				== 0);
-	}
-
-	slave = &rr_qp_ctx->slaves[last_slave_idx];
-
-	nb_deq_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
-			slave->qp_id, op_ops, nb_ops);
-
-	rr_qp_ctx->last_deq_slave_idx += 1;
-	rr_qp_ctx->last_deq_slave_idx %= rr_qp_ctx->nb_slaves;
-
-	slave->nb_inflight_cops -= nb_deq_ops;
-
-	for (i = 0; i < nb_deq_ops && i < 4; i++)
-		rte_prefetch0(op_ops[i]->sym->m_src);
-
-	for (i = 0; (i < (nb_deq_ops - 8)) && (nb_deq_ops > 8); i += 4) {
-		mbuf0 = op_ops[i]->sym->m_src;
-		mbuf1 = op_ops[i + 1]->sym->m_src;
-		mbuf2 = op_ops[i + 2]->sym->m_src;
-		mbuf3 = op_ops[i + 3]->sym->m_src;
-
-		mbuf0->userdata = op_ops[i];
-		mbuf1->userdata = op_ops[i + 1];
-		mbuf2->userdata = op_ops[i + 2];
-		mbuf3->userdata = op_ops[i + 3];
-
-		rte_reorder_insert(reorder_buff, mbuf0);
-		rte_reorder_insert(reorder_buff, mbuf1);
-		rte_reorder_insert(reorder_buff, mbuf2);
-		rte_reorder_insert(reorder_buff, mbuf3);
-
-		rte_prefetch0(op_ops[i + 4]->sym->m_src);
-		rte_prefetch0(op_ops[i + 5]->sym->m_src);
-		rte_prefetch0(op_ops[i + 6]->sym->m_src);
-		rte_prefetch0(op_ops[i + 7]->sym->m_src);
-	}
-
-	for (; i < nb_deq_ops; i++) {
-		mbuf0 = op_ops[i]->sym->m_src;
-		mbuf0->userdata = op_ops[i];
-		rte_reorder_insert(reorder_buff, mbuf0);
-	}
-
-	nb_drained_mbufs = rte_reorder_drain(reorder_buff, reorder_mbufs,
-			nb_ops);
-	for (i = 0; i < nb_drained_mbufs && i < 4; i++)
-		rte_prefetch0(reorder_mbufs[i]);
-
-	for (i = 0; (i < (nb_drained_mbufs - 8)) && (nb_drained_mbufs > 8);
-			i += 4) {
-		ops[i] = *(struct rte_crypto_op **)reorder_mbufs[i]->userdata;
-		ops[i + 1] = *(struct rte_crypto_op **)
-				reorder_mbufs[i + 1]->userdata;
-		ops[i + 2] = *(struct rte_crypto_op **)
-				reorder_mbufs[i + 2]->userdata;
-		ops[i + 3] = *(struct rte_crypto_op **)
-				reorder_mbufs[i + 3]->userdata;
-
-		reorder_mbufs[i]->userdata = NULL;
-		reorder_mbufs[i + 1]->userdata = NULL;
-		reorder_mbufs[i + 2]->userdata = NULL;
-		reorder_mbufs[i + 3]->userdata = NULL;
-
-		rte_prefetch0(reorder_mbufs[i + 4]);
-		rte_prefetch0(reorder_mbufs[i + 5]);
-		rte_prefetch0(reorder_mbufs[i + 6]);
-		rte_prefetch0(reorder_mbufs[i + 7]);
-	}
-
-	for (; i < nb_drained_mbufs; i++) {
-		ops[i] = *(struct rte_crypto_op **)
-				reorder_mbufs[i]->userdata;
-		reorder_mbufs[i]->userdata = NULL;
-	}
+	schedule_dequeue(qp_ctx, ops, nb_ops);
-	return nb_drained_mbufs;
+	return scheduler_order_drain(order_ring, ops, nb_ops);
 }
 
 static int
-- 
2.7.4
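
For reference, the ordering scheme the patch adopts can be illustrated in
isolation. The sketch below is a simplified, stand-alone rendition, not the
driver code and not the rte_ring API: a burst is first clamped to the free
space of the order ring, the ops are recorded in submission order at enqueue
time, and the drain releases only the contiguous run of completed ops at the
head, mirroring get_max_enqueue_order_count(), scheduler_order_insert() and
scheduler_order_drain(). All types and names in it (struct op, struct
order_ring, order_insert(), order_drain()) are hypothetical stand-ins for
rte_crypto_op and rte_ring.

/*
 * Stand-alone sketch of ring-based order preservation (not DPDK code).
 * Ops enter a single-producer/single-consumer ring in submission order;
 * the drain only releases completed ops from the head, so out-of-order
 * completion by individual slaves never reaches the caller.
 */
#include <stdint.h>
#include <stdio.h>

enum op_status { OP_NOT_PROCESSED, OP_SUCCESS };

struct op {
	uint32_t seqn;
	enum op_status status;
};

#define ORDER_RING_SIZE 8	/* power of two, like the rte_align32pow2() result */

struct order_ring {
	struct op *objs[ORDER_RING_SIZE];
	uint32_t head;		/* next slot to drain */
	uint32_t tail;		/* next slot to fill */
};

static uint32_t ring_count(const struct order_ring *r)
{
	return r->tail - r->head;
}

static uint32_t ring_free_count(const struct order_ring *r)
{
	return ORDER_RING_SIZE - ring_count(r);
}

/* enqueue side: clamp the burst to the free room, then record the order */
static uint16_t order_insert(struct order_ring *r, struct op **ops, uint16_t nb_ops)
{
	uint16_t n = ring_free_count(r) < nb_ops ? ring_free_count(r) : nb_ops;

	for (uint16_t i = 0; i < n; i++)
		r->objs[r->tail++ & (ORDER_RING_SIZE - 1)] = ops[i];
	return n;
}

/* dequeue side: release ops only up to the first one still in flight */
static uint16_t order_drain(struct order_ring *r, struct op **ops, uint16_t nb_ops)
{
	uint32_t avail = ring_count(r) < nb_ops ? ring_count(r) : nb_ops;
	uint16_t n = 0;

	while (n < avail) {
		struct op *cur = r->objs[(r->head + n) & (ORDER_RING_SIZE - 1)];

		if (cur->status == OP_NOT_PROCESSED)
			break;
		ops[n++] = cur;
	}
	r->head += n;
	return n;
}

int main(void)
{
	struct order_ring ring = { .head = 0, .tail = 0 };
	struct op burst[4] = { {0, 0}, {1, 0}, {2, 0}, {3, 0} };
	struct op *in[4] = { &burst[0], &burst[1], &burst[2], &burst[3] };
	struct op *out[4];

	order_insert(&ring, in, 4);

	/* slaves complete out of order: 0, 2 and 3 are done, 1 is not */
	burst[0].status = burst[2].status = burst[3].status = OP_SUCCESS;

	/* only op 0 can be released; 2 and 3 wait behind 1 */
	uint16_t n = order_drain(&ring, out, 4);
	printf("drained %u op(s), first seqn %u\n",
			(unsigned)n, n ? (unsigned)out[0]->seqn : 0u);
	return 0;
}

Because only the head of the ring is ever released, a slave that finishes op 2
before op 1 cannot reorder the stream seen by the application; op 2 simply
waits in the ring until op 1 completes, which is what lets the patch drop the
per-mbuf seqn bookkeeping and the rte_reorder store/recover round trip.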