DPDK patches and discussions
 help / color / mirror / Atom feed
From: liang.j.ma@intel.com
To: jerin.jacob@caviumnetworks.com
Cc: dev@dpdk.org, harry.van.haaren@intel.com,
	bruce.richardson@intel.com, deepak.k.jain@intel.com,
	john.geary@intel.com
Subject: [dpdk-dev] [PATCH 2/7] event/opdl: add the opdl pmd header and init helper function
Date: Fri, 24 Nov 2017 11:23:47 +0000	[thread overview]
Message-ID: <1511522632-139652-3-git-send-email-liang.j.ma@intel.com> (raw)
In-Reply-To: <1511522632-139652-1-git-send-email-liang.j.ma@intel.com>

From: Liang Ma <liang.j.ma@intel.com>

opdl_evdev.h includes the main data structures of the opdl device
and all the function prototypes that need to be exposed to support
the eventdev API.

opdl_evdev_init.c implements all the initialization helper functions.

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/opdl_evdev.h      | 353 +++++++++++++
 drivers/event/opdl/opdl_evdev_init.c | 945 +++++++++++++++++++++++++++++++++++
 2 files changed, 1298 insertions(+)
 create mode 100644 drivers/event/opdl/opdl_evdev.h
 create mode 100644 drivers/event/opdl/opdl_evdev_init.c

