From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga07.intel.com (mga07.intel.com [134.134.136.100]) by dpdk.org (Postfix) with ESMTP id 02487374F for ; Fri, 10 Mar 2017 20:43:56 +0100 (CET) Received: from fmsmga001.fm.intel.com ([10.253.24.23]) by orsmga105.jf.intel.com with ESMTP; 10 Mar 2017 11:43:56 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.36,142,1486454400"; d="scan'208";a="1121055611" Received: from silpixa00398672.ir.intel.com ([10.237.223.128]) by fmsmga001.fm.intel.com with ESMTP; 10 Mar 2017 11:43:55 -0800 From: Harry van Haaren To: dev@dpdk.org Cc: jerin.jacob@caviumnetworks.com, Bruce Richardson , Harry van Haaren Date: Fri, 10 Mar 2017 19:43:25 +0000 Message-Id: <1489175012-101439-11-git-send-email-harry.van.haaren@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1489175012-101439-1-git-send-email-harry.van.haaren@intel.com> References: <1487343252-16092-1-git-send-email-harry.van.haaren@intel.com> <1489175012-101439-1-git-send-email-harry.van.haaren@intel.com> Subject: [dpdk-dev] [PATCH v4 10/17] event/sw: add support for event ports X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 10 Mar 2017 19:43:57 -0000 From: Bruce Richardson Add in the data-structures for the ports used by workers to send packets to/from the scheduler. Also add in the functions to create/destroy those ports. 
Signed-off-by: Bruce Richardson Signed-off-by: Harry van Haaren --- v3 -> v4: - fix port leaking credits on reconfigure --- drivers/event/sw/event_ring.h | 185 ++++++++++++++++++++++++++++++++++++++++++ drivers/event/sw/sw_evdev.c | 88 ++++++++++++++++++++ drivers/event/sw/sw_evdev.h | 78 ++++++++++++++++++ 3 files changed, 351 insertions(+) create mode 100644 drivers/event/sw/event_ring.h diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h new file mode 100644 index 0000000..cdaee95 --- /dev/null +++ b/drivers/event/sw/event_ring.h @@ -0,0 +1,185 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Generic ring structure for passing events from one core to another. + * + * Used by the software scheduler for the producer and consumer rings for + * each port, i.e. for passing events from worker cores to scheduler and + * vice-versa. Designed for single-producer, single-consumer use with two + * cores working on each ring. + */ + +#ifndef _EVENT_RING_ +#define _EVENT_RING_ + +#include <stdint.h> + +#include <rte_common.h> +#include <rte_memory.h> +#include <rte_malloc.h> + +#define QE_RING_NAMESIZE 32 + +struct qe_ring { + char name[QE_RING_NAMESIZE] __rte_cache_aligned; + uint32_t ring_size; /* size of memory block allocated to the ring */ + uint32_t mask; /* mask for read/write values == ring_size -1 */ + uint32_t size; /* actual usable space in the ring */ + volatile uint32_t write_idx __rte_cache_aligned; + volatile uint32_t read_idx __rte_cache_aligned; + + struct rte_event ring[0] __rte_cache_aligned; +}; + +#ifndef force_inline +#define force_inline inline __attribute__((always_inline)) +#endif + +static inline struct qe_ring * +qe_ring_create(const char *name, unsigned int size, unsigned int socket_id) +{ + struct qe_ring *retval; + const uint32_t ring_size = rte_align32pow2(size + 1); + size_t memsize = sizeof(*retval) + + (ring_size * sizeof(retval->ring[0])); + + retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id); + if (retval == NULL) + goto end; + + snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name); + retval->ring_size = ring_size; + 
retval->mask = ring_size - 1; + retval->size = size; +end: + return retval; +} + +static inline void +qe_ring_destroy(struct qe_ring *r) +{ + rte_free(r); +} + +static force_inline unsigned int +qe_ring_count(const struct qe_ring *r) +{ + return r->write_idx - r->read_idx; +} + +static force_inline unsigned int +qe_ring_free_count(const struct qe_ring *r) +{ + return r->size - qe_ring_count(r); +} + +static force_inline unsigned int +qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes, + unsigned int nb_qes, uint16_t *free_count) +{ + const uint32_t size = r->size; + const uint32_t mask = r->mask; + const uint32_t read = r->read_idx; + uint32_t write = r->write_idx; + const uint32_t space = read + size - write; + uint32_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) + r->ring[write & mask] = qes[i]; + + rte_smp_wmb(); + + if (nb_qes != 0) + r->write_idx = write; + + *free_count = space - nb_qes; + + return nb_qes; +} + +static force_inline unsigned int +qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes, + unsigned int nb_qes, uint8_t *ops) +{ + const uint32_t size = r->size; + const uint32_t mask = r->mask; + const uint32_t read = r->read_idx; + uint32_t write = r->write_idx; + const uint32_t space = read + size - write; + uint32_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) { + r->ring[write & mask] = qes[i]; + r->ring[write & mask].op = ops[i]; + } + + rte_smp_wmb(); + + if (nb_qes != 0) + r->write_idx = write; + + return nb_qes; +} + +static force_inline unsigned int +qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes, + unsigned int nb_qes) +{ + const uint32_t mask = r->mask; + uint32_t read = r->read_idx; + const uint32_t write = r->write_idx; + const uint32_t items = write - read; + uint32_t i; + + if (items < nb_qes) + nb_qes = items; + + + for (i = 0; i < nb_qes; i++, read++) + qes[i] = r->ring[read & mask]; + + 
rte_smp_rmb(); + + if (nb_qes != 0) + r->read_idx += nb_qes; + + return nb_qes; +} + +#endif diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index eaf8e77..4b8370d 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -39,12 +39,98 @@ #include "sw_evdev.h" #include "iq_ring.h" +#include "event_ring.h" #define EVENTDEV_NAME_SW_PMD event_sw #define NUMA_NODE_ARG "numa_node" #define SCHED_QUANTA_ARG "sched_quanta" #define CREDIT_QUANTA_ARG "credit_quanta" +static void +sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info); + +static int +sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, + const struct rte_event_port_conf *conf) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + struct sw_port *p = &sw->ports[port_id]; + char buf[QE_RING_NAMESIZE]; + unsigned int i; + + struct rte_event_dev_info info; + sw_info_get(dev, &info); + + uint8_t enq_oversize = + conf->enqueue_depth > info.max_event_port_enqueue_depth; + uint8_t deq_oversize = + conf->dequeue_depth > info.max_event_port_dequeue_depth; + if (enq_oversize || deq_oversize) + return -EINVAL; + + + /* detect re-configuring and return credits to instance if needed */ + if (p->initialized) { + /* taking credits from pool is done one quanta at a time, and + * credits may be spent (counted in p->inflights) or still + * available in the port (p->inflight_credits). 
We must return + * the sum to not leak credits + */ + int possible_inflights = p->inflight_credits + p->inflights; + rte_atomic32_sub(&sw->inflights, possible_inflights); + } + + *p = (struct sw_port){0}; /* zero entire structure */ + p->id = port_id; + p->sw = sw; + + snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id, + "rx_worker_ring"); + p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH, + dev->data->socket_id); + if (p->rx_worker_ring == NULL) { + printf("%s %d: error creating RX worker ring\n", + __func__, __LINE__); + return -1; + } + + p->inflight_max = conf->new_event_threshold; + + snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id, + "cq_worker_ring"); + p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth, + dev->data->socket_id); + if (p->cq_worker_ring == NULL) { + qe_ring_destroy(p->rx_worker_ring); + printf("%s %d: error creating CQ worker ring\n", + __func__, __LINE__); + return -1; + } + sw->cq_ring_space[port_id] = conf->dequeue_depth; + + /* set hist list contents to empty */ + for (i = 0; i < SW_PORT_HIST_LIST; i++) { + p->hist_list[i].fid = -1; + p->hist_list[i].qid = -1; + } + dev->data->ports[port_id] = p; + p->initialized = 1; + + return 0; +} + +static void +sw_port_release(void *port) +{ + struct sw_port *p = (void *)port; + if (p == NULL) + return; + + qe_ring_destroy(p->rx_worker_ring); + qe_ring_destroy(p->cq_worker_ring); + memset(p, 0, sizeof(*p)); +} + static int32_t qid_init(struct sw_evdev *sw, unsigned int idx, int type, const struct rte_event_queue_conf *queue_conf) @@ -314,6 +400,8 @@ sw_probe(const char *name, const char *params) .queue_setup = sw_queue_setup, .queue_release = sw_queue_release, .port_def_conf = sw_port_def_conf, + .port_setup = sw_port_setup, + .port_release = sw_port_release, }; static const char *const args[] = { diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index ddf0cd2..1bedd63 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ 
-49,6 +49,13 @@ #define MAX_SW_PROD_Q_DEPTH 4096 #define SW_FRAGMENTS_MAX 16 +/* report dequeue burst sizes in buckets */ +#define SW_DEQ_STAT_BUCKET_SHIFT 2 +/* how many packets pulled from port by sched */ +#define SCHED_DEQUEUE_BURST_SIZE 32 + +#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */ + #define EVENTDEV_NAME_SW_PMD event_sw #define SW_PMD_NAME RTE_STR(event_sw) @@ -129,12 +136,80 @@ struct sw_qid { uint8_t priority; }; +struct sw_hist_list_entry { + int32_t qid; + int32_t fid; + struct reorder_buffer_entry *rob_entry; +}; + +struct sw_evdev; + +struct sw_port { + /* new enqueue / dequeue API doesn't have an instance pointer, only the + * pointer to the port being enqueue/dequeued from + */ + struct sw_evdev *sw; + + /* set when the port is initialized */ + uint8_t initialized; + /* A numeric ID for the port */ + uint8_t id; + + int16_t is_directed; /** Takes from a single directed QID */ + /** + * For loadbalanced we can optimise pulling packets from + * producers if there is no reordering involved + */ + int16_t num_ordered_qids; + + /** Ring and buffer for pulling events from workers for scheduling */ + struct qe_ring *rx_worker_ring __rte_cache_aligned; + /** Ring and buffer for pushing packets to workers after scheduling */ + struct qe_ring *cq_worker_ring; + + /* hole */ + + /* num releases yet to be completed on this port */ + uint16_t outstanding_releases __rte_cache_aligned; + uint16_t inflight_max; /* app requested max inflights for this port */ + uint16_t inflight_credits; /* num credits this port has right now */ + + uint16_t last_dequeue_burst_sz; /* how big the burst was */ + uint64_t last_dequeue_ticks; /* used to track burst processing time */ + uint64_t avg_pkt_ticks; /* tracks average over NUM_SAMPLES burst */ + uint64_t total_polls; /* how many polls were counted in stats */ + uint64_t zero_polls; /* tracks polls returning nothing */ + uint32_t poll_buckets[MAX_SW_CONS_Q_DEPTH >> SW_DEQ_STAT_BUCKET_SHIFT]; 
+ /* bucket values in 4s for shorter reporting */ + + /* History list structs, containing info on pkts egressed to worker */ + uint16_t hist_head __rte_cache_aligned; + uint16_t hist_tail; + uint16_t inflights; + struct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST]; + + /* track packets in and out of this port */ + struct sw_point_stats stats; + + + uint32_t pp_buf_start; + uint32_t pp_buf_count; + uint16_t cq_buf_count; + struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE]; + struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH]; + + uint8_t num_qids_mapped; +}; + struct sw_evdev { struct rte_eventdev_data *data; uint32_t port_count; uint32_t qid_count; + /* Contains all ports - load balanced and directed */ + struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned; + /* * max events in this instance. Cached here for performance. * (also available in data->conf.nb_events_limit) @@ -144,6 +219,9 @@ struct sw_evdev { /* Internal queues - one per logical queue */ struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned; + /* Cache how many packets are in each cq */ + uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned; + int32_t sched_quanta; uint32_t credit_update_quanta; -- 2.7.4