From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga01.intel.com (mga01.intel.com [192.55.52.88]) by dpdk.org (Postfix) with ESMTP id 35DF54A63 for ; Fri, 15 Dec 2017 12:26:26 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga007.jf.intel.com ([10.7.209.58]) by fmsmga101.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 15 Dec 2017 03:26:26 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.45,404,1508828400"; d="scan'208";a="2943768" Received: from silpixa00398162.ir.intel.com (HELO silpixa00398162.ger.corp.intel.com) ([10.237.223.171]) by orsmga007.jf.intel.com with ESMTP; 15 Dec 2017 03:26:24 -0800 From: Liang Ma To: jerin.jacob@caviumnetworks.com Cc: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com, deepak.k.jain@intel.com, john.geary@intel.com, peter.mccarthy@intel.com, seanbh@gmail.com Date: Fri, 15 Dec 2017 11:26:23 +0000 Message-Id: <1513337189-137661-3-git-send-email-liang.j.ma@intel.com> X-Mailer: git-send-email 2.7.5 In-Reply-To: <1513337189-137661-1-git-send-email-liang.j.ma@intel.com> References: <1513337189-137661-1-git-send-email-liang.j.ma@intel.com> Subject: [dpdk-dev] [PATCH v2 2/8] event/opdl: add the opdl pmd header and init helper function X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 15 Dec 2017 11:26:28 -0000 opdl_evdev.h include the main data structure of opdl device and all the function prototype need to be exposed to support eventdev API. opdl_evdev_init.c implement all initailization helper function Signed-off-by: Liang Ma Signed-off-by: Peter, Mccarthy --- drivers/event/opdl/opdl_evdev.h | 354 +++++++++++++ drivers/event/opdl/opdl_evdev_init.c | 951 +++++++++++++++++++++++++++++++++++ 2 files changed, 1305 insertions(+) create mode 100644 drivers/event/opdl/opdl_evdev.h create mode 100644 drivers/event/opdl/opdl_evdev_init.c diff --git a/drivers/event/opdl/opdl_evdev.h b/drivers/event/opdl/opdl_evdev.h new file mode 100644 index 0000000..c776d6f --- /dev/null +++ b/drivers/event/opdl/opdl_evdev.h @@ -0,0 +1,354 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _OPDL_EVDEV_H_ +#define _OPDL_EVDEV_H_ + +#include +#include +#include +#include "opdl_ring.h" + +#define OPDL_QID_NUM_FIDS 1024 +#define OPDL_IQS_MAX 1 +#define OPDL_Q_PRIORITY_MAX 1 +#define OPDL_PORTS_MAX 64 +#define MAX_OPDL_CONS_Q_DEPTH 128 +/* OPDL size */ +#define OPDL_INFLIGHT_EVENTS_TOTAL 4096 +/* allow for lots of over-provisioning */ +#define OPDL_FRAGMENTS_MAX 1 + +/* report dequeue burst sizes in buckets */ +#define OPDL_DEQ_STAT_BUCKET_SHIFT 2 +/* how many packets pulled from port by sched */ +#define SCHED_DEQUEUE_BURST_SIZE 32 + +/* size of our history list */ +#define OPDL_PORT_HIST_LIST (MAX_OPDL_PROD_Q_DEPTH) + +/* how many data points use for average stats */ +#define NUM_SAMPLES 64 + +#define EVENTDEV_NAME_OPDL_PMD event_opdl +#define OPDL_PMD_NAME RTE_STR(event_opdl) +#define OPDL_PMD_NAME_MAX 64 + +#define OPDL_INVALID_QID 255 + +#define OPDL_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) + +#define OPDL_NUM_POLL_BUCKETS \ + (MAX_OPDL_CONS_Q_DEPTH >> OPDL_DEQ_STAT_BUCKET_SHIFT) + +enum { + QE_FLAG_VALID_SHIFT = 0, + QE_FLAG_COMPLETE_SHIFT, + QE_FLAG_NOT_EOP_SHIFT, + _QE_FLAG_COUNT +}; + +enum port_type { + OPDL_INVALID_PORT = 0, + OPDL_REGULAR_PORT = 1, + OPDL_PURE_RX_PORT, + OPDL_PURE_TX_PORT, + OPDL_ASYNC_PORT +}; + +enum queue_type { + OPDL_Q_TYPE_INVALID = 0, + OPDL_Q_TYPE_SINGLE_LINK = 1, + OPDL_Q_TYPE_ATOMIC, + OPDL_Q_TYPE_ORDERED +}; + +enum queue_pos { + OPDL_Q_POS_START = 0, + OPDL_Q_POS_MIDDLE, + OPDL_Q_POS_END +}; + +#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* for NEW FWD, FRAG */ +#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */ +#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */ + +static const uint8_t opdl_qe_flag_map[] = { + QE_FLAG_VALID /* NEW Event */, + QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */, + QE_FLAG_COMPLETE /* RELEASE Event */, + + /* Values which can be used for future support for partial + * events, i.e. where one event comes back to the scheduler + * as multiple which need to be tracked together + */ + QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP, +}; + +#define OPDL_LOG_INFO(fmt, args...) \ + RTE_LOG(INFO, EVENTDEV, "[%s] line %u: " fmt "\n", \ + OPDL_PMD_NAME, \ + __LINE__, ## args) + +#ifdef RTE_LIBRTE_PMD_EVDEV_OPDL_DEBUG +#define OPDL_LOG_DBG(fmt, args...) \ + RTE_LOG(DEBUG, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \ + OPDL_PMD_NAME, \ + __func__, __LINE__, ## args) +#else +#define OPDL_LOG_DBG(fmt, args...) +#endif + +#define OPDL_LOG_ERR(fmt, args...) \ + RTE_LOG(ERR, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \ + OPDL_PMD_NAME, \ + __func__, __LINE__, ## args) + +enum port_xstat_name { + claim_pkts_requested = 0, + claim_pkts_granted, + claim_non_empty, + claim_empty, + total_cycles, + max_num_port_xstat +}; + +#define OPDL_MAX_PORT_XSTAT_NUM (OPDL_PORTS_MAX * max_num_port_xstat) + +struct opdl_port; + +typedef uint16_t (*opdl_enq_operation)(struct opdl_port *port, + const struct rte_event ev[], + uint16_t num); + +typedef uint16_t (*opdl_deq_operation)(struct opdl_port *port, + struct rte_event ev[], + uint16_t num); + +struct opdl_evdev; + +struct opdl_stage_meta_data { + uint32_t num_claimed; /* number of entries claimed by this stage */ + uint32_t burst_sz; /* Port claim burst size */ +}; + +struct opdl_port { + + /* back pointer */ + struct opdl_evdev *opdl; + + /* enq handler & stage instance */ + opdl_enq_operation enq; + struct opdl_stage *enq_stage_inst; + + /* deq handler & stage instance */ + opdl_deq_operation deq; + struct opdl_stage *deq_stage_inst; + + /* port id has correctly been set */ + uint8_t configured; + + /* set when the port is initialized */ + uint8_t initialized; + + /* A numeric ID for the port */ + uint8_t id; + + /* Space for claimed entries */ + struct rte_event *entries[MAX_OPDL_CONS_Q_DEPTH]; + + /* RX/REGULAR/TX/ASYNC - determined on position in queue */ + enum port_type p_type; + + /* if the claim is static atomic type */ + bool atomic_claim; + + /* Queue linked to this port - internal queue id*/ + uint8_t queue_id; + + /* Queue linked to this port - external queue id*/ + uint8_t external_qid; + + /* Next queue linked to this port - external queue id*/ + uint8_t next_external_qid; + + /* number of instances of this stage */ + uint32_t num_instance; + + /* instance ID of this stage*/ + uint32_t instance_id; + + /* track packets in and out of this port */ + uint64_t port_stat[max_num_port_xstat]; + uint64_t start_cycles; +}; + +struct opdl_queue_meta_data { + uint8_t ext_id; + enum queue_type type; + int8_t setup; +}; + +struct opdl_xstats_entry { + struct rte_event_dev_xstats_name stat; + unsigned int id; + uint64_t *value; +}; + +struct opdl_queue { + + /* Opdl ring this queue is associated with */ + uint32_t opdl_id; + + /* type and position have correctly been set */ + uint8_t configured; + + /* port number and associated ports have been associated */ + uint8_t initialized; + + /* type of this queue (Atomic, Ordered, Parallel, Direct)*/ + enum queue_type q_type; + + /* position of queue (START, MIDDLE, END) */ + enum queue_pos q_pos; + + /* external queue id. It is mapped to the queue position */ + uint8_t external_qid; + + struct opdl_port *ports[OPDL_PORTS_MAX]; + uint32_t nb_ports; + + /* priority, reserved for future */ + uint8_t priority; +}; + + +#define OPDL_TUR_PER_DEV 12 + +/* PMD needs an extra queue per Opdl */ +#define OPDL_MAX_QUEUES (RTE_EVENT_MAX_QUEUES_PER_DEV - OPDL_TUR_PER_DEV) + + +struct opdl_evdev { + struct rte_eventdev_data *data; + + uint8_t started; + + /* Max number of ports and queues*/ + uint32_t max_port_nb; + uint32_t max_queue_nb; + + /* slots in the opdl ring */ + uint32_t nb_events_limit; + + /* + * Array holding all opdl for this device + */ + struct opdl_ring *opdl[OPDL_TUR_PER_DEV]; + uint32_t nb_opdls; + + struct opdl_queue_meta_data q_md[OPDL_MAX_QUEUES]; + uint32_t nb_q_md; + + /* Internal queues - one per logical queue */ + struct opdl_queue + queue[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned; + + uint32_t nb_queues; + + struct opdl_stage_meta_data s_md[OPDL_PORTS_MAX]; + + /* Contains all ports - load balanced and directed */ + struct opdl_port ports[OPDL_PORTS_MAX] __rte_cache_aligned; + uint32_t nb_ports; + + uint8_t q_map_ex_to_in[OPDL_INVALID_QID]; + + /* Stats */ + struct opdl_xstats_entry port_xstat[OPDL_MAX_PORT_XSTAT_NUM]; + + char service_name[OPDL_PMD_NAME_MAX]; + int socket; + int do_validation; + int do_test; +}; + + +static inline struct opdl_evdev * +opdl_pmd_priv(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +static inline const struct opdl_evdev * +opdl_pmd_priv_const(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +uint16_t opdl_event_enqueue(void *port, const struct rte_event *ev); +uint16_t opdl_event_enqueue_burst(void *port, const struct rte_event ev[], + uint16_t num); + +uint16_t opdl_event_dequeue(void *port, struct rte_event *ev, uint64_t wait); +uint16_t opdl_event_dequeue_burst(void *port, struct rte_event *ev, + uint16_t num, uint64_t wait); +void opdl_event_schedule(struct rte_eventdev *dev); + +void opdl_xstats_init(struct rte_eventdev *dev); +int opdl_xstats_uninit(struct rte_eventdev *dev); +int opdl_xstats_get_names(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size); +int opdl_xstats_get(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + const unsigned int ids[], uint64_t values[], unsigned int n); +uint64_t opdl_xstats_get_by_name(const struct rte_eventdev *dev, + const char *name, unsigned int *id); +int opdl_xstats_reset(struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, + int16_t queue_port_id, + const uint32_t ids[], + uint32_t nb_ids); + +int opdl_add_event_handlers(struct rte_eventdev *dev); +int build_all_dependencies(struct rte_eventdev *dev); +int check_queues_linked(struct rte_eventdev *dev); +int create_queues_and_rings(struct rte_eventdev *dev); +int initialise_all_other_ports(struct rte_eventdev *dev); +int initialise_queue_zero_ports(struct rte_eventdev *dev); +int assign_internal_queue_ids(struct rte_eventdev *dev); +void destroy_queues_and_rings(struct rte_eventdev *dev); +int opdl_selftest(void); + +#endif /* _OPDL_EVDEV_H_ */ diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c new file mode 100644 index 0000000..e128afb --- /dev/null +++ b/drivers/event/opdl/opdl_evdev_init.c @@ -0,0 +1,951 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "opdl_evdev.h" +#include "opdl_ring.h" + + +static inline uint32_t __attribute__((always_inline)) +enqueue_check(struct opdl_port *p, + const struct rte_event ev[], + uint16_t num, + uint16_t num_events) +{ + uint16_t i; + + if (p->opdl->do_validation) { + + for (i = 0; i < num; i++) { + if (ev[i].queue_id != p->next_external_qid) { + OPDL_LOG_ERR("ERROR - port:[%u] - event wants" + " to enq to q_id[%u]," + " but should be [%u]\n", + p->id, + ev[i].queue_id, + p->next_external_qid); + rte_errno = -EINVAL; + return 0; + } + } + + /* Stats */ + if (p->p_type == OPDL_PURE_RX_PORT || + p->p_type == OPDL_ASYNC_PORT) { + /* Stats */ + if (num_events) { + p->port_stat[claim_pkts_requested] += num; + p->port_stat[claim_pkts_granted] += num_events; + p->port_stat[claim_non_empty]++; + p->start_cycles = rte_rdtsc(); + } else { + p->port_stat[claim_empty]++; + p->start_cycles = 0; + } + } else { + if (p->start_cycles) { + uint64_t end_cycles = rte_rdtsc(); + p->port_stat[total_cycles] += + end_cycles - p->start_cycles; + } + } + } else { + if (num > 0 && + ev[0].queue_id != p->next_external_qid) { + rte_errno = -EINVAL; + return 0; + } + } + + return num; +} + +static inline void __attribute__((always_inline)) +update_on_dequeue(struct opdl_port *p, + struct rte_event ev[], + uint16_t num, + uint16_t num_events) +{ + if (p->opdl->do_validation) { + int16_t i; + for (i = 0; i < num; i++) + ev[i].queue_id = + p->opdl->queue[p->queue_id].external_qid; + + /* Stats */ + if (num_events) { + p->port_stat[claim_pkts_requested] += num; + p->port_stat[claim_pkts_granted] += num_events; + p->port_stat[claim_non_empty]++; + p->start_cycles = rte_rdtsc(); + } else { + p->port_stat[claim_empty]++; + p->start_cycles = 0; + } + } else { + if (num > 0) + ev[0].queue_id = + p->opdl->queue[p->queue_id].external_qid; + } +} + + +/* + * Error RX enqueue: + * + * + */ + +static uint16_t +opdl_rx_error_enqueue(struct opdl_port *p, + const struct rte_event ev[], + uint16_t num) +{ + RTE_SET_USED(p); + RTE_SET_USED(ev); + RTE_SET_USED(num); + + rte_errno = -ENOSPC; + + return 0; +} + +/* + * RX enqueue: + * + * This function handles enqueue for a single input stage_inst with + * threadsafe disabled or enabled. eg 1 thread using a stage_inst or + * multiple threads sharing a stage_inst + */ + +static uint16_t +opdl_rx_enqueue(struct opdl_port *p, + const struct rte_event ev[], + uint16_t num) +{ + uint16_t enqueued = 0; + + enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst), + ev, + num, + false); + if (!enqueue_check(p, ev, num, enqueued)) + return 0; + + + if (enqueued < num) + rte_errno = -ENOSPC; + + return enqueued; +} + +/* + * Error TX handler + * + */ + +static uint16_t +opdl_tx_error_dequeue(struct opdl_port *p, + struct rte_event ev[], + uint16_t num) +{ + RTE_SET_USED(p); + RTE_SET_USED(ev); + RTE_SET_USED(num); + + rte_errno = -ENOSPC; + + return 0; +} + +/* + * TX single threaded claim + * + * This function handles dequeue for a single worker stage_inst with + * threadsafe disabled. eg 1 thread using an stage_inst + */ + +static uint16_t +opdl_tx_dequeue_single_thread(struct opdl_port *p, + struct rte_event ev[], + uint16_t num) +{ + uint16_t returned; + + struct opdl_ring *ring; + + ring = opdl_stage_get_opdl_ring(p->deq_stage_inst); + + returned = opdl_ring_copy_to_burst(ring, + p->deq_stage_inst, + ev, + num, + false); + + update_on_dequeue(p, ev, num, returned); + + return returned; +} + +/* + * TX multi threaded claim + * + * This function handles dequeue for multiple worker stage_inst with + * threadsafe disabled. eg multiple stage_inst each with its own instance + */ + +static uint16_t +opdl_tx_dequeue_multi_inst(struct opdl_port *p, + struct rte_event ev[], + uint16_t num) +{ + uint32_t num_events = 0; + + num_events = opdl_stage_claim(p->deq_stage_inst, + (void *)ev, + num, + NULL, + false, + false); + + update_on_dequeue(p, ev, num, num_events); + + return opdl_stage_disclaim(p->deq_stage_inst, num_events, false); +} + + +/* + * Worker thread claim + * + */ + +static uint16_t +opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num) +{ + uint32_t num_events = 0; + + if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) { + OPDL_LOG_ERR("" + "Attempt to dequeue num of events larger than port (%d) max\n", + p->id); + rte_errno = -EINVAL; + return 0; + } + + + num_events = opdl_stage_claim(p->deq_stage_inst, + (void *)ev, + num, + NULL, + false, + p->atomic_claim); + + + update_on_dequeue(p, ev, num, num_events); + + return num_events; +} + +/* + * Worker thread disclaim + */ + +static uint16_t +opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num) +{ + uint16_t enqueued = 0; + + uint32_t i = 0; + + for (i = 0; i < num; i++) + opdl_ring_cas_slot(p->enq_stage_inst, &ev[i], i, p->atomic_claim); + + + enqueued = opdl_stage_disclaim(p->enq_stage_inst, + num, + false); + + return enqueue_check(p, ev, num, enqueued); +} + +static inline struct opdl_stage *__attribute__((always_inline)) +stage_for_port(struct opdl_queue *q, unsigned int i) +{ + if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE) + return q->ports[i]->enq_stage_inst; + else + return q->ports[i]->deq_stage_inst; +} + +static int opdl_add_deps(struct opdl_evdev *device, + int q_id, + int deps_q_id) +{ + unsigned int i, j; + int status; + struct opdl_ring *ring; + struct opdl_queue *queue = &device->queue[q_id]; + struct opdl_queue *queue_deps = &device->queue[deps_q_id]; + struct opdl_stage *dep_stages[OPDL_PORTS_MAX]; + + /* sanity check that all stages are for same opdl ring */ + for (i = 0; i < queue->nb_ports; i++) { + struct opdl_ring *r = + opdl_stage_get_opdl_ring(stage_for_port(queue, i)); + for (j = 0; j < queue_deps->nb_ports; j++) { + struct opdl_ring *rj = + opdl_stage_get_opdl_ring( + stage_for_port(queue_deps, j)); + if (r != rj) { + OPDL_LOG_ERR("Stages and dependents" + " are not for same opdl ring"); + for (uint32_t k = 0; + k < device->nb_opdls; k++) { + opdl_ring_dump(device->opdl[k], + stdout); + } + return -EINVAL; + } + } + } + + /* Gather all stages instance in deps */ + for (i = 0; i < queue_deps->nb_ports; i++) + dep_stages[i] = stage_for_port(queue_deps, i); + + + /* Add all deps for each port->stage_inst in this queue */ + for (i = 0; i < queue->nb_ports; i++) { + + ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i)); + + status = opdl_stage_deps_add(ring, + stage_for_port(queue, i), + queue->ports[i]->num_instance, + queue->ports[i]->instance_id, + dep_stages, + queue_deps->nb_ports); + if (status < 0) + return -EINVAL; + } + + return 0; +} + +int +opdl_add_event_handlers(struct rte_eventdev *dev) +{ + int err = 0; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + unsigned int i; + + for (i = 0; i < device->max_port_nb; i++) { + + struct opdl_port *port = &device->ports[i]; + + if (port->configured) { + if (port->p_type == OPDL_PURE_RX_PORT) { + port->enq = opdl_rx_enqueue; + port->deq = opdl_tx_error_dequeue; + + } else if (port->p_type == OPDL_PURE_TX_PORT) { + + port->enq = opdl_rx_error_enqueue; + + if (port->num_instance == 1) + port->deq = + opdl_tx_dequeue_single_thread; + else + port->deq = opdl_tx_dequeue_multi_inst; + + } else if (port->p_type == OPDL_REGULAR_PORT) { + + port->enq = opdl_disclaim; + port->deq = opdl_claim; + + } else if (port->p_type == OPDL_ASYNC_PORT) { + + port->enq = opdl_rx_enqueue; + + /* Always single instance */ + port->deq = opdl_tx_dequeue_single_thread; + } else { + OPDL_LOG_ERR("port:[%u] has invalid port type - ", + port->id); + err = -EINVAL; + break; + } + port->initialized = 1; + } + } + + if (!err) + fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n"); + return err; +} + +int +build_all_dependencies(struct rte_eventdev *dev) +{ + + int err = 0; + unsigned int i; + struct opdl_evdev *device = opdl_pmd_priv(dev); + + uint8_t start_qid = 0; + + for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) { + struct opdl_queue *queue = &device->queue[i]; + if (!queue->initialized) + break; + + if (queue->q_pos == OPDL_Q_POS_START) { + start_qid = i; + continue; + } + + if (queue->q_pos == OPDL_Q_POS_MIDDLE) { + err = opdl_add_deps(device, i, i-1); + if (err < 0) { + OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED", + queue->external_qid); + break; + } + } + + if (queue->q_pos == OPDL_Q_POS_END) { + /* Add this dependency */ + err = opdl_add_deps(device, i, i-1); + if (err < 0) { + OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED", + queue->external_qid); + break; + } + /* Add dependency for rx on tx */ + err = opdl_add_deps(device, start_qid, i); + if (err < 0) { + OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED", + queue->external_qid); + break; + } + } + } + + if (!err) + fprintf(stdout, "Success - dependencies built\n"); + + return err; +} +int +check_queues_linked(struct rte_eventdev *dev) +{ + + int err = 0; + unsigned int i; + struct opdl_evdev *device = opdl_pmd_priv(dev); + uint32_t nb_iq = 0; + + for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) { + struct opdl_queue *queue = &device->queue[i]; + + if (!queue->initialized) + break; + + if (queue->external_qid == OPDL_INVALID_QID) + nb_iq++; + + if (queue->nb_ports == 0) { + OPDL_LOG_ERR("queue:[%u] has no associated ports", + i); + err = -EINVAL; + break; + } + } + if (!err) { + if ((i - nb_iq) != device->max_queue_nb) { + OPDL_LOG_ERR("%u queues counted but should be %u", + i - nb_iq, + device->max_queue_nb); + err = -1; + } else { + fprintf(stdout, "Success - %u queues (ex:%u + in:%u) validated\n", + i, + device->max_queue_nb, + nb_iq); + } + + } + return err; +} + +void +destroy_queues_and_rings(struct rte_eventdev *dev) +{ + struct opdl_evdev *device = opdl_pmd_priv(dev); + + for (uint32_t i = 0; i < device->nb_opdls; i++) { + if (device->opdl[i]) + opdl_ring_free(device->opdl[i]); + } + + memset(&device->queue, + 0, + sizeof(struct opdl_queue) + * RTE_EVENT_MAX_QUEUES_PER_DEV); +} + +#define OPDL_ID(d)(d->nb_opdls - 1) + +static inline void +initialise_queue(struct opdl_evdev *device, + enum queue_pos pos, + int32_t i) +{ + struct opdl_queue *queue = &device->queue[device->nb_queues]; + + if (i == -1) { + queue->q_type = OPDL_Q_TYPE_ORDERED; + queue->external_qid = OPDL_INVALID_QID; + } else { + queue->q_type = device->q_md[i].type; + queue->external_qid = device->q_md[i].ext_id; + /* Add ex->in for queues setup */ + device->q_map_ex_to_in[queue->external_qid] = device->nb_queues; + } + queue->opdl_id = OPDL_ID(device); + queue->q_pos = pos; + queue->nb_ports = 0; + queue->configured = 1; + + device->nb_queues++; +} + + +static inline int +create_opdl(struct opdl_evdev *device) +{ + int err = 0; + + char name[RTE_MEMZONE_NAMESIZE]; + + sprintf(name, "%s_%u", device->service_name, device->nb_opdls); + + device->opdl[device->nb_opdls] = + opdl_ring_create(name, + device->nb_events_limit, + sizeof(struct rte_event), + device->max_port_nb * 2, + device->socket); + + if (!device->opdl[device->nb_opdls]) { + OPDL_LOG_ERR("opdl ring %u creation - FAILED", + device->nb_opdls); + err = -EINVAL; + } else { + device->nb_opdls++; + } + return err; +} + +static inline int +create_link_opdl(struct opdl_evdev *device, uint32_t index) +{ + + int err = 0; + + if (device->q_md[index + 1].type != + OPDL_Q_TYPE_SINGLE_LINK) { + + /* async queue with regular + * queue following it + */ + + /* create a new opdl ring */ + err = create_opdl(device); + if (!err) { + /* create an initial + * dummy queue for new opdl + */ + initialise_queue(device, + OPDL_Q_POS_START, + -1); + } else { + err = -EINVAL; + } + } else { + OPDL_LOG_ERR("queue %u, 2" + " SINGLE_LINK queues, not allowed", + index); + err = -EINVAL; + } + + return err; +} + +int +create_queues_and_rings(struct rte_eventdev *dev) +{ + int err = 0; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + + device->nb_queues = 0; + + if (device->nb_ports != device->max_port_nb) { + OPDL_LOG_ERR("Number ports setup:%u NOT EQUAL to max port" + " number:%u for this device", + device->nb_ports, + device->max_port_nb); + err = -1; + } + + if (!err) { + /* We will have at least one opdl so create it now */ + err = create_opdl(device); + } + + if (!err) { + + /* Create 1st "dummy" queue */ + initialise_queue(device, + OPDL_Q_POS_START, + -1); + + for (uint32_t i = 0; i < device->nb_q_md; i++) { + + /* Check */ + if (!device->q_md[i].setup) { + + OPDL_LOG_ERR("queue meta data slot %u" + " not setup - FAILING", + i); + err = -EINVAL; + break; + } else if (device->q_md[i].type != + OPDL_Q_TYPE_SINGLE_LINK) { + + if (!device->q_md[i + 1].setup) { + /* Create a simple ORDERED/ATOMIC + * queue at the end + */ + initialise_queue(device, + OPDL_Q_POS_END, + i); + + } else { + /* Create a simple ORDERED/ATOMIC + * queue in the middle + */ + initialise_queue(device, + OPDL_Q_POS_MIDDLE, + i); + } + } else if (device->q_md[i].type == + OPDL_Q_TYPE_SINGLE_LINK) { + + /* create last queue for this opdl */ + initialise_queue(device, + OPDL_Q_POS_END, + i); + + err = create_link_opdl(device, i); + + if (err) + break; + + + } + } + } + if (err) + destroy_queues_and_rings(dev); + else + fprintf(stdout, "Success - Created %u queues and %u opdls\n", + device->nb_queues, + device->nb_opdls); + + return err; +} + + +int +initialise_all_other_ports(struct rte_eventdev *dev) +{ + int err = 0; + struct opdl_stage *stage_inst = NULL; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + struct opdl_queue *queue = &device->queue[port->queue_id]; + + if (port->queue_id == 0) { + continue; + } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) { + + if (queue->q_pos == OPDL_Q_POS_MIDDLE) { + + /* Regular port with claim/disclaim */ + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + false); + port->deq_stage_inst = stage_inst; + port->enq_stage_inst = stage_inst; + + if (queue->q_type == OPDL_Q_TYPE_ATOMIC) + port->atomic_claim = true; + else + port->atomic_claim = false; + + port->p_type = OPDL_REGULAR_PORT; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + } else if (queue->q_pos == OPDL_Q_POS_END) { + + /* tx port */ + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + false); + port->deq_stage_inst = stage_inst; + port->enq_stage_inst = NULL; + port->p_type = OPDL_PURE_TX_PORT; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + } else { + + OPDL_LOG_ERR("port %u:, linked incorrectly" + " to a q_pos START/INVALID %u", + port->id, + queue->q_pos); + err = -EINVAL; + break; + } + + } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) { + + port->p_type = OPDL_ASYNC_PORT; + + /* -- tx -- */ + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + false); /* First stage */ + port->deq_stage_inst = stage_inst; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + + if (queue->nb_ports > 1) { + OPDL_LOG_ERR("queue %u:, setup as SINGLE_LINK" + " but has more than one port linked", + queue->external_qid); + err = -EINVAL; + break; + } + + /* -- single instance rx for next opdl -- */ + uint8_t next_qid = + device->q_map_ex_to_in[queue->external_qid] + 1; + if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV && + device->queue[next_qid].configured) { + + /* Remap the queue */ + queue = &device->queue[next_qid]; + + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + true); + port->enq_stage_inst = stage_inst; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + if (queue->nb_ports > 1) { + OPDL_LOG_ERR("dummy queue %u: for " + "port %u, " + "SINGLE_LINK but has more " + "than one port linked", + next_qid, + port->id); + err = -EINVAL; + break; + } + /* Set this queue to initialized as it is never + * referenced by any ports + */ + queue->initialized = 1; + } + } + } + + /* Now that all ports are initialised we need to + * setup the last bit of stage md + */ + if (!err) { + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + struct opdl_queue *queue = + &device->queue[port->queue_id]; + + if (port->configured && + (port->queue_id != OPDL_INVALID_QID)) { + if (queue->nb_ports == 0) { + OPDL_LOG_ERR("queue:[%u] has no ports" + " linked to it", + port->id); + err = -EINVAL; + break; + } + + port->num_instance = queue->nb_ports; + port->initialized = 1; + queue->initialized = 1; + } else { + OPDL_LOG_ERR("Port:[%u] not configured invalid" + " queue configuration", + port->id); + err = -EINVAL; + break; + } + } + } + + if (!err) { + fprintf(stdout, + "Success - %u port(s) initialized\n", + device->nb_ports); + } + return err; +} + +int +initialise_queue_zero_ports(struct rte_eventdev *dev) +{ + int err = 0; + uint8_t mt_rx = 0; + struct opdl_stage *stage_inst = NULL; + struct opdl_queue *queue = NULL; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + + /* Assign queue zero and figure out how many Q0 ports we have */ + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + if (port->queue_id == OPDL_INVALID_QID) { + port->queue_id = 0; + port->external_qid = OPDL_INVALID_QID; + port->p_type = OPDL_PURE_RX_PORT; + mt_rx++; + } + } + + /* Create the stage */ + stage_inst = opdl_stage_add(device->opdl[0], + (mt_rx > 1 ? true : false), + true); + if (stage_inst) { + + /* Assign the new created input stage to all relevant ports */ + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + if (port->queue_id == 0) { + queue = &device->queue[port->queue_id]; + port->enq_stage_inst = stage_inst; + port->deq_stage_inst = NULL; + port->configured = 1; + port->initialized = 1; + + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + } + } + } else { + err = -1; + } + + if (!err) { + fprintf(stdout, "Success - (%u) \"Queue 0\" port(s) " + "initialized\n", + queue->nb_ports); + } + return err; +} + +int +assign_internal_queue_ids(struct rte_eventdev *dev) +{ + int err = 0; + struct opdl_evdev *device = opdl_pmd_priv(dev); + + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + if (port->external_qid != OPDL_INVALID_QID) { + port->queue_id = + device->q_map_ex_to_in[port->external_qid]; + + /* Now do the external_qid of the next queue */ + struct opdl_queue *queue = + &device->queue[port->queue_id]; + if (queue->q_pos == OPDL_Q_POS_END) + port->next_external_qid = + device->queue[port->queue_id + 2].external_qid; + else + port->next_external_qid = + device->queue[port->queue_id + 1].external_qid; + } + } + return err; +} -- 2.7.5 -------------------------------------------------------------- Intel Research and Development Ireland Limited Registered in Ireland Registered Office: Collinstown Industrial Park, Leixlip, County Kildare Registered Number: 308263 This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.