From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga07.intel.com (mga07.intel.com [134.134.136.100]) by dpdk.org (Postfix) with ESMTP id A1998374F for ; Fri, 10 Mar 2017 20:43:55 +0100 (CET) Received: from fmsmga001.fm.intel.com ([10.253.24.23]) by orsmga105.jf.intel.com with ESMTP; 10 Mar 2017 11:43:55 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.36,142,1486454400"; d="scan'208";a="1121055606" Received: from silpixa00398672.ir.intel.com ([10.237.223.128]) by fmsmga001.fm.intel.com with ESMTP; 10 Mar 2017 11:43:53 -0800 From: Harry van Haaren To: dev@dpdk.org Cc: jerin.jacob@caviumnetworks.com, Bruce Richardson , Harry van Haaren Date: Fri, 10 Mar 2017 19:43:24 +0000 Message-Id: <1489175012-101439-10-git-send-email-harry.van.haaren@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1489175012-101439-1-git-send-email-harry.van.haaren@intel.com> References: <1487343252-16092-1-git-send-email-harry.van.haaren@intel.com> <1489175012-101439-1-git-send-email-harry.van.haaren@intel.com> Subject: [dpdk-dev] [PATCH v4 09/17] event/sw: add support for event queues X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 10 Mar 2017 19:43:56 -0000 From: Bruce Richardson Add in the data structures for the event queues, and the eventdev functions to create and destroy those queues. Signed-off-by: Bruce Richardson Signed-off-by: Harry van Haaren --- v3 -> v4: - return -ENOTSUP for CFG_ALL_TYPES, as it is not supported - fix bug when reordered queue is re-configured for second time --- drivers/event/sw/iq_ring.h | 176 ++++++++++++++++++++++++++++++++++++++++++++ drivers/event/sw/sw_evdev.c | 166 +++++++++++++++++++++++++++++++++++++++++ drivers/event/sw/sw_evdev.h | 5 ++ 3 files changed, 347 insertions(+) create mode 100644 drivers/event/sw/iq_ring.h diff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h new file mode 100644 index 0000000..d480d15 --- /dev/null +++ b/drivers/event/sw/iq_ring.h @@ -0,0 +1,176 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Ring structure definitions used for the internal ring buffers of the + * SW eventdev implementation. These are designed for single-core use only. + */ +#ifndef _IQ_RING_ +#define _IQ_RING_ + +#include + +#include +#include +#include +#include + +#define IQ_RING_NAMESIZE 12 +#define QID_IQ_DEPTH 512 +#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1) + +struct iq_ring { + char name[IQ_RING_NAMESIZE] __rte_cache_aligned; + uint16_t write_idx; + uint16_t read_idx; + + struct rte_event ring[QID_IQ_DEPTH]; +}; + +#ifndef force_inline +#define force_inline inline __attribute__((always_inline)) +#endif + +static inline struct iq_ring * +iq_ring_create(const char *name, unsigned int socket_id) +{ + struct iq_ring *retval; + + retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id); + if (retval == NULL) + goto end; + + snprintf(retval->name, sizeof(retval->name), "%s", name); + retval->write_idx = retval->read_idx = 0; +end: + return retval; +} + +static inline void +iq_ring_destroy(struct iq_ring *r) +{ + rte_free(r); +} + +static force_inline uint16_t +iq_ring_count(const struct iq_ring *r) +{ + return r->write_idx - r->read_idx; +} + +static force_inline uint16_t +iq_ring_free_count(const struct iq_ring *r) +{ + return QID_IQ_MASK - iq_ring_count(r); +} + +static force_inline uint16_t +iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) +{ + const uint16_t read = r->read_idx; + uint16_t write = r->write_idx; + const uint16_t space = read + QID_IQ_MASK - write; + uint16_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) + r->ring[write & QID_IQ_MASK] = qes[i]; + + r->write_idx = write; + + return nb_qes; +} + +static force_inline uint16_t +iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) +{ + uint16_t read = r->read_idx; + const uint16_t write = r->write_idx; + const uint16_t items = write - read; + uint16_t i; + + for (i = 0; i < nb_qes; i++, read++) + qes[i] = r->ring[read & QID_IQ_MASK]; + + if (items < nb_qes) + nb_qes = items; + + r->read_idx += nb_qes; + + return nb_qes; +} + +/* assumes there is space, from a previous dequeue_burst */ +static force_inline uint16_t +iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) +{ + uint16_t i, read = r->read_idx; + + for (i = nb_qes; i-- > 0; ) + r->ring[--read & QID_IQ_MASK] = qes[i]; + + r->read_idx = read; + return nb_qes; +} + +static force_inline const struct rte_event * +iq_ring_peek(const struct iq_ring *r) +{ + return &r->ring[r->read_idx & QID_IQ_MASK]; +} + +static force_inline void +iq_ring_pop(struct iq_ring *r) +{ + r->read_idx++; +} + +static force_inline int +iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe) +{ + const uint16_t read = r->read_idx; + const uint16_t write = r->write_idx; + const uint16_t space = read + QID_IQ_MASK - write; + + if (space == 0) + return -1; + + r->ring[write & QID_IQ_MASK] = *qe; + + r->write_idx = write + 1; + + return 0; +} + +#endif diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index d1fa3a7..eaf8e77 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -38,12 +38,176 @@ #include #include "sw_evdev.h" +#include "iq_ring.h" #define EVENTDEV_NAME_SW_PMD event_sw #define NUMA_NODE_ARG "numa_node" #define SCHED_QUANTA_ARG "sched_quanta" #define CREDIT_QUANTA_ARG "credit_quanta" +static int32_t +qid_init(struct sw_evdev *sw, unsigned int idx, int type, + const struct rte_event_queue_conf *queue_conf) +{ + unsigned int i; + int dev_id = sw->data->dev_id; + int socket_id = sw->data->socket_id; + char buf[IQ_RING_NAMESIZE]; + struct sw_qid *qid = &sw->qids[idx]; + + for (i = 0; i < SW_IQS_MAX; i++) { + snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i); + qid->iq[i] = iq_ring_create(buf, socket_id); + if (!qid->iq[i]) { + SW_LOG_DBG("ring create failed"); + goto cleanup; + } + } + + /* Initialize the FID structures to no pinning (-1), and zero packets */ + const struct sw_fid_t fid = {.cq = -1, .pcount = 0}; + for (i = 0; i < RTE_DIM(qid->fids); i++) + qid->fids[i] = fid; + + qid->id = idx; + qid->type = type; + qid->priority = queue_conf->priority; + + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + char ring_name[RTE_RING_NAMESIZE]; + uint32_t window_size; + + /* rte_ring and window_size_mask require require window_size to + * be a power-of-2. + */ + window_size = rte_align32pow2( + queue_conf->nb_atomic_order_sequences); + + qid->window_size = window_size - 1; + + if (!window_size) { + SW_LOG_DBG( + "invalid reorder_window_size for ordered queue\n" + ); + goto cleanup; + } + + snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, i); + qid->reorder_buffer = rte_zmalloc_socket(buf, + window_size * sizeof(qid->reorder_buffer[0]), + 0, socket_id); + if (!qid->reorder_buffer) { + SW_LOG_DBG("reorder_buffer malloc failed\n"); + goto cleanup; + } + + memset(&qid->reorder_buffer[0], + 0, + window_size * sizeof(qid->reorder_buffer[0])); + + snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist", + dev_id, idx); + + /* lookup the ring, and if it already exists, free it */ + struct rte_ring *cleanup = rte_ring_lookup(ring_name); + if (cleanup) + rte_ring_free(cleanup); + + qid->reorder_buffer_freelist = rte_ring_create(ring_name, + window_size, + socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ); + if (!qid->reorder_buffer_freelist) { + SW_LOG_DBG("freelist ring create failed"); + goto cleanup; + } + + /* Populate the freelist with reorder buffer entries. Enqueue + * 'window_size - 1' entries because the rte_ring holds only + * that many. + */ + for (i = 0; i < window_size - 1; i++) { + if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist, + &qid->reorder_buffer[i]) < 0) + goto cleanup; + } + + qid->reorder_buffer_index = 0; + qid->cq_next_tx = 0; + } + + qid->initialized = 1; + + return 0; + +cleanup: + for (i = 0; i < SW_IQS_MAX; i++) { + if (qid->iq[i]) + iq_ring_destroy(qid->iq[i]); + } + + if (qid->reorder_buffer) { + rte_free(qid->reorder_buffer); + qid->reorder_buffer = NULL; + } + + if (qid->reorder_buffer_freelist) { + rte_ring_free(qid->reorder_buffer_freelist); + qid->reorder_buffer_freelist = NULL; + } + + return -EINVAL; +} + +static int +sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, + const struct rte_event_queue_conf *conf) +{ + int type; + + switch (conf->event_queue_cfg) { + case RTE_EVENT_QUEUE_CFG_SINGLE_LINK: + type = SW_SCHED_TYPE_DIRECT; + break; + case RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY: + type = RTE_SCHED_TYPE_ATOMIC; + break; + case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY: + type = RTE_SCHED_TYPE_ORDERED; + break; + case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY: + type = RTE_SCHED_TYPE_PARALLEL; + break; + case RTE_EVENT_QUEUE_CFG_ALL_TYPES: + SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n"); + return -ENOTSUP; + default: + SW_LOG_ERR("Unknown queue type %d requested\n", + conf->event_queue_cfg); + return -EINVAL; + } + + struct sw_evdev *sw = sw_pmd_priv(dev); + return qid_init(sw, queue_id, type, conf); +} + +static void +sw_queue_release(struct rte_eventdev *dev, uint8_t id) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + struct sw_qid *qid = &sw->qids[id]; + uint32_t i; + + for (i = 0; i < SW_IQS_MAX; i++) + iq_ring_destroy(qid->iq[i]); + + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + rte_free(qid->reorder_buffer); + rte_ring_free(qid->reorder_buffer_freelist); + } + memset(qid, 0, sizeof(*qid)); +} + static void sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id, struct rte_event_queue_conf *conf) @@ -147,6 +311,8 @@ sw_probe(const char *name, const char *params) .dev_infos_get = sw_info_get, .queue_def_conf = sw_queue_def_conf, + .queue_setup = sw_queue_setup, + .queue_release = sw_queue_release, .port_def_conf = sw_port_def_conf, }; diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index fda57df..ddf0cd2 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ -52,6 +52,8 @@ #define EVENTDEV_NAME_SW_PMD event_sw #define SW_PMD_NAME RTE_STR(event_sw) +#define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) + #ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG #define SW_LOG_INFO(fmt, args...) \ RTE_LOG(INFO, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \ @@ -139,6 +141,9 @@ struct sw_evdev { */ uint32_t nb_events_limit; + /* Internal queues - one per logical queue */ + struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned; + int32_t sched_quanta; uint32_t credit_update_quanta; -- 2.7.4