diff --git a/drivers/event/opdl/opdl_evdev.h b/drivers/event/opdl/opdl_evdev.h
new file mode 100644
index 0000000..e2657de
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev.h
@@ -0,0 +1,353 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _OPDL_EVDEV_H_
+#define _OPDL_EVDEV_H_
+
+#include <rte_eventdev.h>
+#include <rte_eventdev_pmd_vdev.h>
+#include <rte_atomic.h>
+#include "opdl_ring.h"
+
+#define OPDL_QID_NUM_FIDS 1024
+#define OPDL_IQS_MAX 1
+#define OPDL_Q_PRIORITY_MAX 1
+#define OPDL_PORTS_MAX 64
+#define MAX_OPDL_CONS_Q_DEPTH 128
+/* OPDL size */
+#define OPDL_INFLIGHT_EVENTS_TOTAL 4096
+/* allow for lots of over-provisioning */
+#define OPDL_FRAGMENTS_MAX 1
+
+/* report dequeue burst sizes in buckets */
+#define OPDL_DEQ_STAT_BUCKET_SHIFT 2
+/* how many packets pulled from port by sched */
+#define SCHED_DEQUEUE_BURST_SIZE 32
+
+/* size of our history list */
+#define OPDL_PORT_HIST_LIST (MAX_OPDL_PROD_Q_DEPTH)
+
+/* how many data points use for average stats */
+#define NUM_SAMPLES 64
+
+#define EVENTDEV_NAME_OPDL_PMD event_opdl
+#define OPDL_PMD_NAME RTE_STR(event_opdl)
+#define OPDL_PMD_NAME_MAX 64
+
+#define OPDL_INVALID_QID 255
+
+#define OPDL_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
+#define OPDL_NUM_POLL_BUCKETS  \
+	(MAX_OPDL_CONS_Q_DEPTH >> OPDL_DEQ_STAT_BUCKET_SHIFT)
+
+/* Bit positions used to build the QE_FLAG_* masks defined below.
+ * _QE_FLAG_COUNT is the number of flag bits listed here.
+ */
+enum {
+	QE_FLAG_VALID_SHIFT = 0,
+	QE_FLAG_COMPLETE_SHIFT,
+	QE_FLAG_NOT_EOP_SHIFT,
+	_QE_FLAG_COUNT
+};
+
+/* Role of a port; opdl_add_event_handlers() picks the enqueue/dequeue
+ * handlers of a port from this value.
+ */
+enum port_type {
+	/* zero value: rejected by opdl_add_event_handlers() */
+	OPDL_INVALID_PORT = 0,
+	/* port on a MIDDLE queue: claim on dequeue, disclaim on enqueue */
+	OPDL_REGULAR_PORT = 1,
+	/* port attached to queue 0: enqueue only */
+	OPDL_PURE_RX_PORT,
+	/* port on a (non SINGLE_LINK) END queue: dequeue only */
+	OPDL_PURE_TX_PORT,
+	/* port on a SINGLE_LINK queue: dequeues there, enqueues to the next */
+	OPDL_ASYNC_PORT
+};
+
+/* Type of a queue as recorded in the queue meta data (q_md[].type) and
+ * copied to opdl_queue.q_type by initialise_queue().
+ */
+enum queue_type {
+	OPDL_Q_TYPE_INVALID = 0,
+	/* queue served by a single port; it closes the current turbine */
+	OPDL_Q_TYPE_SINGLE_LINK = 1,
+	/* ports of such a queue claim with atomic_claim set */
+	OPDL_Q_TYPE_ATOMIC,
+	/* also the type given to the internal "dummy" queues */
+	OPDL_Q_TYPE_ORDERED
+};
+
+/* Position of a queue within its turbine. START is the value 0, so a
+ * zeroed opdl_queue reads as START.
+ */
+enum queue_pos {
+	OPDL_Q_POS_START = 0,
+	OPDL_Q_POS_MIDDLE,
+	OPDL_Q_POS_END
+};
+
+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)    /* for NEW FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP  */
+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* set for FRAG only  */
+
+/* Table of QE flag combinations, one entry per kind of event (NEW, FWD,
+ * RELEASE, then a partial-event entry).
+ * NOTE(review): presumably indexed by the rte_event op value - confirm
+ * against the users of this table, none of which are visible here.
+ */
+static const uint8_t opdl_qe_flag_map[] = {
+	QE_FLAG_VALID /* NEW Event */,
+	QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+	QE_FLAG_COMPLETE /* RELEASE Event */,
+
+	/* Values which can be used for future support for partial
+	 * events, i.e. where one event comes back to the scheduler
+	 * as multiple which need to be tracked together
+	 */
+	QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
+#define OPDL_LOG_INFO(fmt, args...) \
+	RTE_LOG(INFO, EVENTDEV, "[%s] line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__LINE__, ## args)
+
+#ifdef RTE_LIBRTE_PMD_EVDEV_OPDL_DEBUG
+#define OPDL_LOG_DBG(fmt, args...) \
+	RTE_LOG(DEBUG, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__func__, __LINE__, ## args)
+#else
+#define OPDL_LOG_DBG(fmt, args...)
+#endif
+
+#define OPDL_LOG_ERR(fmt, args...) \
+	RTE_LOG(ERR, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__func__, __LINE__, ## args)
+
+/* Indexes into opdl_port.port_stat[]; max_num_port_xstat is the number
+ * of per-port counters.
+ */
+enum port_xstat_name {
+	/* sum of the burst sizes asked for */
+	claim_pkts_requested = 0,
+	/* sum of the events actually obtained/accepted */
+	claim_pkts_granted,
+	/* number of calls that moved at least one event */
+	claim_non_empty,
+	/* number of calls that moved no event */
+	claim_empty,
+	/* cycles accumulated between start_cycles and a later enqueue */
+	total_cycles,
+	max_num_port_xstat
+};
+
+#define OPDL_MAX_PORT_XSTAT_NUM (OPDL_PORTS_MAX * max_num_port_xstat)
+
+struct opdl_port;
+
+typedef uint16_t (*opdl_enq_operation)(struct opdl_port *port,
+		const struct rte_event ev[],
+		uint16_t num);
+
+typedef uint16_t (*opdl_deq_operation)(struct opdl_port *port,
+		struct rte_event ev[],
+		uint16_t num);
+
+struct opdl_evdev;
+
+/* Per-port stage bookkeeping; opdl_evdev holds one entry per possible
+ * port (s_md[OPDL_PORTS_MAX]).
+ */
+struct opdl_stage_meta_data {
+	uint32_t num_claimed;	/* number of entries claimed by this stage */
+	uint32_t burst_sz;	/* Port claim burst size */
+};
+
+/* State of one event port: its enqueue/dequeue handlers, the opdl stage
+ * instances those handlers operate on, the queue it is linked to and its
+ * statistics counters.
+ */
+struct opdl_port {
+
+	/* back pointer */
+	struct opdl_evdev *opdl;
+
+	/* enq handler & stage instance */
+	opdl_enq_operation enq;
+	struct opdl_stage *enq_stage_inst;
+
+	/* deq handler & stage instance */
+	opdl_deq_operation deq;
+	struct opdl_stage *deq_stage_inst;
+
+	/* port id has correctly been set */
+	uint8_t configured;
+
+	/* set when the port is initialized */
+	uint8_t initialized;
+
+	/* A numeric ID for the port */
+	uint8_t id;
+
+	/* Space for claimed entries */
+	struct rte_event *entries[MAX_OPDL_CONS_Q_DEPTH];
+
+	/* RX/REGULAR/TX/ASYNC - determined on position in queue */
+	enum port_type p_type;
+
+	/* if the claim is static atomic type  */
+	bool atomic_claim;
+
+	/* Queue linked to this port - internal queue id*/
+	uint8_t queue_id;
+
+	/* Queue linked to this port - external queue id*/
+	uint8_t external_qid;
+
+	/* Next queue linked to this port - external queue id*/
+	uint8_t next_external_qid;
+
+	/* number of instances of this stage */
+	uint32_t num_instance;
+
+	/* instance ID of this stage*/
+	uint32_t instance_id;
+
+	/* track packets in and out of this port */
+	uint64_t port_stat[max_num_port_xstat];
+	/* timestamp (rte_rdtsc) of the last non-empty claim, 0 if none */
+	uint64_t start_cycles;
+};
+
+/* Description of one externally visible queue, consumed by
+ * create_queues_and_rings() / initialise_queue() to build the internal
+ * queue array.
+ */
+struct opdl_queue_meta_data {
+	uint8_t         ext_id;	/* external queue id */
+	enum queue_type type;	/* copied to opdl_queue.q_type */
+	int8_t          setup;	/* non-zero once this slot is populated */
+};
+
+/* One extended-statistics entry: a name, a numeric id and a pointer to
+ * the counter.
+ * NOTE(review): value presumably points into opdl_port.port_stat[] -
+ * confirm against the xstats code, which is not visible here.
+ */
+struct opdl_xstats_entry {
+	struct rte_event_dev_xstats_name stat;
+	unsigned int id;
+	uint64_t *value;
+};
+
+/* Internal queue: its turbine, its type and position on that turbine,
+ * and the ports linked to it.
+ */
+struct opdl_queue {
+
+	/* Turbine this queue is associated with */
+	uint32_t turbine_id;
+
+	/* type and position have correctly been set */
+	uint8_t configured;
+
+	/* port number and associated ports have been associated */
+	uint8_t initialized;
+
+	/* type of this queue (Atomic, Ordered, Parallel, Direct)*/
+	enum queue_type q_type;
+
+	/* position of queue (START, MIDDLE, END) */
+	enum queue_pos q_pos;
+
+	/* external queue id. It is mapped to the queue position */
+	uint8_t external_qid;
+
+	/* ports linked to this queue; the first nb_ports slots are used */
+	struct opdl_port *ports[OPDL_PORTS_MAX];
+	uint32_t nb_ports;
+
+	/* priority, reserved for future */
+	uint8_t priority;
+};
+
+
+#define OPDL_TUR_PER_DEV 12
+
+/* PMD needs an extra queue per turbine */
+#define OPDL_MAX_QUEUES (RTE_EVENT_MAX_QUEUES_PER_DEV - OPDL_TUR_PER_DEV)
+
+
+/* Private state of one opdl event device; opdl_pmd_priv() returns it
+ * from the eventdev's dev_private pointer.
+ */
+struct opdl_evdev {
+	struct rte_eventdev_data *data;
+
+	uint8_t started;
+
+	/* Max number of ports and queues*/
+	uint32_t max_port_nb;
+	uint32_t max_queue_nb;
+
+	/* slots in the turbine */
+	uint32_t nb_events_limit;
+
+	/*
+	 * Array holding all turbines for this device
+	 */
+	struct opdl_ring *turbine[OPDL_TUR_PER_DEV];
+	uint32_t nb_turbines;
+
+	/* external queue descriptions; nb_q_md entries are iterated */
+	struct opdl_queue_meta_data q_md[OPDL_MAX_QUEUES];
+	uint32_t nb_q_md;
+
+	/* Internal queues - one per logical queue */
+	struct opdl_queue
+		queue[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+
+	uint32_t nb_queues;
+
+	struct opdl_stage_meta_data s_md[OPDL_PORTS_MAX];
+
+	/* Contains all ports - load balanced and directed */
+	struct opdl_port ports[OPDL_PORTS_MAX] __rte_cache_aligned;
+	uint32_t nb_ports;
+
+	/* external queue id -> index into queue[] */
+	uint8_t q_map_ex_to_in[OPDL_INVALID_QID];
+
+	/* Stats */
+	struct opdl_xstats_entry port_xstat[OPDL_MAX_PORT_XSTAT_NUM];
+
+	/* also used as the name prefix of the rings created per turbine */
+	char service_name[OPDL_PMD_NAME_MAX];
+	/* socket id passed to opdl_ring_create() */
+	int socket;
+	/* non-zero: check every event of a burst and maintain port stats */
+	int do_validation;
+};
+
+
+/* Fetch the opdl private data attached to an eventdev. */
+static inline struct opdl_evdev *
+opdl_pmd_priv(const struct rte_eventdev *eventdev)
+{
+	struct opdl_evdev *priv = eventdev->data->dev_private;
+
+	return priv;
+}
+
+/* Read-only variant of opdl_pmd_priv(). */
+static inline const struct opdl_evdev *
+opdl_pmd_priv_const(const struct rte_eventdev *eventdev)
+{
+	const struct opdl_evdev *priv = eventdev->data->dev_private;
+
+	return priv;
+}
+
+uint16_t opdl_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t opdl_event_enqueue_burst(void *port, const struct rte_event ev[],
+		uint16_t num);
+
+uint16_t opdl_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t opdl_event_dequeue_burst(void *port, struct rte_event *ev,
+		uint16_t num, uint64_t wait);
+void opdl_event_schedule(struct rte_eventdev *dev);
+
+void opdl_xstats_init(struct rte_eventdev *dev);
+int opdl_xstats_uninit(struct rte_eventdev *dev);
+int opdl_xstats_get_names(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		struct rte_event_dev_xstats_name *xstats_names,
+		unsigned int *ids, unsigned int size);
+int opdl_xstats_get(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t opdl_xstats_get_by_name(const struct rte_eventdev *dev,
+		const char *name, unsigned int *id);
+int opdl_xstats_reset(struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		int16_t queue_port_id,
+		const uint32_t ids[],
+		uint32_t nb_ids);
+
+int opdl_add_event_handlers(struct rte_eventdev *dev);
+int build_all_dependencies(struct rte_eventdev *dev);
+int check_queues_linked(struct rte_eventdev *dev);
+int create_queues_and_rings(struct rte_eventdev *dev);
+int initialise_all_other_ports(struct rte_eventdev *dev);
+int initialise_queue_zero_ports(struct rte_eventdev *dev);
+int assign_internal_queue_ids(struct rte_eventdev *dev);
+void destroy_queues_and_rings(struct rte_eventdev *dev);
+
+
+#endif /* _OPDL_EVDEV_H_ */
diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
new file mode 100644
index 0000000..2699cb2
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -0,0 +1,945 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_bus_vdev.h>
+#include <rte_memzone.h>
+#include <rte_kvargs.h>
+#include <rte_ring.h>
+#include <rte_errno.h>
+#include <rte_cycles.h>
+
+#include "opdl_evdev.h"
+#include "opdl_ring.h"
+
+
+/*
+ * Check the destination queue of an enqueue burst and update port stats.
+ *
+ * p          - port the burst is enqueued on
+ * ev         - the events of the burst
+ * num        - number of events in ev[]
+ * num_events - number of events the ring/stage actually took
+ *
+ * With do_validation set, every event must target p->next_external_qid;
+ * otherwise only ev[0] is checked. On a mismatch rte_errno is set to
+ * -EINVAL and 0 is returned. In validation mode the port statistics are
+ * updated as well. Returns num when the check passes.
+ */
+static inline uint32_t __attribute__((always_inline))
+enqueue_check(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	uint16_t i;
+
+	if (p->opdl->do_validation) {
+
+		for (i = 0; i < num; i++) {
+			if (ev[i].queue_id != p->next_external_qid) {
+				/* no trailing "\n": OPDL_LOG_ERR adds it */
+				OPDL_LOG_ERR("ERROR - port:[%u] - event wants"
+						" to enq to q_id[%u],"
+						" but should be [%u]",
+						p->id,
+						ev[i].queue_id,
+						p->next_external_qid);
+				rte_errno = -EINVAL;
+				return 0;
+			}
+		}
+
+		/* Stats */
+		if (p->p_type == OPDL_PURE_RX_PORT ||
+				p->p_type == OPDL_ASYNC_PORT) {
+			/* Ports that inject events: count the burst */
+			if (num_events) {
+				p->port_stat[claim_pkts_requested] += num;
+				p->port_stat[claim_pkts_granted] += num_events;
+				p->port_stat[claim_non_empty]++;
+				p->start_cycles = rte_rdtsc();
+			} else {
+				p->port_stat[claim_empty]++;
+				p->start_cycles = 0;
+			}
+		} else {
+			/* Other ports: account the cycles elapsed since the
+			 * timestamp taken by the last non-empty dequeue
+			 */
+			if (p->start_cycles) {
+				uint64_t end_cycles = rte_rdtsc();
+				p->port_stat[total_cycles] +=
+					end_cycles - p->start_cycles;
+			}
+		}
+	} else {
+		if (num > 0 &&
+				ev[0].queue_id != p->next_external_qid) {
+			rte_errno = -EINVAL;
+			return 0;
+		}
+	}
+
+	return num;
+}
+
+/*
+ * Post-process a dequeue burst: stamp the external id of the port's
+ * queue into the events and, in validation mode, update port stats.
+ *
+ * p          - port the burst was dequeued on
+ * ev         - event buffer handed to the dequeue call
+ * num        - number of events requested
+ * num_events - number of events actually obtained
+ *
+ * With do_validation set, all num slots get the queue id and the stats
+ * are updated; otherwise only ev[0] is stamped (when num > 0).
+ */
+static inline void __attribute__((always_inline))
+update_on_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	if (p->opdl->do_validation) {
+		/* same type as num, so the counter cannot overflow first */
+		uint16_t i;
+		for (i = 0; i < num; i++)
+			ev[i].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+
+		/* Stats */
+		if (num_events) {
+			p->port_stat[claim_pkts_requested] += num;
+			p->port_stat[claim_pkts_granted] += num_events;
+			p->port_stat[claim_non_empty]++;
+			p->start_cycles = rte_rdtsc();
+		} else {
+			p->port_stat[claim_empty]++;
+			p->start_cycles = 0;
+		}
+	} else {
+		if (num > 0)
+			ev[0].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+	}
+}
+
+
+/*
+ * Error RX enqueue:
+ *
+ * Enqueue handler that accepts nothing. It ignores its arguments, sets
+ * rte_errno to -ENOSPC and reports zero events enqueued.
+ */
+
+static uint16_t
+opdl_rx_error_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	const uint16_t accepted = 0;
+
+	/* none of the arguments is consulted */
+	RTE_SET_USED(num);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(p);
+
+	rte_errno = -ENOSPC;
+	return accepted;
+}
+
+/*
+ * RX enqueue:
+ *
+ * Feed a burst into the ring behind the port's enqueue stage, then run
+ * enqueue_check() on it. Returns 0 when the check rejects the burst,
+ * otherwise the number of events the ring took; rte_errno is set to
+ * -ENOSPC when fewer than num events were taken.
+ */
+
+static uint16_t
+opdl_rx_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	struct opdl_ring *ring =
+		opdl_stage_get_opdl_ring(p->enq_stage_inst);
+	uint16_t accepted = opdl_ring_input(ring, ev, num, false);
+
+	if (enqueue_check(p, ev, num, accepted) == 0)
+		return 0;
+
+	/* partial (or empty) input: the ring had no room for the rest */
+	if (accepted < num)
+		rte_errno = -ENOSPC;
+
+	return accepted;
+}
+
+/*
+ * Error TX handler
+ *
+ * Dequeue handler that delivers nothing. It ignores its arguments, sets
+ * rte_errno to -ENOSPC and reports zero events dequeued.
+ */
+
+static uint16_t
+opdl_tx_error_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num)
+{
+	const uint16_t delivered = 0;
+
+	/* none of the arguments is consulted */
+	RTE_SET_USED(num);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(p);
+
+	rte_errno = -ENOSPC;
+	return delivered;
+}
+
+/*
+ * TX single threaded claim
+ *
+ * Copy up to num events from the ring behind the port's dequeue stage
+ * into ev[], post-process them with update_on_dequeue() and return how
+ * many were copied.
+ */
+
+static uint16_t
+opdl_tx_dequeue_single_thread(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	struct opdl_stage *stage = p->deq_stage_inst;
+	uint16_t copied;
+
+	copied = opdl_ring_copy_to_burst(opdl_stage_get_opdl_ring(stage),
+					 stage,
+					 ev,
+					 num,
+					 false);
+
+	update_on_dequeue(p, ev, num, copied);
+
+	return copied;
+}
+
+/*
+ * TX multi threaded claim
+ *
+ * Claim up to num events on the port's dequeue stage into ev[],
+ * post-process them with update_on_dequeue(), then disclaim what was
+ * claimed. The return value is that of opdl_stage_disclaim().
+ */
+
+static uint16_t
+opdl_tx_dequeue_multi_inst(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	struct opdl_stage *stage = p->deq_stage_inst;
+	uint32_t claimed;
+
+	claimed = opdl_stage_claim(stage, (void *)ev, num, NULL,
+				   false, false);
+
+	update_on_dequeue(p, ev, num, claimed);
+
+	return opdl_stage_disclaim(stage, claimed, false);
+}
+
+
+/*
+ * Worker thread claim
+ *
+ * Claim up to num events on the port's dequeue stage into ev[]. A
+ * request larger than MAX_OPDL_CONS_Q_DEPTH is refused: rte_errno is
+ * set to -EINVAL and 0 is returned. Otherwise the events are
+ * post-processed with update_on_dequeue() and the number of claimed
+ * events is returned.
+ */
+
+static uint16_t
+opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
+		/* no trailing "\n": OPDL_LOG_ERR adds it */
+		OPDL_LOG_ERR("Attempt to dequeue num of events larger than port (%d) max",
+				p->id);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+			(void *)ev,
+			num,
+			NULL,
+			false,
+			p->atomic_claim);
+
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return num_events;
+}
+
+/*
+ * Worker thread disclaim
+ *
+ * Disclaim num entries on the port's enqueue stage, then validate the
+ * burst with enqueue_check() and return its result.
+ */
+
+static uint16_t
+opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
+{
+	uint16_t released;
+
+	released = opdl_stage_disclaim(p->enq_stage_inst, num, false);
+
+	return enqueue_check(p, ev, num, released);
+}
+
+/* Pick the stage instance of the i-th port of q that matters for
+ * dependencies: the enqueue stage for START/MIDDLE queues, the dequeue
+ * stage for any other position.
+ */
+static inline struct opdl_stage *__attribute__((always_inline))
+stage_for_port(struct opdl_queue *q, unsigned int i)
+{
+	struct opdl_port *port = q->ports[i];
+
+	if (q->q_pos != OPDL_Q_POS_START && q->q_pos != OPDL_Q_POS_MIDDLE)
+		return port->deq_stage_inst;
+
+	return port->enq_stage_inst;
+}
+
+/*
+ * Make every stage of queue q_id depend on all stages of queue
+ * deps_q_id.
+ *
+ * All stages involved must belong to the same opdl ring; if not, the
+ * rings of the device are dumped to stdout and -EINVAL is returned.
+ * Returns 0 on success, -EINVAL when opdl_stage_deps_add() fails.
+ */
+static int opdl_add_deps(struct opdl_evdev *device,
+			 int q_id,
+			 int deps_q_id)
+{
+	struct opdl_queue *q = &device->queue[q_id];
+	struct opdl_queue *dep_q = &device->queue[deps_q_id];
+	struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
+	unsigned int qi, di;
+
+	/* every (stage, dependency) pair must share one ring */
+	for (qi = 0; qi < q->nb_ports; qi++) {
+		struct opdl_ring *q_ring =
+			opdl_stage_get_opdl_ring(stage_for_port(q, qi));
+
+		for (di = 0; di < dep_q->nb_ports; di++) {
+			struct opdl_ring *dep_ring =
+				opdl_stage_get_opdl_ring(
+						stage_for_port(dep_q, di));
+			uint32_t t;
+
+			if (q_ring == dep_ring)
+				continue;
+
+			OPDL_LOG_ERR("Stages and dependents"
+					" are not for same turbine");
+			for (t = 0; t < device->nb_turbines; t++)
+				opdl_ring_dump(device->turbine[t], stdout);
+
+			return -EINVAL;
+		}
+	}
+
+	/* collect the stages this queue has to wait for */
+	for (di = 0; di < dep_q->nb_ports; di++)
+		dep_stages[di] = stage_for_port(dep_q, di);
+
+	/* register them on each stage instance of this queue */
+	for (qi = 0; qi < q->nb_ports; qi++) {
+		struct opdl_ring *ring =
+			opdl_stage_get_opdl_ring(stage_for_port(q, qi));
+		struct opdl_port *port = q->ports[qi];
+		int rc;
+
+		rc = opdl_stage_deps_add(ring,
+				stage_for_port(q, qi),
+				port->num_instance,
+				port->instance_id,
+				dep_stages,
+				dep_q->nb_ports);
+		if (rc < 0)
+			return -EINVAL;
+	}
+
+	return 0;
+}
+
+/*
+ * Install the enqueue/dequeue handlers of every configured port
+ * according to its port type and mark the port initialized.
+ *
+ * Returns 0 on success, -EINVAL as soon as a configured port has an
+ * unknown port type (the remaining ports are left untouched).
+ */
+int
+opdl_add_event_handlers(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	int err = 0;
+	unsigned int idx;
+
+	for (idx = 0; idx < device->max_port_nb; idx++) {
+		struct opdl_port *port = &device->ports[idx];
+
+		if (!port->configured)
+			continue;
+
+		switch (port->p_type) {
+		case OPDL_PURE_RX_PORT:
+			port->enq = opdl_rx_enqueue;
+			port->deq = opdl_tx_error_dequeue;
+			break;
+
+		case OPDL_PURE_TX_PORT:
+			port->enq = opdl_rx_error_enqueue;
+			port->deq = (port->num_instance == 1) ?
+				opdl_tx_dequeue_single_thread :
+				opdl_tx_dequeue_multi_inst;
+			break;
+
+		case OPDL_REGULAR_PORT:
+			port->enq = opdl_disclaim;
+			port->deq = opdl_claim;
+			break;
+
+		case OPDL_ASYNC_PORT:
+			port->enq = opdl_rx_enqueue;
+			/* Always single instance */
+			port->deq = opdl_tx_dequeue_single_thread;
+			break;
+
+		default:
+			OPDL_LOG_ERR("port:[%u] has invalid port type - ",
+					port->id);
+			err = -EINVAL;
+			break;
+		}
+
+		/* err is only ever set by the default case above */
+		if (err)
+			break;
+
+		port->initialized = 1;
+	}
+
+	if (err == 0)
+		fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
+
+	return err;
+}
+
+/*
+ * Walk the initialized internal queues in order and wire up the stage
+ * dependencies: a MIDDLE or END queue depends on the queue before it,
+ * and the START queue of a turbine depends on that turbine's END queue.
+ *
+ * The walk stops at the first queue that is not initialized.
+ * Returns 0 on success or a negative value when a dependency cannot be
+ * added.
+ */
+int
+build_all_dependencies(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	/* internal id of the most recent START queue */
+	uint8_t start_qid = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+		if (!queue->initialized)
+			break;
+
+		if (queue->q_pos == OPDL_Q_POS_START) {
+			start_qid = i;
+			continue;
+		}
+
+		/* A MIDDLE/END queue depends on queue i - 1, so it cannot
+		 * sit in slot 0: i - 1 would index before the queue array.
+		 */
+		if (i == 0) {
+			OPDL_LOG_ERR("queue:[%u] is the first queue but is"
+					" not a START queue",
+					queue->external_qid);
+			err = -EINVAL;
+			break;
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_END) {
+			/* Add this dependency */
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+			/* Add dependency for rx on tx */
+			err = opdl_add_deps(device, start_qid, i);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - dependencies built\n");
+
+	return err;
+}
+/*
+ * Verify that every initialized internal queue has at least one port
+ * and that the number of external queues (initialized queues minus the
+ * internal ones whose external id is OPDL_INVALID_QID) equals
+ * max_queue_nb.
+ *
+ * Returns 0 on success, -EINVAL for a queue without ports and -1 for a
+ * queue count mismatch.
+ */
+int
+check_queues_linked(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	unsigned int nb_init = 0;
+	uint32_t nb_internal = 0;
+
+	/* count initialized queues, stopping at the first gap */
+	while (nb_init < RTE_EVENT_MAX_QUEUES_PER_DEV) {
+		struct opdl_queue *q = &device->queue[nb_init];
+
+		if (!q->initialized)
+			break;
+
+		if (q->external_qid == OPDL_INVALID_QID)
+			nb_internal++;
+
+		if (q->nb_ports == 0) {
+			OPDL_LOG_ERR("queue:[%u] has no associated ports",
+					nb_init);
+			return -EINVAL;
+		}
+
+		nb_init++;
+	}
+
+	if ((nb_init - nb_internal) != device->max_queue_nb) {
+		OPDL_LOG_ERR("%u queues counted but should be %u",
+				nb_init - nb_internal,
+				device->max_queue_nb);
+		return -1;
+	}
+
+	fprintf(stdout, "Success - %u queues (ex:%u + in:%u) validated\n",
+			nb_init,
+			device->max_queue_nb,
+			nb_internal);
+
+	return 0;
+}
+
+/*
+ * Free every opdl ring of the device and wipe the internal queue array.
+ *
+ * The ring slots and the ring/queue counters are reset as well, so that
+ * a later create_queues_and_rings() starts from slot 0 again and a
+ * second call to this function cannot free a ring twice.
+ */
+void
+destroy_queues_and_rings(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_turbines; i++) {
+		if (device->turbine[i]) {
+			opdl_ring_free(device->turbine[i]);
+			/* do not keep a pointer to freed memory */
+			device->turbine[i] = NULL;
+		}
+	}
+	device->nb_turbines = 0;
+
+	memset(&device->queue,
+			0,
+			sizeof(struct opdl_queue)
+			* RTE_EVENT_MAX_QUEUES_PER_DEV);
+	device->nb_queues = 0;
+}
+
+/* Index of the most recently created turbine of device d */
+#define TURBINE_ID(d) ((d)->nb_turbines - 1)
+
+/*
+ * Fill in the next free internal queue slot (device->queue[nb_queues])
+ * and bump nb_queues.
+ *
+ * device - the opdl device
+ * pos    - position of the new queue on the current turbine
+ * i      - index into device->q_md[] describing the queue, or -1 for an
+ *          internal "dummy" queue (ORDERED, no external id)
+ *
+ * The queue is attached to the most recently created turbine.
+ */
+static inline void
+initialise_queue(struct opdl_evdev *device,
+		enum queue_pos pos,
+		int32_t i)
+{
+	struct opdl_queue *queue = &device->queue[device->nb_queues];
+
+	if (i == -1) {
+		queue->q_type = OPDL_Q_TYPE_ORDERED;
+		queue->external_qid = OPDL_INVALID_QID;
+	} else {
+		queue->q_type = device->q_md[i].type;
+		queue->external_qid = device->q_md[i].ext_id;
+		/* Add ex->in for queues setup */
+		device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
+	}
+	queue->turbine_id = TURBINE_ID(device);
+	queue->q_pos = pos;
+	queue->nb_ports = 0;
+	queue->configured = 1;
+
+	device->nb_queues++;
+}
+
+
+/*
+ * Create the next opdl ring ("turbine") of the device.
+ *
+ * The ring is named "<service_name>_<index>", sized for nb_events_limit
+ * events of type struct rte_event and twice max_port_nb stages, and
+ * stored in device->turbine[nb_turbines].
+ *
+ * Returns 0 on success (nb_turbines is incremented), -EINVAL when all
+ * OPDL_TUR_PER_DEV slots are in use, when the name does not fit in
+ * RTE_MEMZONE_NAMESIZE bytes, or when the ring cannot be created.
+ */
+static inline int
+create_turbine(struct opdl_evdev *device)
+{
+	char name[RTE_MEMZONE_NAMESIZE];
+	int len;
+
+	/* turbine[] only has OPDL_TUR_PER_DEV slots */
+	if (device->nb_turbines >= OPDL_TUR_PER_DEV) {
+		OPDL_LOG_ERR("opdl ring %u creation - FAILED,"
+				" at most %d rings per device",
+				device->nb_turbines,
+				OPDL_TUR_PER_DEV);
+		return -EINVAL;
+	}
+
+	/* service_name may be longer than name[]: bound the write and
+	 * refuse a truncated name, which would no longer be unique
+	 */
+	len = snprintf(name, sizeof(name), "%s_%u",
+			device->service_name, device->nb_turbines);
+	if (len < 0 || (size_t)len >= sizeof(name)) {
+		OPDL_LOG_ERR("opdl ring %u creation - FAILED,"
+				" ring name too long",
+				device->nb_turbines);
+		return -EINVAL;
+	}
+
+	device->turbine[device->nb_turbines] =
+		opdl_ring_create(name,
+				device->nb_events_limit,
+				sizeof(struct rte_event),
+				device->max_port_nb * 2,
+				device->socket);
+
+	if (!device->turbine[device->nb_turbines]) {
+		OPDL_LOG_ERR("opdl ring %u creation - FAILED",
+				device->nb_turbines);
+		return -EINVAL;
+	}
+
+	device->nb_turbines++;
+
+	return 0;
+}
+
+/*
+ * Start a new turbine after the SINGLE_LINK queue described by
+ * q_md[index].
+ *
+ * Two consecutive SINGLE_LINK queues are not allowed. Otherwise a new
+ * ring is created together with its initial "dummy" START queue.
+ *
+ * Returns 0 on success, -EINVAL otherwise.
+ */
+static inline int
+create_link_turbine(struct opdl_evdev *device, uint32_t index)
+{
+	int next_is_link;
+
+	/* q_md[] has OPDL_MAX_QUEUES slots: the last one has no successor,
+	 * so there is nothing to read past it
+	 */
+	next_is_link = (index + 1 < OPDL_MAX_QUEUES) &&
+		device->q_md[index + 1].type == OPDL_Q_TYPE_SINGLE_LINK;
+
+	if (next_is_link) {
+		OPDL_LOG_ERR("queue %u, 2"
+				" SINGLE_LINK queues, not allowed",
+				index);
+		return -EINVAL;
+	}
+
+	/* async queue with regular queue following it:
+	 * create a new turbine
+	 */
+	if (create_turbine(device))
+		return -EINVAL;
+
+	/* create an initial dummy queue for new turbine */
+	initialise_queue(device,
+			OPDL_Q_POS_START,
+			-1);
+
+	return 0;
+}
+
+/*
+ * Build the internal queue array and the opdl rings from the queue
+ * meta data (q_md[]).
+ *
+ * The first ring and its "dummy" START queue are always created. Each
+ * q_md entry then becomes a MIDDLE queue, or an END queue when it is
+ * the last populated entry or a SINGLE_LINK queue; a SINGLE_LINK queue
+ * additionally opens a new ring via create_link_turbine().
+ *
+ * Requires nb_ports == max_port_nb. On any failure everything created
+ * so far is torn down with destroy_queues_and_rings().
+ * Returns 0 on success, a negative value otherwise.
+ */
+int
+create_queues_and_rings(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	device->nb_queues = 0;
+
+	if (device->nb_ports != device->max_port_nb) {
+		OPDL_LOG_ERR("Number ports setup:%u NOT EQUAL to max port"
+				" number:%u for this device",
+				device->nb_ports,
+				device->max_port_nb);
+		err = -1;
+	}
+
+	if (!err) {
+		/* We will have at least one turbine so create it now */
+		err = create_turbine(device);
+	}
+
+	if (!err) {
+
+		/* Create 1st "dummy" queue */
+		initialise_queue(device,
+				 OPDL_Q_POS_START,
+				 -1);
+
+		for (uint32_t i = 0; i < device->nb_q_md; i++) {
+
+			/* Check */
+			if (!device->q_md[i].setup) {
+
+				OPDL_LOG_ERR("queue meta data slot %u"
+						" not setup - FAILING",
+						i);
+				err = -EINVAL;
+				break;
+			} else if (device->q_md[i].type !=
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				/* The last slot of q_md[] has no successor:
+				 * do not read past the end of the array
+				 */
+				if (i + 1 >= OPDL_MAX_QUEUES ||
+						!device->q_md[i + 1].setup) {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue at the end
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_END,
+							i);
+
+				} else {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue in the middle
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_MIDDLE,
+							i);
+				}
+			} else {
+				/* SINGLE_LINK queue:
+				 * create last queue for this turbine
+				 */
+				initialise_queue(device,
+						OPDL_Q_POS_END,
+						i);
+
+				err = create_link_turbine(device, i);
+
+				if (err)
+					break;
+			}
+		}
+	}
+	if (err)
+		destroy_queues_and_rings(dev);
+	else
+		fprintf(stdout, "Success - Created %u queues and %u turbines\n",
+				device->nb_queues,
+				device->nb_turbines);
+
+	return err;
+}
+
+
+/*
+ * Create the opdl stages for every port that is not attached to
+ * internal queue 0 and register each port with its queue.
+ *
+ *  - port on a MIDDLE queue: one stage used for both dequeue (claim)
+ *    and enqueue (disclaim); port type REGULAR.
+ *  - port on an END queue: one dequeue stage; port type PURE_TX.
+ *  - port on a SINGLE_LINK queue: a dequeue stage on the queue's own
+ *    turbine plus, when the following internal queue is configured, an
+ *    input stage on that queue's turbine; port type ASYNC.
+ *
+ * A second pass records the instance count of each port and marks the
+ * ports and queues initialized.
+ *
+ * Returns 0 on success, -EINVAL on an inconsistent configuration or
+ * when a stage cannot be created.
+ */
+int
+initialise_all_other_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_stage *stage_inst = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		struct opdl_queue *queue = &device->queue[port->queue_id];
+
+		if (port->queue_id == 0) {
+			continue;
+		} else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
+
+			if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+
+				/* Regular port with claim/disclaim */
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						false);
+				if (!stage_inst) {
+					OPDL_LOG_ERR("port %u: stage"
+							" creation - FAILED",
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = stage_inst;
+
+				if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
+					port->atomic_claim = true;
+				else
+					port->atomic_claim = false;
+
+				port->p_type =  OPDL_REGULAR_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else if (queue->q_pos == OPDL_Q_POS_END) {
+
+				/* tx port  */
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						false);
+				if (!stage_inst) {
+					OPDL_LOG_ERR("port %u: stage"
+							" creation - FAILED",
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = NULL;
+				port->p_type = OPDL_PURE_TX_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else {
+
+				OPDL_LOG_ERR("port %u:, linked incorrectly"
+						" to a q_pos START/INVALID %u",
+						port->id,
+						queue->q_pos);
+				err = -EINVAL;
+				break;
+			}
+
+		} else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
+
+			port->p_type = OPDL_ASYNC_PORT;
+
+			/* -- tx -- */
+			stage_inst = opdl_stage_add(
+				device->turbine[queue->turbine_id],
+					false,
+					false); /* First stage */
+			if (!stage_inst) {
+				OPDL_LOG_ERR("port %u: stage"
+						" creation - FAILED",
+						port->id);
+				err = -EINVAL;
+				break;
+			}
+			port->deq_stage_inst = stage_inst;
+
+			/* Add the port to the queue array of ports */
+			queue->ports[queue->nb_ports] = port;
+			port->instance_id = queue->nb_ports;
+			queue->nb_ports++;
+
+			if (queue->nb_ports > 1) {
+				OPDL_LOG_ERR("queue %u:, setup as SINGLE_LINK"
+					" but has more than one port linked",
+						queue->external_qid);
+				err = -EINVAL;
+				break;
+			}
+
+			/* -- single instance rx for next turbine -- */
+			uint8_t next_qid =
+				device->q_map_ex_to_in[queue->external_qid] + 1;
+			if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
+					device->queue[next_qid].configured) {
+
+				/* Remap the queue */
+				queue = &device->queue[next_qid];
+
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						true);
+				if (!stage_inst) {
+					OPDL_LOG_ERR("port %u: input stage"
+							" creation - FAILED",
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+				port->enq_stage_inst = stage_inst;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+				if (queue->nb_ports > 1) {
+					OPDL_LOG_ERR("dummy queue %u: for "
+						"port %u, "
+						"SINGLE_LINK but has more "
+						"than one port linked",
+							next_qid,
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+				/* Set this queue to initialized as it is never
+				 * referenced by any ports
+				 */
+				queue->initialized = 1;
+			}
+		}
+	}
+
+	/* Now that all ports are initialised we need to
+	 * setup the last bit of stage md
+	 */
+	if (!err) {
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+
+			if (port->configured &&
+					(port->queue_id != OPDL_INVALID_QID)) {
+				if (queue->nb_ports == 0) {
+					/* report the queue, not the port */
+					OPDL_LOG_ERR("queue:[%u] has no ports"
+							" linked to it",
+							port->queue_id);
+					err = -EINVAL;
+					break;
+				}
+
+				port->num_instance = queue->nb_ports;
+				port->initialized = 1;
+				queue->initialized = 1;
+			} else {
+				OPDL_LOG_ERR("Port:[%u] not configured  invalid"
+						" queue configuration",
+						port->id);
+				err = -EINVAL;
+				break;
+			}
+		}
+	}
+
+	if (!err) {
+		fprintf(stdout,
+				"Success - %u port(s) initialized\n",
+				device->nb_ports);
+	}
+	return err;
+}
+
+/*
+ * Attach every port that has no queue link (queue_id ==
+ * OPDL_INVALID_QID) to internal queue 0 as a PURE_RX port, and give all
+ * ports of queue 0 one shared input stage on the first turbine.
+ *
+ * The stage is created thread safe when more than one such port exists.
+ * Returns 0 on success, -1 when the stage cannot be created.
+ */
+int
+initialise_queue_zero_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	uint8_t mt_rx = 0;
+	struct opdl_stage *stage_inst = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	/* Always valid, even when no port ends up on queue 0 */
+	struct opdl_queue *queue = &device->queue[0];
+
+	/* Assign queue zero and figure out how many Q0 ports we have */
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->queue_id == OPDL_INVALID_QID) {
+			port->queue_id = 0;
+			port->external_qid = OPDL_INVALID_QID;
+			port->p_type = OPDL_PURE_RX_PORT;
+			mt_rx++;
+		}
+	}
+
+	/* Create the stage */
+	stage_inst = opdl_stage_add(device->turbine[0],
+			(mt_rx > 1 ? true : false),
+			true);
+	if (stage_inst) {
+
+		/* Assign the new created input stage to all relevant ports */
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			if (port->queue_id == 0) {
+				port->enq_stage_inst = stage_inst;
+				port->deq_stage_inst = NULL;
+				port->configured = 1;
+				port->initialized = 1;
+
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			}
+		}
+	} else {
+		err = -1;
+	}
+
+	if (!err) {
+		fprintf(stdout, "Success - (%u) \"Queue 0\" port(s) "
+				"initialized\n",
+				queue->nb_ports);
+	}
+	return err;
+}
+
+/*
+ * Translate the external queue id of every linked port into its
+ * internal queue id and record the external id of the queue the port
+ * enqueues to next.
+ *
+ * The next queue is the one right after the port's queue, except for a
+ * port on an END queue, where one more internal queue is skipped.
+ * Ports whose external_qid is OPDL_INVALID_QID are left untouched.
+ * Always returns 0.
+ */
+int
+assign_internal_queue_ids(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	uint32_t idx;
+
+	for (idx = 0; idx < device->nb_ports; idx++) {
+		struct opdl_port *port = &device->ports[idx];
+		unsigned int step;
+
+		if (port->external_qid == OPDL_INVALID_QID)
+			continue;
+
+		port->queue_id = device->q_map_ex_to_in[port->external_qid];
+
+		/* Now do the external_qid of the next queue */
+		if (device->queue[port->queue_id].q_pos == OPDL_Q_POS_END)
+			step = 2;
+		else
+			step = 1;
+
+		port->next_external_qid =
+			device->queue[port->queue_id + step].external_qid;
+	}
+
+	return 0;
+}
-- 
2.7.5

