From: Bruce Richardson <bruce.richardson@intel.com>
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, jerin.jacob@caviumnetworks.com,
	Bruce Richardson <bruce.richardson@intel.com>
Date: Wed,  7 Jun 2017 14:36:20 +0100
Message-Id: <20170607133620.275801-6-bruce.richardson@intel.com>
X-Mailer: git-send-email 2.9.4
In-Reply-To: <20170607133620.275801-1-bruce.richardson@intel.com>
References: <20170607133620.275801-1-bruce.richardson@intel.com>
Subject: [dpdk-dev] [PATCH 5/5] event/sw: change worker rings to standard
	event rings

Now that we have a standard event ring implementation for passing events
core-to-core, use that in place of the custom event rings in the
software eventdev.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
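Note for reviewers (not part of the commit message): below is a minimal,
self-contained sketch of the rte_event_ring calls this patch switches
to, using only the APIs that appear in the diff. The function name, ring
name, ring size and burst buffer are illustrative values, not taken from
the driver.

	#include <string.h>

	#include <rte_common.h>
	#include <rte_event_ring.h>

	static int
	event_ring_example(int socket_id)
	{
		struct rte_event evs[32];
		uint16_t free_space;
		unsigned int n;

		/* single-producer/single-consumer ring holding exactly 128
		 * events -- the same flags the sw PMD now uses for its
		 * per-port worker rings */
		struct rte_event_ring *r = rte_event_ring_create(
				"example_ring", 128, socket_id,
				RING_F_SP_ENQ | RING_F_SC_DEQ |
				RING_F_EXACT_SZ);
		if (r == NULL)
			return -1;

		memset(evs, 0, sizeof(evs));

		/* enqueue a burst; free_space reports the slots left
		 * afterwards (callers may pass NULL instead). With an
		 * empty 128-deep ring, all 32 fit and free_space == 96. */
		n = rte_event_ring_enqueue_burst(r, evs, RTE_DIM(evs),
				&free_space);

		/* dequeue up to RTE_DIM(evs) events; NULL means we do not
		 * need the count of events still left in the ring */
		n = rte_event_ring_dequeue_burst(r, evs, RTE_DIM(evs), NULL);

		rte_event_ring_free(r);
		return n;
	}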
 drivers/event/sw/event_ring.h         | 185 ----------------------------------
 drivers/event/sw/sw_evdev.c           |  38 +++----
 drivers/event/sw/sw_evdev.h           |   4 +-
 drivers/event/sw/sw_evdev_scheduler.c |  19 ++--
 drivers/event/sw/sw_evdev_worker.c    |  28 ++++-
 drivers/event/sw/sw_evdev_xstats.c    |  15 +--
 6 files changed, 64 insertions(+), 225 deletions(-)
 delete mode 100644 drivers/event/sw/event_ring.h

diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h
deleted file mode 100644
index cdaee95..0000000
--- a/drivers/event/sw/event_ring.h
+++ /dev/null
@@ -1,185 +0,0 @@
-/*-
- *   BSD LICENSE
- *
- *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
- *
- *   Redistribution and use in source and binary forms, with or without
- *   modification, are permitted provided that the following conditions
- *   are met:
- *
- *     * Redistributions of source code must retain the above copyright
- *       notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in
- *       the documentation and/or other materials provided with the
- *       distribution.
- *     * Neither the name of Intel Corporation nor the names of its
- *       contributors may be used to endorse or promote products derived
- *       from this software without specific prior written permission.
- *
- *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-/*
- * Generic ring structure for passing events from one core to another.
- *
- * Used by the software scheduler for the producer and consumer rings for
- * each port, i.e. for passing events from worker cores to scheduler and
- * vice-versa. Designed for single-producer, single-consumer use with two
- * cores working on each ring.
- */
-
-#ifndef _EVENT_RING_
-#define _EVENT_RING_
-
-#include <stdint.h>
-
-#include <rte_common.h>
-#include <rte_memory.h>
-#include <rte_malloc.h>
-
-#define QE_RING_NAMESIZE 32
-
-struct qe_ring {
-	char name[QE_RING_NAMESIZE] __rte_cache_aligned;
-	uint32_t ring_size; /* size of memory block allocated to the ring */
-	uint32_t mask;      /* mask for read/write values == ring_size -1 */
-	uint32_t size;      /* actual usable space in the ring */
-	volatile uint32_t write_idx __rte_cache_aligned;
-	volatile uint32_t read_idx __rte_cache_aligned;
-
-	struct rte_event ring[0] __rte_cache_aligned;
-};
-
-#ifndef force_inline
-#define force_inline inline __attribute__((always_inline))
-#endif
-
-static inline struct qe_ring *
-qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
-{
-	struct qe_ring *retval;
-	const uint32_t ring_size = rte_align32pow2(size + 1);
-	size_t memsize = sizeof(*retval) +
-			(ring_size * sizeof(retval->ring[0]));
-
-	retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id);
-	if (retval == NULL)
-		goto end;
-
-	snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name);
-	retval->ring_size = ring_size;
-	retval->mask = ring_size - 1;
-	retval->size = size;
-end:
-	return retval;
-}
-
-static inline void
-qe_ring_destroy(struct qe_ring *r)
-{
-	rte_free(r);
-}
-
-static force_inline unsigned int
-qe_ring_count(const struct qe_ring *r)
-{
-	return r->write_idx - r->read_idx;
-}
-
-static force_inline unsigned int
-qe_ring_free_count(const struct qe_ring *r)
-{
-	return r->size - qe_ring_count(r);
-}
-
-static force_inline unsigned int
-qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes,
-		unsigned int nb_qes, uint16_t *free_count)
-{
-	const uint32_t size = r->size;
-	const uint32_t mask = r->mask;
-	const uint32_t read = r->read_idx;
-	uint32_t write = r->write_idx;
-	const uint32_t space = read + size - write;
-	uint32_t i;
-
-	if (space < nb_qes)
-		nb_qes = space;
-
-	for (i = 0; i < nb_qes; i++, write++)
-		r->ring[write & mask] = qes[i];
-
-	rte_smp_wmb();
-
-	if (nb_qes != 0)
-		r->write_idx = write;
-
-	*free_count = space - nb_qes;
-
-	return nb_qes;
-}
-
-static force_inline unsigned int
-qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes,
-		unsigned int nb_qes, uint8_t *ops)
-{
-	const uint32_t size = r->size;
-	const uint32_t mask = r->mask;
-	const uint32_t read = r->read_idx;
-	uint32_t write = r->write_idx;
-	const uint32_t space = read + size - write;
-	uint32_t i;
-
-	if (space < nb_qes)
-		nb_qes = space;
-
-	for (i = 0; i < nb_qes; i++, write++) {
-		r->ring[write & mask] = qes[i];
-		r->ring[write & mask].op = ops[i];
-	}
-
-	rte_smp_wmb();
-
-	if (nb_qes != 0)
-		r->write_idx = write;
-
-	return nb_qes;
-}
-
-static force_inline unsigned int
-qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,
-		unsigned int nb_qes)
-{
-	const uint32_t mask = r->mask;
-	uint32_t read = r->read_idx;
-	const uint32_t write = r->write_idx;
-	const uint32_t items = write - read;
-	uint32_t i;
-
-	if (items < nb_qes)
-		nb_qes = items;
-
-
-	for (i = 0; i < nb_qes; i++, read++)
-		qes[i] = r->ring[read & mask];
-
-	rte_smp_rmb();
-
-	if (nb_qes != 0)
-		r->read_idx += nb_qes;
-
-	return nb_qes;
-}
-
-#endif
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index a31aaa6..2e9d907 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -37,10 +37,10 @@
 #include <rte_memzone.h>
 #include <rte_kvargs.h>
 #include <rte_ring.h>
+#include <rte_event_ring.h>
 
 #include "sw_evdev.h"
 #include "iq_ring.h"
-#include "event_ring.h"
 
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define NUMA_NODE_ARG "numa_node"
@@ -138,7 +138,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 {
 	struct sw_evdev *sw = sw_pmd_priv(dev);
 	struct sw_port *p = &sw->ports[port_id];
-	char buf[QE_RING_NAMESIZE];
+	char buf[RTE_RING_NAMESIZE];
 	unsigned int i;
 	struct rte_event_dev_info info;
 
@@ -159,10 +159,11 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	p->id = port_id;
 	p->sw = sw;
 
-	snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
-			"rx_worker_ring");
-	p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
-			dev->data->socket_id);
+	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
+			port_id, "rx_worker_ring");
+	p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
+			dev->data->socket_id,
+			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
 	if (p->rx_worker_ring == NULL) {
 		SW_LOG_ERR("Error creating RX worker ring for port %d\n",
 				port_id);
@@ -171,12 +172,13 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 
 	p->inflight_max = conf->new_event_threshold;
 
-	snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
-			"cq_worker_ring");
-	p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth,
-			dev->data->socket_id);
+	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
+			port_id, "cq_worker_ring");
+	p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth,
+			dev->data->socket_id,
+			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
 	if (p->cq_worker_ring == NULL) {
-		qe_ring_destroy(p->rx_worker_ring);
+		rte_event_ring_free(p->rx_worker_ring);
 		SW_LOG_ERR("Error creating CQ worker ring for port %d\n",
 				port_id);
 		return -1;
@@ -202,8 +204,8 @@ sw_port_release(void *port)
 	if (p == NULL)
 		return;
 
-	qe_ring_destroy(p->rx_worker_ring);
-	qe_ring_destroy(p->cq_worker_ring);
+	rte_event_ring_free(p->rx_worker_ring);
+	rte_event_ring_free(p->cq_worker_ring);
 	memset(p, 0, sizeof(*p));
 }
 
@@ -509,8 +511,9 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
 		fprintf(f, "\n");
 
 		if (p->rx_worker_ring) {
-			uint64_t used = qe_ring_count(p->rx_worker_ring);
-			uint64_t space = qe_ring_free_count(p->rx_worker_ring);
+			uint64_t used = rte_event_ring_count(p->rx_worker_ring);
+			uint64_t space = rte_event_ring_free_count(
+					p->rx_worker_ring);
 			const char *col = (space == 0) ? COL_RED : COL_RESET;
 			fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4"
 					PRIu64 COL_RESET"\n", col, used, space);
@@ -518,8 +521,9 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
 			fprintf(f, "\trx ring not initialized.\n");
 
 		if (p->cq_worker_ring) {
-			uint64_t used = qe_ring_count(p->cq_worker_ring);
-			uint64_t space = qe_ring_free_count(p->cq_worker_ring);
+			uint64_t used = rte_event_ring_count(p->cq_worker_ring);
+			uint64_t space = rte_event_ring_free_count(
+					p->cq_worker_ring);
 			const char *col = (space == 0) ? COL_RED : COL_RESET;
 			fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4"
 					PRIu64 COL_RESET"\n", col, used, space);
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 61c671d..1695352 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -189,9 +189,9 @@ struct sw_port {
 	int16_t num_ordered_qids;
 
 	/** Ring and buffer for pulling events from workers for scheduling */
-	struct qe_ring *rx_worker_ring __rte_cache_aligned;
+	struct rte_event_ring *rx_worker_ring __rte_cache_aligned;
 
 	/** Ring and buffer for pushing packets to workers after scheduling */
-	struct qe_ring *cq_worker_ring;
+	struct rte_event_ring *cq_worker_ring;
 
 	/* hole */
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index a333a6f..0778c80 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -32,9 +32,9 @@
 
 #include <rte_ring.h>
 #include <rte_hash_crc.h>
+#include <rte_event_ring.h>
 #include "sw_evdev.h"
 #include "iq_ring.h"
-#include "event_ring.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -122,8 +122,8 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 
 		/* if we just filled in the last slot, flush the buffer */
 		if (sw->cq_ring_space[cq] == 0) {
-			struct qe_ring *worker = p->cq_worker_ring;
-			qe_ring_enqueue_burst(worker, p->cq_buf,
+			struct rte_event_ring *worker = p->cq_worker_ring;
+			rte_event_ring_enqueue_burst(worker, p->cq_buf,
 					p->cq_buf_count,
 					&sw->cq_ring_space[cq]);
 			p->cq_buf_count = 0;
@@ -170,7 +170,8 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 			cq = qid->cq_map[cq_idx];
 			if (++cq_idx == qid->cq_num_mapped_cqs)
 				cq_idx = 0;
-		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
+		} while (rte_event_ring_free_count(
+				sw->ports[cq].cq_worker_ring) == 0 ||
 				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
 
 		struct sw_port *p = &sw->ports[cq];
@@ -366,10 +367,10 @@ static inline void __attribute__((always_inline))
 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
 {
 	RTE_SET_USED(sw);
-	struct qe_ring *worker = port->rx_worker_ring;
+	struct rte_event_ring *worker = port->rx_worker_ring;
 	port->pp_buf_start = 0;
-	port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
-			RTE_DIM(port->pp_buf));
+	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
+			RTE_DIM(port->pp_buf), NULL);
 }
 
 static inline uint32_t __attribute__((always_inline))
@@ -585,8 +586,8 @@ sw_event_schedule(struct rte_eventdev *dev)
 	 * worker cores: aka, do the ring transfers batched.
 	 */
 	for (i = 0; i < sw->port_count; i++) {
-		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
-		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
+		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
+		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
 				sw->ports[i].cq_buf_count,
 				&sw->cq_ring_space[i]);
 		sw->ports[i].cq_buf_count = 0;
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index 9cb6bef..c9431a3 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -32,9 +32,9 @@
 
 #include <rte_atomic.h>
 #include <rte_cycles.h>
+#include <rte_event_ring.h>
 
 #include "sw_evdev.h"
-#include "event_ring.h"
 
 #define PORT_ENQUEUE_MAX_BURST_SIZE 64
 
@@ -52,13 +52,31 @@ sw_event_release(struct sw_port *p, uint8_t index)
 	ev.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE];
 
 	uint16_t free_count;
-	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+	rte_event_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
 
 	/* each release returns one credit */
 	p->outstanding_releases--;
 	p->inflight_credits++;
 }
 
+/*
+ * special-case of rte_event_ring enqueue, with overriding the ops member on
+ * the events that get written to the ring.
+ */
+static inline unsigned int
+enqueue_burst_with_ops(struct rte_event_ring *r, const struct rte_event *events,
+		unsigned int n, uint8_t *ops)
+{
+	struct rte_event tmp_evs[PORT_ENQUEUE_MAX_BURST_SIZE];
+	unsigned int i;
+
+	memcpy(tmp_evs, events, n * sizeof(events[0]));
+	for (i = 0; i < n; i++)
+		tmp_evs[i].op = ops[i];
+
+	return rte_event_ring_enqueue_burst(r, tmp_evs, n, NULL);
+}
+
 uint16_t
 sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 {
@@ -114,7 +132,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 	}
 
 	/* returns number of events actually enqueued */
-	uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
+	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
 			new_ops);
 	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
 		uint64_t burst_ticks = rte_get_timer_cycles() -
@@ -141,7 +159,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	RTE_SET_USED(wait);
 	struct sw_port *p = (void *)port;
 	struct sw_evdev *sw = (void *)p->sw;
-	struct qe_ring *ring = p->cq_worker_ring;
+	struct rte_event_ring *ring = p->cq_worker_ring;
 	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
@@ -153,7 +171,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	}
 
 	/* returns number of events actually dequeued */
-	uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
 	if (unlikely(ndeq == 0)) {
 		p->outstanding_releases = 0;
 		p->zero_polls++;
diff --git a/drivers/event/sw/sw_evdev_xstats.c b/drivers/event/sw/sw_evdev_xstats.c
index c7b1abe..4e13509 100644
--- a/drivers/event/sw/sw_evdev_xstats.c
+++ b/drivers/event/sw/sw_evdev_xstats.c
@@ -30,9 +30,9 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <rte_event_ring.h>
 #include "sw_evdev.h"
 #include "iq_ring.h"
-#include "event_ring.h"
 
 enum xstats_type {
 	/* common stats */
@@ -104,10 +104,10 @@ get_port_stat(const struct sw_evdev *sw, uint16_t obj_idx,
 	case calls: return p->total_polls;
 	case credits: return p->inflight_credits;
 	case poll_return: return p->zero_polls;
-	case rx_used: return qe_ring_count(p->rx_worker_ring);
-	case rx_free: return qe_ring_free_count(p->rx_worker_ring);
-	case tx_used: return qe_ring_count(p->cq_worker_ring);
-	case tx_free: return qe_ring_free_count(p->cq_worker_ring);
+	case rx_used: return rte_event_ring_count(p->rx_worker_ring);
+	case rx_free: return rte_event_ring_free_count(p->rx_worker_ring);
+	case tx_used: return rte_event_ring_count(p->cq_worker_ring);
+	case tx_free: return rte_event_ring_free_count(p->cq_worker_ring);
 	default: return -1;
 	}
 }
@@ -312,8 +312,9 @@ sw_xstats_init(struct sw_evdev *sw)
 					port, port_stats[i]);
 		}
 
-		for (bkt = 0; bkt < (sw->ports[port].cq_worker_ring->size >>
-				SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) {
+		for (bkt = 0; bkt < (rte_event_ring_get_capacity(
+				sw->ports[port].cq_worker_ring) >>
+				SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) {
 			for (i = 0; i < RTE_DIM(port_bucket_stats); i++) {
 				sw->xstats[stat] = (struct sw_xstats_entry){
 					.fn = get_port_bucket_stat,
-- 
2.9.4