--------------------------------------------------------------
Intel Research and Development Ireland Limited
Registered in Ireland
Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
Registered Number: 308263


This e-mail and any attachments may contain confidential material for the sole
use of the intended recipient(s). Any review or distribution by others is
strictly prohibited. If you are not the intended recipient, please contact the
sender and delete all copies.

  parent reply	other threads:[~2017-11-24 11:24 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library liang.j.ma
2017-11-24 11:23 ` liang.j.ma [this message]
2017-11-24 11:23 ` [dpdk-dev] [PATCH 3/7] event/opdl: add the opdl pmd main body and xstats helper function liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 4/7] event/opdl: update the build system to enable compilation of pmd liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 5/7] test/eventdev: opdl eventdev pmd unit test func and makefiles liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 7/7] doc: add eventdev opdl pmd docuement and example usage document liang.j.ma
2017-11-24 20:55 ` [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD Jerin Jacob
2017-11-29 12:19   ` Ma, Liang
2017-11-29 12:56     ` Jerin Jacob
2017-11-29 17:15       ` Ma, Liang
2017-11-30 17:41         ` Jerin Jacob

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1511522632-139652-3-git-send-email-liang.j.ma@intel.com \
    --to=liang.j.ma@intel.com \
    --cc=bruce.richardson@intel.com \
    --cc=deepak.k.jain@intel.com \
    --cc=dev@dpdk.org \
    --cc=harry.van.haaren@intel.com \
    --cc=jerin.jacob@caviumnetworks.com \
    --cc=john.geary@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).