DPDK patches and discussions
* [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD
@ 2017-11-24 11:23 liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library liang.j.ma
                   ` (7 more replies)
  0 siblings, 8 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

The OPDL (Ordered Packet Distribution Library) eventdev is a specific
implementation of the eventdev API. It is particularly suited to packet
processing workloads that have high throughput and low latency 
requirements. All packets follow the same path through the device.
The order that packets follow is determined by the order in which
queues are set up. Packets are left on the ring until they are transmitted.
As a result packets do not go out of order.

Features:

The OPDL eventdev implements a subset of the features of the eventdev API:

Queues
 * Atomic
 * Ordered (Parallel is supported as parallel is a subset of Ordered)
 * Single-Link

Ports
 * Load balanced (for Atomic, Ordered, Parallel queues)
 * Single Link (for single-link queues)

Single Port Queue

It is possible to create a Single Port Queue using
RTE_EVENT_QUEUE_CFG_SINGLE_LINK. Packets dequeued from this queue do
not need to be re-enqueued (as is the case with an ordered queue). The
purpose of this queue is to allow for asynchronous handling of packets in
the middle of a pipeline. Ordered queues in the middle of a pipeline
cannot delete packets. A configuration sketch follows below.
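
A minimal sketch of configuring such a queue with the standard eventdev
API (dev_id and queue_id are hypothetical placeholders; the remaining
config fields are left at their defaults and may need tuning):

    struct rte_event_queue_conf qconf = {
        .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
    };

    /* dev_id/queue_id are placeholders for this sketch */
    if (rte_event_queue_setup(dev_id, queue_id, &qconf) < 0)
        rte_panic("single-link queue setup failed\n");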


Queue Dependencies

As stated, the order in which packets travel through queues is static in
nature. They go through the queues in the order the queues are set up at
initialisation with rte_event_queue_setup(). For example, if an
application sets up 3 queues, Q0, Q1, Q2, and has 4 associated ports P0,
P1, P2 and P3, then packets must be (a worker sketch follows the list)

 * Enqueued onto Q0 (typically through P0), then

 * Dequeued from Q0 (typically through P1), then

 * Enqueued onto Q1 (also through P1), then

 * Dequeued from Q1 (typically through P2), then

 * Enqueued onto Q2 (also through P2), then

 * Dequeued from Q2 (typically through P3) and then transmitted on the
   relevant eth port
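
As an illustration, a middle-of-pipeline worker (e.g. on P2 above) might
look as follows. This is a hedged sketch: dev_id, port_id, next_qid,
BATCH_SIZE, done and process() are placeholders, and per the limitations
listed below each worker should dequeue the maximum burst size for its
port:

    struct rte_event ev[BATCH_SIZE];

    while (!done) {
        uint16_t i, nb_rx = rte_event_dequeue_burst(dev_id, port_id,
                ev, BATCH_SIZE, 0);

        for (i = 0; i < nb_rx; i++) {
            process(ev[i].mbuf);             /* application work */
            ev[i].queue_id = next_qid;       /* e.g. Q0 -> Q1 */
            ev[i].op = RTE_EVENT_OP_FORWARD;
        }
        rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
    }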


Limitations

The opdl implementation has a number of limitations. These limitations are
due to the static nature of the underlying queues. It is because of this
that the implementation can achieve such high throughput and low latency.

The following list is a comprehensive outline of what is supported and of
the limitations/restrictions imposed by the opdl pmd:

 - The order in which packets move between queues is static and fixed
   (dynamic scheduling is not supported).

 - The NEW and RELEASE op types are not explicitly supported. RX (first
   enqueue) implicitly adds NEW events, and TX (last dequeue) implicitly
   handles their RELEASE.

 - All packets follow the same path through device queues.

 - Flows within queues are NOT supported.

 - Event priority is NOT supported.

 - Once the device is stopped, all inflight events are lost. Applications
   should clear all inflight events before stopping the device (see the
   drain sketch after this list).

 - Each port can only be associated with one queue.

 - Each queue can have multiple ports associated with it.

 - Each worker core has to dequeue the maximum burst size for that port.

 - For performance, the rte_event flow_id should not be updated once the
   packet is enqueued on RX.
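
Given the inflight-event limitation above, an application might drain its
ports before stopping the device. A hedged sketch (dev_id and port_id are
placeholders; freeing the mbuf stands in for whatever completion action
the application requires):

    struct rte_event ev[32];
    uint16_t i, n;

    /* drain any remaining inflight events before stop */
    while ((n = rte_event_dequeue_burst(dev_id, port_id, ev, 32, 0)) != 0)
        for (i = 0; i < n; i++)
            rte_pktmbuf_free(ev[i].mbuf);   /* or transmit/complete */

    rte_event_dev_stop(dev_id);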

Reference
General concept of event driven programming model
[http://dpdk.org/doc/guides/eventdevs/index.html]

Original Ordered Pipeline Design slides 
[https://dpdksummit.com/Archive/pdf/2017Asia/DPDK-China2017-Ma-OPDL.pdf]


Liang Ma (7):
  event/opdl: add the opdl ring infrastructure library
  event/opdl: add the opdl pmd header and init helper function
  event/opdl: add the opdl pmd main body and xstats helper function
  event/opdl: update the build system to enable compilation of pmd
  test/eventdev: opdl eventdev pmd unit test func and makefiles
  examples/eventdev_pipeline_opdl: adding example
  doc: add eventdev opdl pmd document and example usage document

 config/common_base                                 |    6 +
 doc/guides/eventdevs/index.rst                     |    1 +
 doc/guides/eventdevs/opdl.rst                      |  162 +++
 .../sample_app_ug/eventdev_pipeline_opdl_pmd.rst   |  128 +++
 doc/guides/sample_app_ug/index.rst                 |    1 +
 drivers/event/Makefile                             |    1 +
 drivers/event/opdl/Makefile                        |   65 ++
 drivers/event/opdl/opdl_evdev.c                    |  714 ++++++++++++
 drivers/event/opdl/opdl_evdev.h                    |  353 ++++++
 drivers/event/opdl/opdl_evdev_init.c               |  945 ++++++++++++++++
 drivers/event/opdl/opdl_evdev_xstats.c             |  205 ++++
 drivers/event/opdl/opdl_ring.c                     | 1170 ++++++++++++++++++++
 drivers/event/opdl/opdl_ring.h                     |  578 ++++++++++
 drivers/event/opdl/rte_pmd_evdev_opdl_version.map  |    3 +
 examples/eventdev_pipeline_opdl_pmd/Makefile       |   49 +
 examples/eventdev_pipeline_opdl_pmd/main.c         |  766 +++++++++++++
 mk/rte.app.mk                                      |    1 +
 test/test/Makefile                                 |    1 +
 test/test/test_eventdev_opdl.c                     | 1089 ++++++++++++++++++
 19 files changed, 6238 insertions(+)
 create mode 100644 doc/guides/eventdevs/opdl.rst
 create mode 100644 doc/guides/sample_app_ug/eventdev_pipeline_opdl_pmd.rst
 create mode 100644 drivers/event/opdl/Makefile
 create mode 100644 drivers/event/opdl/opdl_evdev.c
 create mode 100644 drivers/event/opdl/opdl_evdev.h
 create mode 100644 drivers/event/opdl/opdl_evdev_init.c
 create mode 100644 drivers/event/opdl/opdl_evdev_xstats.c
 create mode 100644 drivers/event/opdl/opdl_ring.c
 create mode 100644 drivers/event/opdl/opdl_ring.h
 create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
 create mode 100644 examples/eventdev_pipeline_opdl_pmd/Makefile
 create mode 100644 examples/eventdev_pipeline_opdl_pmd/main.c
 create mode 100644 test/test/test_eventdev_opdl.c

-- 
2.7.5



* [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 2/7] event/opdl: add the opdl pmd header and init helper function liang.j.ma
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

The OPDL ring is the core infrastructure of the OPDL PMD. The OPDL ring
library provides the core data structures and helper functions. The ring
implements a single-ring, multi-port/stage pipelined packet distribution
mechanism. This mechanism has the following characteristics:

• No multiple-queue cost, so latency is significantly reduced.
• Fixed dependencies between queues/ports suit complex, fixed pipelines
  of stateless packet processing (static pipelines).
• Decentralised distribution (no scheduling core).
• Packets remain in order (no reorder core(s)).
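
As an illustrative sketch only (based on the prototypes in opdl_ring.h in
this patch; error handling is omitted and full dependency wiring follows
the PMD init code in later patches), a single-threaded producer plus one
processing stage might look like:

    struct opdl_ring *r = opdl_ring_create("pipe", 1024,
            sizeof(struct rte_event), 2, SOCKET_ID_ANY);
    struct opdl_stage *in = opdl_stage_add(r, false, true);
    struct opdl_stage *work = opdl_stage_add(r, false, false);
    struct rte_event burst[32], claimed[32];
    uint32_t seq, n, c;

    /* one instance of "work", depending directly on the input stage */
    opdl_stage_deps_add(r, work, 1, 0, &in, 1);

    n = opdl_ring_input(r, burst, 32, false);   /* producer copies in */
    c = opdl_stage_claim(work, claimed, n, &seq, false, false);
    /* ... process claimed[0..c-1] ... */
    opdl_stage_disclaim(work, c, false);        /* release to dependents */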

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/Makefile                       |   65 ++
 drivers/event/opdl/opdl_ring.c                    | 1170 +++++++++++++++++++++
 drivers/event/opdl/opdl_ring.h                    |  578 ++++++++++
 drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
 4 files changed, 1816 insertions(+)
 create mode 100644 drivers/event/opdl/Makefile
 create mode 100644 drivers/event/opdl/opdl_ring.c
 create mode 100644 drivers/event/opdl/opdl_ring.h
 create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map

diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile
new file mode 100644
index 0000000..5c85139
--- /dev/null
+++ b/drivers/event/opdl/Makefile
@@ -0,0 +1,65 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_pmd_opdl_event.a
+
+# build flags
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+# for older GCC versions, allow us to initialize an event using
+# designated initializers.
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+ifeq ($(shell test $(GCC_VERSION) -le 50 && echo 1), 1)
+CFLAGS += -Wno-missing-field-initializers
+endif
+endif
+
+LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs -lrte_ring
+LDLIBS += -lrte_bus_vdev
+
+# library version
+LIBABIVER := 1
+
+# versioning export map
+EXPORT_MAP := rte_pmd_evdev_opdl_version.map
+
+# library source files
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_init.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_xstats.c
+
+# export include files
+SYMLINK-y-include +=
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
new file mode 100644
index 0000000..f07bf3a
--- /dev/null
+++ b/drivers/event/opdl/opdl_ring.c
@@ -0,0 +1,1170 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include <rte_branch_prediction.h>
+#include <rte_debug.h>
+#include <rte_errno.h>
+#include <rte_lcore.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_eal_memconfig.h>
+
+#include "opdl_ring.h"
+
+#define LIB_NAME "opdl_ring"
+
+#define OPDL_NAME_SIZE 64
+
+#define RTE_LOGTYPE_OPDL RTE_LOGTYPE_USER8
+#define log(level, fmt, ...) \
+		RTE_LOG(level, OPDL, LIB_NAME": " fmt "\n", ##__VA_ARGS__)
+
+#ifdef OPDL_DEBUG
+#define log_debug(...) log(DEBUG, __VA_ARGS__)
+#else
+#define log_debug(...)
+#endif
+
+#define POWER_OF_2(n) ((n) && !((n) & ((n) - 1)))
+
+/* Types of dependency between stages */
+enum dep_type {
+	DEP_NONE = 0,  /* no dependency */
+	DEP_DIRECT,  /* stage has direct dependency */
+	DEP_INDIRECT,  /* in-direct dependency through other stage(s) */
+	DEP_SELF,  /* stage dependency on itself, used to detect loops */
+};
+
+/* Shared section of stage state.
+ * Care is needed when accessing and the layout is important, especially to
+ * limit the adjacent cache-line HW prefetcher from impacting performance.
+ */
+struct shared_state {
+	/* Last known minimum sequence number of dependencies, used for multi
+	 * thread operation
+	 */
+	uint32_t available_seq;
+	char _pad1[RTE_CACHE_LINE_SIZE * 3];
+	uint32_t head;  /* Head sequence number (for multi thread operation) */
+	char _pad2[RTE_CACHE_LINE_SIZE * 3];
+	struct opdl_stage *stage;  /* back pointer */
+	uint32_t tail;  /* Tail sequence number */
+	char _pad3[RTE_CACHE_LINE_SIZE * 2];
+} __rte_cache_aligned;
+
+/* A structure to keep track of "unfinished" claims. This is only used for
+ * stages that are threadsafe. Each lcore accesses its own instance of this
+ * structure to record the entries it has claimed. This allows one lcore to make
+ * multiple claims without being blocked by another. When disclaiming it moves
+ * forward the shared tail when the shared tail matches the tail value recorded
+ * here.
+ */
+struct claim_manager {
+	uint32_t num_to_disclaim;
+	uint32_t num_claimed;
+	uint32_t mgr_head;
+	uint32_t mgr_tail;
+	struct {
+		uint32_t head;
+		uint32_t tail;
+	} claims[OPDL_DISCLAIMS_PER_LCORE];
+} __rte_cache_aligned;
+
+/* Context for each stage of opdl_ring.
+ * Calculations on sequence numbers need to be done with other uint32_t values
+ * so that results are modulus 2^32, and not undefined.
+ */
+struct opdl_stage {
+	struct opdl_ring *t;  /* back pointer, set at init */
+	uint32_t num_slots;  /* Number of slots for entries, set at init */
+	uint32_t index;  /* ID for this stage, set at init */
+	bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
+	/* Last known min seq number of dependencies, used for single thread
+	 * operation
+	 */
+	uint32_t available_seq;
+	uint32_t head;  /* Current head for single-thread operation */
+	uint32_t nb_instance;  /* Number of instances */
+	uint32_t instance_id;  /* ID of this stage instance */
+	uint16_t num_claimed;  /* Number of slots claimed */
+	uint16_t num_event;		/* Number of events */
+	uint32_t seq;			/* sequence number  */
+	uint32_t num_deps;  /* Number of direct dependencies */
+	/* Keep track of all dependencies, used during init only */
+	enum dep_type *dep_tracking;
+	/* Direct dependencies of this stage */
+	struct shared_state **deps;
+	/* Other stages read this! */
+	struct shared_state shared __rte_cache_aligned;
+	/* For managing disclaims in multi-threaded processing stages */
+	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
+					       __rte_cache_aligned;
+} __rte_cache_aligned;
+
+/* Context for opdl_ring */
+struct opdl_ring {
+	char name[OPDL_NAME_SIZE];  /* opdl_ring instance name */
+	int socket;  /* NUMA socket that memory is allocated on */
+	uint32_t num_slots;  /* Number of slots for entries */
+	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
+	uint32_t slot_size;  /* Size of each slot in bytes */
+	uint32_t num_stages;  /* Number of stages that have been added */
+	uint32_t max_num_stages;  /* Max number of stages */
+	/* Stages indexed by ID */
+	struct opdl_stage *stages;
+	/* Memory for storing slot data */
+	uint8_t slots[0] __rte_cache_aligned;
+};
+
+
+/* Return the input stage of an opdl_ring */
+static inline struct opdl_stage *__attribute__((always_inline))
+input_stage(const struct opdl_ring *t)
+{
+	return &t->stages[0];
+}
+
+/* Check if a stage is the input stage */
+static inline bool __attribute__((always_inline))
+is_input_stage(const struct opdl_stage *s)
+{
+	return s->index == 0;
+}
+
+/* Get slot pointer from sequence number */
+static inline void *__attribute__((always_inline))
+get_slot(const struct opdl_ring *t, uint32_t n)
+{
+	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
+}
+
+/* Find how many entries are available for processing */
+static inline uint32_t __attribute__((always_inline))
+available(const struct opdl_stage *s)
+{
+	if (s->threadsafe == true) {
+		uint32_t n = __atomic_load_n(&s->shared.available_seq,
+				__ATOMIC_ACQUIRE) -
+				__atomic_load_n(&s->shared.head,
+				__ATOMIC_ACQUIRE);
+
+		/* Return 0 if available_seq needs to be updated */
+		return (n <= s->num_slots) ? n : 0;
+	}
+
+	/* Single threaded */
+	return s->available_seq - s->head;
+}
+
+/* Read sequence number of dependencies and find minimum */
+static inline void __attribute__((always_inline))
+update_available_seq(struct opdl_stage *s)
+{
+	uint32_t i;
+	uint32_t this_tail = s->shared.tail;
+	uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
+	/* Input stage sequence numbers are greater than the sequence numbers of
+	 * its dependencies so an offset of t->num_slots is needed when
+	 * calculating available slots and also the condition which is used to
+	 * determine the dependencies minimum sequence number must be reverted.
+	 */
+	uint32_t wrap;
+
+	if (is_input_stage(s)) {
+		wrap = s->num_slots;
+		for (i = 1; i < s->num_deps; i++) {
+			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
+					__ATOMIC_ACQUIRE);
+			if ((this_tail - seq) > (this_tail - min_seq))
+				min_seq = seq;
+		}
+	} else {
+		wrap = 0;
+		for (i = 1; i < s->num_deps; i++) {
+			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
+					__ATOMIC_ACQUIRE);
+			if ((seq - this_tail) < (min_seq - this_tail))
+				min_seq = seq;
+		}
+	}
+
+	if (s->threadsafe == false)
+		s->available_seq = min_seq + wrap;
+	else
+		__atomic_store_n(&s->shared.available_seq, min_seq + wrap,
+				__ATOMIC_RELEASE);
+}
+
+/* Wait until the number of available slots reaches number requested */
+static inline void __attribute__((always_inline))
+wait_for_available(struct opdl_stage *s, uint32_t n)
+{
+	while (available(s) < n) {
+		rte_pause();
+		update_available_seq(s);
+	}
+}
+
+/* Return number of slots to process based on number requested and mode */
+static inline uint32_t __attribute__((always_inline))
+num_to_process(struct opdl_stage *s, uint32_t n, bool block)
+{
+	/* Don't read tail sequences of dependencies if not needed */
+	if (available(s) >= n)
+		return n;
+
+	update_available_seq(s);
+
+	if (block == false) {
+		uint32_t avail = available(s);
+
+		if (avail == 0) {
+			rte_pause();
+			return 0;
+		}
+		return (avail <= n) ? avail : n;
+	}
+
+	if (unlikely(n > s->num_slots)) {
+		log(ERR, "%u entries are more than max (%u)", n, s->num_slots);
+		return 0;  /* Avoid infinite loop */
+	}
+	/* blocking */
+	wait_for_available(s, n);
+	return n;
+}
+
+/* Copy entries in to slots with wrap-around */
+static inline void __attribute__((always_inline))
+copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
+		uint32_t num_entries)
+{
+	uint32_t slot_size = t->slot_size;
+	uint32_t slot_index = start & t->mask;
+
+	if (slot_index + num_entries <= t->num_slots) {
+		rte_memcpy(get_slot(t, start), entries,
+				num_entries * slot_size);
+	} else {
+		uint32_t split = t->num_slots - slot_index;
+
+		rte_memcpy(get_slot(t, start), entries, split * slot_size);
+		rte_memcpy(get_slot(t, 0),
+				RTE_PTR_ADD(entries, split * slot_size),
+				(num_entries - split) * slot_size);
+	}
+}
+
+/* Copy entries out from slots with wrap-around */
+static inline void __attribute__((always_inline))
+copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
+		uint32_t num_entries)
+{
+	uint32_t slot_size = t->slot_size;
+	uint32_t slot_index = start & t->mask;
+
+	if (slot_index + num_entries <= t->num_slots) {
+		rte_memcpy(entries, get_slot(t, start),
+				num_entries * slot_size);
+	} else {
+		uint32_t split = t->num_slots - slot_index;
+
+		rte_memcpy(entries, get_slot(t, start), split * slot_size);
+		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
+				get_slot(t, 0),
+				(num_entries - split) * slot_size);
+	}
+}
+
+/* Input function optimised for single thread */
+static inline uint32_t __attribute__((always_inline))
+opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
+		uint32_t num_entries, bool block)
+{
+	struct opdl_stage *s = input_stage(t);
+	uint32_t head = s->head;
+
+	num_entries = num_to_process(s, num_entries, block);
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_in(t, head, entries, num_entries);
+
+	s->head += num_entries;
+	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+
+	return num_entries;
+}
+
+/* Convert head and tail of claim_manager into valid index */
+static inline uint32_t __attribute__((always_inline))
+claim_mgr_index(uint32_t n)
+{
+	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
+}
+
+/* Check if there are available slots in claim_manager */
+static inline bool __attribute__((always_inline))
+claim_mgr_available(struct claim_manager *mgr)
+{
+	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
+			true : false;
+}
+
+/* Record a new claim. Only use after first checking an entry is available */
+static inline void __attribute__((always_inline))
+claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
+{
+	if ((mgr->mgr_head != mgr->mgr_tail) &&
+			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
+			tail)) {
+		/* Combine with previous claim */
+		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
+	} else {
+		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
+		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
+		mgr->mgr_head++;
+	}
+
+	mgr->num_claimed += (head - tail);
+}
+
+/* Read the oldest recorded claim */
+static inline bool __attribute__((always_inline))
+claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
+{
+	if (mgr->mgr_head == mgr->mgr_tail)
+		return false;
+
+	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
+	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
+	return true;
+}
+
+/* Remove the oldest recorded claim. Only use after first reading the entry */
+static inline void __attribute__((always_inline))
+claim_mgr_remove(struct claim_manager *mgr)
+{
+	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
+			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
+	mgr->mgr_tail++;
+}
+
+/* Update tail in the oldest claim. Only use after first reading the entry */
+static inline void __attribute__((always_inline))
+claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
+{
+	mgr->num_claimed -= num_entries;
+	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
+}
+
+static inline void __attribute__((always_inline))
+opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
+		uint32_t num_entries, bool block)
+{
+	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
+	uint32_t head;
+	uint32_t tail;
+
+	while (num_entries) {
+		bool ret = claim_mgr_read(disclaims, &tail, &head);
+
+		if (ret == false)
+			break;  /* nothing is claimed */
+		/* There should be no race condition here. If shared.tail
+		 * matches, no other core can update it until this one does.
+		 */
+		if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
+				tail) {
+			if (num_entries >= (head - tail)) {
+				claim_mgr_remove(disclaims);
+				__atomic_store_n(&s->shared.tail, head,
+						__ATOMIC_RELEASE);
+				num_entries -= (head - tail);
+			} else {
+				claim_mgr_move_tail(disclaims, num_entries);
+				__atomic_store_n(&s->shared.tail,
+						num_entries + tail,
+						__ATOMIC_RELEASE);
+				num_entries = 0;
+			}
+		} else if (block == false)
+			break;  /* blocked by other thread */
+		/* Keep going until num_entries are disclaimed. */
+		rte_pause();
+	}
+
+	disclaims->num_to_disclaim = num_entries;
+}
+
+/* Move head atomically, returning number of entries available to process and
+ * the original value of head. For non-input stages, the claim is recorded
+ * so that the tail can be updated later by opdl_stage_disclaim().
+ */
+static inline void __attribute__((always_inline))
+move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
+		uint32_t *old_head, bool block, bool claim_func)
+{
+	uint32_t orig_num_entries = *num_entries;
+	uint32_t ret;
+	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
+
+	/* Attempt to disclaim any outstanding claims */
+	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
+			false);
+
+	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
+	while (true) {
+		bool success;
+		/* If called by opdl_ring_input(), claim does not need to be
+		 * recorded, as there will be no disclaim.
+		 */
+		if (claim_func) {
+			/* Check that the claim can be recorded */
+			ret = claim_mgr_available(disclaims);
+			if (ret == false) {
+				/* exit out if claim can't be recorded */
+				*num_entries = 0;
+				return;
+			}
+		}
+
+		*num_entries = num_to_process(s, orig_num_entries, block);
+		if (*num_entries == 0)
+			return;
+
+		success = __atomic_compare_exchange_n(&s->shared.head, old_head,
+				*old_head + *num_entries,
+				true,  /* may fail spuriously */
+				__ATOMIC_RELEASE,  /* memory order on success */
+				__ATOMIC_ACQUIRE);  /* memory order on fail */
+		if (likely(success))
+			break;
+		rte_pause();
+	}
+
+	if (claim_func)
+		/* Store the claim record */
+		claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
+}
+
+/* Input function that supports multiple threads */
+static inline uint32_t __attribute__((always_inline))
+opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
+		uint32_t num_entries, bool block)
+{
+	struct opdl_stage *s = input_stage(t);
+	uint32_t old_head;
+
+	move_head_atomically(s, &num_entries, &old_head, block, false);
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_in(t, old_head, entries, num_entries);
+
+	/* If another thread started inputting before this one, but hasn't
+	 * finished, we need to wait for it to complete to update the tail.
+	 */
+	while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
+			old_head))
+		rte_pause();
+
+	__atomic_store_n(&s->shared.tail, old_head + num_entries,
+			__ATOMIC_RELEASE);
+
+	return num_entries;
+}
+
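+/* Compute the offset of the first slot in a claim that belongs to this
+ * instance, given that slots are assigned round-robin across nb_p_lcores
+ * instances by their global sequence number.
+ */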
+static inline uint32_t __attribute__((always_inline))
+opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
+		uint8_t this_lcore)
+{
+	return ((nb_p_lcores <= 1) ? 0 :
+			(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
+			nb_p_lcores);
+}
+
+/* Claim slots to process, optimised for single-thread operation */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
+{
+	uint32_t i = 0, j = 0,  offset;
+	void *get_slots;
+	struct rte_event *ev;
+	RTE_SET_USED(seq);
+	struct opdl_ring *t = s->t;
+	uint8_t *entries_offset = (uint8_t *)entries;
+
+	if (!atomic) {
+
+		offset = opdl_first_entry_id(s->seq, s->nb_instance,
+				s->instance_id);
+
+		num_entries = s->nb_instance * num_entries;
+
+		num_entries = num_to_process(s, num_entries, block);
+
+		for (; offset < num_entries; offset += s->nb_instance) {
+			get_slots = get_slot(t, s->head + offset);
+			memcpy(entries_offset, get_slots, t->slot_size);
+			entries_offset += t->slot_size;
+			i++;
+		}
+	} else {
+		num_entries = num_to_process(s, num_entries, block);
+
+		for (j = 0; j < num_entries; j++) {
+			ev = (struct rte_event *)get_slot(t, s->head+j);
+			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+				memcpy(entries_offset, ev, t->slot_size);
+				entries_offset += t->slot_size;
+				i++;
+			}
+		}
+	}
+	s->head += num_entries;
+	s->num_claimed = num_entries;
+	s->num_event = i;
+
+	/* automatically disclaim entries if number of rte_events is zero */
+	if (unlikely(i == 0))
+		opdl_stage_disclaim(s, 0, false);
+
+	return i;
+}
+
+/* Thread-safe version of function to claim slots for processing */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	uint32_t old_head;
+	struct opdl_ring *t = s->t;
+	uint32_t i = 0, offset;
+	uint8_t *entries_offset = (uint8_t *)entries;
+
+	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
+	num_entries = offset + (s->nb_instance * num_entries);
+
+	move_head_atomically(s, &num_entries, &old_head, block, true);
+
+	for (; offset < num_entries; offset += s->nb_instance) {
+		memcpy(entries_offset, get_slot(t, s->head + offset),
+			t->slot_size);
+		entries_offset += t->slot_size;
+		i++;
+	}
+	if (seq != NULL)
+		*seq = old_head;
+
+	return i;
+}
+
+/* Claim and copy slot pointers, optimised for single-thread operation */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	num_entries = num_to_process(s, num_entries, block);
+	if (num_entries == 0)
+		return 0;
+	copy_entries_out(s->t, s->head, entries, num_entries);
+	if (seq != NULL)
+		*seq = s->head;
+	s->head += num_entries;
+	return num_entries;
+}
+
+/* Thread-safe version of function to claim and copy pointers to slots */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	uint32_t old_head;
+
+	move_head_atomically(s, &num_entries, &old_head, block, true);
+	if (num_entries == 0)
+		return 0;
+	copy_entries_out(s->t, old_head, entries, num_entries);
+	if (seq != NULL)
+		*seq = old_head;
+	return num_entries;
+}
+
+static inline void __attribute__((always_inline))
+opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
+		uint32_t num_entries)
+{
+	uint32_t old_tail = s->shared.tail;
+
+	if (unlikely(num_entries > (s->head - old_tail))) {
+		log(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
+				num_entries, s->head - old_tail);
+		num_entries = s->head - old_tail;
+	}
+	__atomic_store_n(&s->shared.tail, num_entries + old_tail,
+			__ATOMIC_RELEASE);
+}
+
+uint32_t
+opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
+		bool block)
+{
+	if (input_stage(t)->threadsafe == false)
+		return opdl_ring_input_singlethread(t, entries, num_entries,
+				block);
+	else
+		return opdl_ring_input_multithread(t, entries, num_entries,
+				block);
+}
+
+uint32_t
+opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
+		const void *entries, uint32_t num_entries, bool block)
+{
+	uint32_t head = s->head;
+
+	num_entries = num_to_process(s, num_entries, block);
+
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_in(t, head, entries, num_entries);
+
+	s->head += num_entries;
+	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+
+	return num_entries;
+
+}
+
+uint32_t
+opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
+		void *entries, uint32_t num_entries, bool block)
+{
+	uint32_t head = s->head;
+
+	num_entries = num_to_process(s, num_entries, block);
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_out(t, head, entries, num_entries);
+
+	s->head += num_entries;
+	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+
+	return num_entries;
+}
+
+uint32_t
+opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
+{
+	if (available(s) >= num_entries)
+		return num_entries;
+
+	update_available_seq(s);
+
+	uint32_t avail = available(s);
+
+	if (avail == 0) {
+		rte_pause();
+		return 0;
+	}
+	return (avail <= num_entries) ? avail : num_entries;
+}
+
+uint32_t
+opdl_stage_claim(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
+{
+	if (s->threadsafe == false)
+		return opdl_stage_claim_singlethread(s, entries, num_entries,
+				seq, block, atomic);
+	else
+		return opdl_stage_claim_multithread(s, entries, num_entries,
+				seq, block);
+}
+
+uint32_t
+opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	if (s->threadsafe == false)
+		return opdl_stage_claim_copy_singlethread(s, entries,
+				num_entries, seq, block);
+	else
+		return opdl_stage_claim_copy_multithread(s, entries,
+				num_entries, seq, block);
+}
+
+void
+opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
+		bool block)
+{
+
+	if (s->threadsafe == false) {
+		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
+	} else {
+		struct claim_manager *disclaims =
+			&s->pending_disclaims[rte_lcore_id()];
+
+		if (unlikely(num_entries > s->num_slots)) {
+			log(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
+					num_entries, disclaims->num_claimed);
+			num_entries = disclaims->num_claimed;
+		}
+
+		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
+				disclaims->num_claimed);
+		opdl_stage_disclaim_multithread_n(s, num_entries, block);
+	}
+}
+
+int
+opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
+{
+	if (num_entries != s->num_event) {
+		rte_errno = EINVAL;
+		return 0;
+	}
+	if (s->threadsafe == false) {
+		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+		s->seq += s->num_claimed;
+		s->num_claimed = 0;
+	} else {
+		struct claim_manager *disclaims =
+				&s->pending_disclaims[rte_lcore_id()];
+		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
+				block);
+	}
+	return num_entries;
+}
+
+uint32_t
+opdl_ring_available(struct opdl_ring *t)
+{
+	return opdl_stage_available(&t->stages[0]);
+}
+
+uint32_t
+opdl_stage_available(struct opdl_stage *s)
+{
+	update_available_seq(s);
+	return available(s);
+}
+
+void
+opdl_ring_flush(struct opdl_ring *t)
+{
+	struct opdl_stage *s = input_stage(t);
+
+	wait_for_available(s, s->num_slots);
+}
+
+/******************** Non performance sensitive functions ********************/
+
+/* Initial setup of a new stage's context */
+static int
+init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
+		bool is_input)
+{
+	uint32_t available = (is_input) ? t->num_slots : 0;
+
+	s->t = t;
+	s->num_slots = t->num_slots;
+	s->index = t->num_stages;
+	s->threadsafe = threadsafe;
+	s->shared.stage = s;
+
+	/* Alloc memory for deps */
+	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
+			t->max_num_stages * sizeof(enum dep_type),
+			0, t->socket);
+	if (s->dep_tracking == NULL)
+		return -ENOMEM;
+
+	s->deps = rte_zmalloc_socket(LIB_NAME,
+			t->max_num_stages * sizeof(struct shared_state *),
+			0, t->socket);
+	if (s->deps == NULL) {
+		rte_free(s->dep_tracking);
+		return -ENOMEM;
+	}
+
+	s->dep_tracking[s->index] = DEP_SELF;
+
+	if (threadsafe == true)
+		s->shared.available_seq = available;
+	else
+		s->available_seq = available;
+
+	return 0;
+}
+
+/* Add direct or indirect dependencies between stages */
+static int
+add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
+		enum dep_type type)
+{
+	struct opdl_ring *t = dependent->t;
+	uint32_t i;
+
+	/* Add new direct dependency */
+	if ((type == DEP_DIRECT) &&
+			(dependent->dep_tracking[dependency->index] ==
+					DEP_NONE)) {
+		log_debug("%s:%u direct dependency on %u",
+				t->name, dependent->index, dependency->index);
+		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
+	}
+
+	/* Add new indirect dependency or change direct to indirect */
+	if ((type == DEP_INDIRECT) &&
+			((dependent->dep_tracking[dependency->index] ==
+			DEP_NONE) ||
+			(dependent->dep_tracking[dependency->index] ==
+			DEP_DIRECT))) {
+		log_debug("%s:%u indirect dependency on %u",
+				t->name, dependent->index, dependency->index);
+		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
+	}
+
+	/* Shouldn't happen... */
+	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
+			(dependent != input_stage(t))) {
+		log(ERR, "Loop in dependency graph %s:%u",
+				t->name, dependent->index);
+		return -EINVAL;
+	}
+
+	/* Keep going to dependencies of the dependency, until input stage */
+	if (dependency != input_stage(t))
+		for (i = 0; i < dependency->num_deps; i++) {
+			int ret = add_dep(dependent, dependency->deps[i]->stage,
+					DEP_INDIRECT);
+
+			if (ret < 0)
+				return ret;
+		}
+
+	/* Make list of sequence numbers for direct dependencies only */
+	if (type == DEP_DIRECT)
+		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
+			if (dependent->dep_tracking[i] == DEP_DIRECT) {
+				if ((i == 0) && (dependent->num_deps > 1))
+					rte_panic("%s:%u depends on more than input",
+							t->name,
+							dependent->index);
+				dependent->deps[dependent->num_deps++] =
+						&t->stages[i].shared;
+			}
+
+	return 0;
+}
+
+struct opdl_ring *
+opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
+		uint32_t max_num_stages, int socket)
+{
+	struct opdl_ring *t;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	int mz_flags = 0;
+	struct opdl_stage *st = NULL;
+	const struct rte_memzone *mz = NULL;
+	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
+			(num_slots * slot_size));
+
+	/* Compile time checking */
+	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
+			0);
+	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
+			RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
+			RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON(!POWER_OF_2(OPDL_DISCLAIMS_PER_LCORE));
+
+	/* Parameter checking */
+	if (name == NULL) {
+		log(ERR, "name param is NULL");
+		return NULL;
+	}
+	if (!rte_is_power_of_2(num_slots)) {
+		log(ERR, "num_slots (%u) for %s is not power of 2",
+				num_slots, name);
+		return NULL;
+	}
+
+	/* Alloc memory for stages */
+	st = rte_zmalloc_socket(LIB_NAME,
+		max_num_stages * sizeof(struct opdl_stage),
+		RTE_CACHE_LINE_SIZE, socket);
+	if (st == NULL)
+		goto exit_fail;
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
+
+	/* Alloc memory for memzone */
+	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
+	if (mz == NULL)
+		goto exit_fail;
+
+	t = mz->addr;
+
+	/* Initialise opdl_ring queue */
+	memset(t, 0, sizeof(*t));
+	snprintf(t->name, sizeof(t->name), "%s", name);
+	t->socket = socket;
+	t->num_slots = num_slots;
+	t->mask = num_slots - 1;
+	t->slot_size = slot_size;
+	t->max_num_stages = max_num_stages;
+	t->stages = st;
+
+	log_debug("Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
+			t->name, t, num_slots, socket, slot_size);
+
+	return t;
+
+exit_fail:
+	log(ERR, "Cannot reserve memory");
+	rte_free(st);
+	rte_memzone_free(mz);
+
+	return NULL;
+}
+
+void *
+opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
+{
+	return get_slot(t, index);
+}
+
+int
+opdl_ring_get_socket(const struct opdl_ring *t)
+{
+	return t->socket;
+}
+
+uint32_t
+opdl_ring_get_num_slots(const struct opdl_ring *t)
+{
+	return t->num_slots;
+}
+
+const char *
+opdl_ring_get_name(const struct opdl_ring *t)
+{
+	return t->name;
+}
+
+/* Check dependency list is valid for a given opdl_ring */
+static int
+check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	unsigned int i;
+
+	for (i = 0; i < num_deps; ++i) {
+		if (!deps[i]) {
+			log(ERR, "deps[%u] is NULL", i);
+			return -EINVAL;
+		}
+		if (t != deps[i]->t) {
+			log(ERR, "deps[%u] is in opdl_ring %s, not %s",
+					i, deps[i]->t->name, t->name);
+			return -EINVAL;
+		}
+	}
+	if (num_deps > t->num_stages) {
+		log(ERR, "num_deps (%u) > number stages (%u)",
+				num_deps, t->num_stages);
+		return -EINVAL;
+	}
+	return 0;
+}
+
+struct opdl_stage *
+opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
+{
+	struct opdl_stage *s;
+
+	/* Parameter checking */
+	if (!t) {
+		log(ERR, "opdl_ring is NULL");
+		return NULL;
+	}
+	if (t->num_stages == t->max_num_stages) {
+		log(ERR, "%s has max number of stages (%u)",
+				t->name, t->max_num_stages);
+		return NULL;
+	}
+
+	s = &t->stages[t->num_stages];
+
+	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
+		log(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
+				&s->shared, t->name);
+
+	if (init_stage(t, s, threadsafe, is_input) < 0) {
+		log(ERR, "Cannot reserve memory");
+		return NULL;
+	}
+	t->num_stages++;
+
+	return s;
+}
+
+uint32_t
+opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
+		uint32_t nb_instance, uint32_t instance_id,
+		struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	uint32_t i;
+	int ret = 0;
+
+	if ((num_deps > 0) && (!deps)) {
+		log(ERR, "%s stage has NULL dependencies", t->name);
+		return -1;
+	}
+	ret = check_deps(t, deps, num_deps);
+	if (ret < 0)
+		return ret;
+
+	for (i = 0; i < num_deps; i++) {
+		ret = add_dep(s, deps[i], DEP_DIRECT);
+		if (ret < 0)
+			return ret;
+	}
+
+	s->nb_instance = nb_instance;
+	s->instance_id = instance_id;
+
+	return ret;
+}
+
+struct opdl_stage *
+opdl_ring_get_input_stage(const struct opdl_ring *t)
+{
+	return input_stage(t);
+}
+
+int
+opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	unsigned int i;
+	int ret;
+
+	if ((num_deps == 0) || (!deps)) {
+		log(ERR, "cannot set NULL dependencies");
+		return -EINVAL;
+	}
+
+	ret = check_deps(s->t, deps, num_deps);
+	if (ret < 0)
+		return ret;
+
+	/* Update deps */
+	for (i = 0; i < num_deps; i++)
+		s->deps[i] = &deps[i]->shared;
+	s->num_deps = num_deps;
+
+	return 0;
+}
+
+struct opdl_ring *
+opdl_stage_get_opdl_ring(const struct opdl_stage *s)
+{
+	return s->t;
+}
+
+void
+opdl_ring_dump(const struct opdl_ring *t, FILE *f)
+{
+	uint32_t i;
+
+	if (t == NULL) {
+		fprintf(f, "NULL opdl_ring!\n");
+		return;
+	}
+	fprintf(f, "opdl_ring \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
+			t->name, t->num_slots, t->mask, t->slot_size,
+			t->num_stages, t->socket);
+	for (i = 0; i < t->num_stages; i++) {
+		uint32_t j;
+		const struct opdl_stage *s = &t->stages[i];
+
+		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
+				t->name, i, (s->threadsafe) ? "true" : "false",
+				(s->threadsafe) ? s->shared.head : s->head,
+				(s->threadsafe) ? s->shared.available_seq :
+				s->available_seq,
+				s->shared.tail, (s->num_deps > 0) ?
+				s->deps[0]->stage->index : 0);
+		for (j = 1; j < s->num_deps; j++)
+			fprintf(f, ",%u", s->deps[j]->stage->index);
+		fprintf(f, "\n");
+	}
+	fflush(f);
+}
+
+void
+opdl_ring_free(struct opdl_ring *t)
+{
+	uint32_t i;
+	const struct rte_memzone *mz;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+
+	if (t == NULL) {
+		log_debug("Freeing NULL OPDL Ring!");
+		return;
+	}
+
+	log_debug("Freeing %s opdl_ring at %p", t->name, t);
+
+	for (i = 0; i < t->num_stages; ++i) {
+		rte_free(t->stages[i].deps);
+		rte_free(t->stages[i].dep_tracking);
+	}
+
+	rte_free(t->stages);
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
+	mz = rte_memzone_lookup(mz_name);
+	if (rte_memzone_free(mz) != 0)
+		log(ERR, "Cannot free memzone for %s", t->name);
+}
+
+/* Look up an opdl_ring by its name */
+struct opdl_ring *
+opdl_ring_lookup(const char *name)
+{
+	const struct rte_memzone *mz;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
+
+	mz = rte_memzone_lookup(mz_name);
+	if (mz == NULL)
+		return NULL;
+
+	return mz->addr;
+}
+
+void
+opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
+{
+	s->threadsafe = threadsafe;
+}
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
new file mode 100644
index 0000000..52a5b2f
--- /dev/null
+++ b/drivers/event/opdl/opdl_ring.h
@@ -0,0 +1,578 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#ifndef _OPDL_H_
+#define _OPDL_H_
+
+/**
+ * @file
+ * The "opdl_ring" is a data structure that contains a fixed number of slots,
+ * with each slot having the same, but configurable, size. Entries are input
+ * into the opdl_ring by copying into available slots. Once in the opdl_ring,
+ * an entry is processed by a number of stages, with the ordering of stage
+ * processing controlled by making stages dependent on one or more other stages.
+ * An entry is not available for a stage to process until it has been processed
+ * by that stage's dependencies. Entries are always made available for
+ * processing in the same order that they were input into the opdl_ring.
+ * Inputting is considered as a stage that depends on all other stages,
+ * and is also a dependency of all stages.
+ *
+ * Inputting and processing in a stage can support multi-threading. Note that
+ * multi-thread processing can also be done by making stages co-operate e.g. two
+ * stages where one processes the even packets and the other processes odd
+ * packets.
+ *
+ * An opdl_ring can be used as the basis for pipeline based applications. Instead
+ * of each stage in a pipeline dequeueing from a ring, processing and enqueueing
+ * to another ring, it can process entries in-place on the ring. If stages do
+ * not depend on each other, they can run in parallel.
+ *
+ * The opdl_ring works with entries of configurable size, these could be
+ * pointers to mbufs, pointers to mbufs with application specific meta-data,
+ * tasks etc.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include <rte_eventdev.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef OPDL_DISCLAIMS_PER_LCORE
+/** Multi-threaded processing allows one thread to process multiple batches in a
+ * stage, while another thread is processing a single large batch. This number
+ * controls how many non-contiguous batches one stage can process before being
+ * blocked by the other stage.
+ */
+#define OPDL_DISCLAIMS_PER_LCORE 8
+#endif
+
+/** Opaque handle to an opdl_ring instance */
+struct opdl_ring;
+
+/** Opaque handle to a single stage in an opdl_ring */
+struct opdl_stage;
+
+/**
+ * Create a new instance of an opdl_ring.
+ *
+ * @param name
+ *   String containing the name to give the new opdl_ring instance.
+ * @param num_slots
+ *   How many slots the opdl_ring contains. Must be a power of 2!
+ * @param slot_size
+ *   How many bytes in each slot.
+ * @param max_num_stages
+ *   Maximum number of stages.
+ * @param socket
+ *   The NUMA socket (or SOCKET_ID_ANY) to allocate the memory used for this
+ *   opdl_ring instance.
+ *
+ * @return
+ *   A pointer to a new opdl_ring instance, or NULL on error.
+ */
+struct opdl_ring *
+opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
+		uint32_t max_num_stages, int socket);
+
+/**
+ * Get pointer to individual slot in an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ * @param index
+ *   Index of slot. If greater than the number of slots it will be masked to be
+ *   within correct range.
+ *
+ * @return
+ *   A pointer to that slot.
+ */
+void *
+opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index);
+
+/**
+ * Get NUMA socket used by an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   NUMA socket.
+ */
+int
+opdl_ring_get_socket(const struct opdl_ring *t);
+
+/**
+ * Get number of slots in an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   Number of slots.
+ */
+uint32_t
+opdl_ring_get_num_slots(const struct opdl_ring *t);
+
+/**
+ * Get name of an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   Name string.
+ */
+const char *
+opdl_ring_get_name(const struct opdl_ring *t);
+
+/**
+ * Adds a new processing stage to a specified opdl_ring instance. Adding a stage
+ * while there are entries in the opdl_ring being processed will cause undefined
+ * behaviour.
+ *
+ * @param t
+ *   The opdl_ring to add the stage to.
+ * @param threadsafe
+ *   Whether to support multiple threads processing this stage or not.
+ *   Enabling this may have a negative impact on performance if only one thread
+ *   will be processing this stage.
+ * @param is_input
+ *   Indication to initialise the stage with all slots available (input stage)
+ *   or none. Dependencies are set separately, e.g. via opdl_stage_deps_add().
+ *
+ * @return
+ *   A pointer to the new stage, or NULL on error.
+ */
+struct opdl_stage *
+opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input);
+
+/**
+ * Returns the input stage of an opdl_ring to be used by other API functions.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   A pointer to the input stage.
+ */
+struct opdl_stage *
+opdl_ring_get_input_stage(const struct opdl_ring *t);
+
+/**
+ * Sets the dependencies for a stage (clears all the previous deps!). Changing
+ * dependencies while there are entries in the opdl_ring being processed will
+ * cause undefined behaviour.
+ *
+ * @param s
+ *   The stage to set the dependencies for.
+ * @param deps
+ *   An array of pointers to other stages that this stage will depend on. The
+ *   other stages must be part of the same opdl_ring!
+ * @param num_deps
+ *   The size of the deps array. This must be > 0.
+ *
+ * @return
+ *   0 on success, a negative value on error.
+ */
+int
+opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
+		uint32_t num_deps);
+
+/**
+ * Returns the opdl_ring that a stage belongs to.
+ *
+ * @param s
+ *   The stage
+ *
+ * @return
+ *   A pointer to the opdl_ring that the stage belongs to.
+ */
+struct opdl_ring *
+opdl_stage_get_opdl_ring(const struct opdl_stage *s);
+
+/**
+ * Inputs a new batch of entries into the opdl_ring. This function is only
+ * threadsafe (with the same opdl_ring parameter) if the input stage was
+ * created with threadsafe set to true. For performance reasons, this function
+ * does not check input parameters.
+ *
+ * @param t
+ *   The opdl_ring to input entries in to.
+ * @param entries
+ *   An array of entries that will be copied in to the opdl_ring.
+ * @param num_entries
+ *   The size of the entries array.
+ * @param block
+ *   If this is true, the function blocks until enough slots are available to
+ *   input all the requested entries. If false, then the function inputs as
+ *   many entries as currently possible.
+ *
+ * @return
+ *   The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
+		bool block);
+
+/**
+ * Inputs a new batch of entries into an opdl_ring stage. This function is only
+ * threadsafe (with the same opdl_ring parameter) if the stage was created with
+ * threadsafe set to true. For performance reasons, this function does not
+ * check input parameters.
+ *
+ * @param t
+ *   The opdl_ring to input entries into.
+ * @param s
+ *   The stage to copy entries to.
+ * @param entries
+ *   An array of entries that will be copied into the opdl_ring.
+ * @param num_entries
+ *   The size of the entries array.
+ * @param block
+ *   If this is true, the function blocks until enough slots are available to
+ *   input all the requested entries. If false, then the function inputs as
+ *   many entries as currently possible.
+ *
+ * @return
+ *   The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
+			const void *entries, uint32_t num_entries, bool block);
+
+/**
+ * Copy a batch of entries from the turbine. This function is only
+ * threadsafe (with the same turbine parameter) if the threadsafe parameter of
+ * turbine_create() was true. For performance reasons, this function does not
+ * check input parameters.
+ *
+ * @param t
+ *   The turbine to copy entries from.
+ * @param s
+ *   The stage to copy entries from.
+ * @param entries
+ *   An array of entries that will be copied from the turbine.
+ * @param num_entries
+ *   The size of the entries array.
+ * @param block
+ *   If this is true, the function blocks until enough slots are available to
+ *   input all the requested entries. If false, then the function inputs as
+ *   many entries as currently possible.
+ *
+ * @return
+ *   The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
+		void *entries, uint32_t num_entries, bool block);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. This function is threadsafe using same opdl_stage parameter if
+ * the stage was created with threadsafe set to true, otherwise it is only
+ * threadsafe with a different opdl_stage per thread. For performance
+ * reasons, this function does not check input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage to read entries in.
+ * @param entries
+ *   An array of pointers to entries that will be filled in by this function.
+ * @param num_entries
+ *   The number of entries to attempt to claim for processing (and the size of
+ *   the entries array).
+ * @param seq
+ *   If not NULL, this is set to the value of the internal stage sequence number
+ *   associated with the first entry returned.
+ * @param block
+ *   If this is true, the function blocks until num_entries slots are available
+ *   to process. If false, then the function claims as many entries as
+ *   currently possible.
+ *
+ * @param atomic
+ *   If this is true, only events whose flow_id maps to this stage instance
+ *   (flow_id % nb_instance == instance_id) are returned, giving atomic-queue
+ *   behaviour.
+ * @return
+ *   The number of pointers to entries filled in to the entries array.
+ */
+uint32_t
+opdl_stage_claim(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic);
+
+/**
+ * Set the direct dependencies and the instance information (nb_instance,
+ * instance_id) for a stage. The deps array holds pointers to other stages
+ * in the same opdl_ring that this stage depends on; num_deps is its size.
+ * Returns 0 on success, or a negative value on error.
+ */
+uint32_t
+opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
+		uint32_t nb_instance, uint32_t instance_id,
+		struct opdl_stage *deps[], uint32_t num_deps);
+
+/**
+ * A function to check how many entries are ready to be claimed.
+ *
+ * @param entries
+ *   An array of pointers to entries.
+ * @param num_entries
+ *   Number of entries in an array.
+ * @param arg
+ *   An opaque pointer to data passed to the claim function.
+ * @param block
+ *   When set to true, the function should wait until num_entries are ready to
+ *   be processed. Otherwise it should return immediately.
+ *
+ * @return
+ *   Number of entries ready to be claimed.
+ */
+typedef uint32_t (opdl_ring_check_entries_t)(void *entries[],
+		uint32_t num_entries, void *arg, bool block);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. Each entry is checked by the passed check() function and depending
+ * on block value, it waits until num_entries are ready or returns immediately.
+ * This function is only threadsafe with a different opdl_stage per thread.
+ *
+ * @param s
+ *   The opdl_ring stage to read entries in.
+ * @param entries
+ *   An array of pointers to entries that will be filled in by this function.
+ * @param num_entries
+ *   The number of entries to attempt to claim for processing (and the size of
+ *   the entries array).
+ * @param seq
+ *   If not NULL, this is set to the value of the internal stage sequence number
+ *   associated with the first entry returned.
+ * @param block
+ *   If this is true, the function blocks until num_entries ready slots are
+ *   available to process. If false, then the function claims as many ready
+ *   entries as currently possible.
+ * @param check
+ *   Pointer to a function called to check entries.
+ * @param arg
+ *   Opaque data passed to check() function.
+ *
+ * @return
+ *   The number of pointers to ready entries filled in to the entries array.
+ */
+uint32_t
+opdl_stage_claim_check(struct opdl_stage *s, void **entries,
+		uint32_t num_entries, uint32_t *seq, bool block,
+		opdl_ring_check_entries_t *check, void *arg);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. This function is threadsafe using same opdl_stage parameter if
+ * the stage was created with threadsafe set to true, otherwise it is only
+ * threadsafe with a different opdl_stage per thread.
+ *
+ * The difference between this function and opdl_stage_claim() is that this
+ * function copies the entries from the opdl_ring. Note that any changes made to
+ * the copied entries will not be reflected back in to the entries in the
+ * opdl_ring, so this function probably only makes sense if the entries are
+ * pointers to other data. For performance reasons, this function does not check
+ * input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage to read entries in.
+ * @param entries
+ *   An array of entries that will be filled in by this function.
+ * @param num_entries
+ *   The number of entries to attempt to claim for processing (and the size of
+ *   the entries array).
+ * @param seq
+ *   If not NULL, this is set to the value of the internal stage sequence number
+ *   associated with the first entry returned.
+ * @param block
+ *   If this is true, the function blocks until num_entries slots are available
+ *   to process. If false, then the function claims as many entries as
+ *   currently possible.
+ *
+ * @return
+ *   The number of entries copied in to the entries array.
+ */
+uint32_t
+opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block);
+
+/**
+ * This function must be called when a stage has finished its processing of
+ * entries, to make them available to any dependent stages. All entries that are
+ * claimed by the calling thread in the stage will be disclaimed. It is possible
+ * to claim multiple batches before disclaiming. For performance reasons, this
+ * function does not check input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage in which to disclaim all claimed entries.
+ *
+ * @param num_entries
+ *   The number of entries to disclaim.
+ *
+ * @param block
+ *   Entries are always made available to a stage in the same order that they
+ *   were input in the stage. If a stage is multithread safe, this may mean that
+ *   full disclaiming of a batch of entries can not be considered complete until
+ *   all earlier threads in the stage have disclaimed. If this parameter is true
+ *   then the function blocks until all entries are fully disclaimed, otherwise
+ *   it disclaims as many as currently possible, with non fully disclaimed
+ *   batches stored until the next call to a claim or disclaim function for this
+ *   stage on this thread.
+ *
+ *   If a thread is not going to process any more entries in this stage, it
+ *   *must* first call this function with this parameter set to true to ensure
+ *   it does not block the entire opdl_ring.
+ *
+ *   In a single threaded stage, this parameter has no effect.
+ *
+ * @return
+ *   The number of entries disclaimed.
+ */
+int
+opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries,
+		bool block);
+
+/**
+ * This function can be called when a stage has finished its processing of
+ * entries, to make them available to any dependent stages. The difference
+ * between this function and opdl_stage_disclaim() is that here only a
+ * portion of entries are disclaimed, not all of them. For performance reasons,
+ * this function does not check input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage in which to disclaim entries.
+ *
+ * @param num_entries
+ *   The number of entries to disclaim.
+ *
+ * @param block
+ *   Entries are always made available to a stage in the same order that they
+ *   were input in the stage. If a stage is multithread safe, this may mean that
+ *   full disclaiming of a batch of entries can not be considered complete until
+ *   all earlier threads in the stage have disclaimed. If this parameter is true
+ *   then the function blocks until the specified number of entries has been
+ *   disclaimed (or there are no more entries to disclaim). Otherwise it
+ *   disclaims as many claims as currently possible and an attempt to disclaim
+ *   them is made the next time a claim or disclaim function for this stage on
+ *   this thread is called.
+ *
+ *   In a single threaded stage, this parameter has no effect.
+ */
+void
+opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
+		bool block);
+
+/**
+ * Check how many entries can be input.
+ *
+ * @param t
+ *   The opdl_ring instance to check.
+ *
+ * @return
+ *   The number of new entries currently allowed to be input.
+ */
+uint32_t
+opdl_ring_available(struct opdl_ring *t);
+
+/**
+ * Check how many entries can be processed in a stage.
+ *
+ * @param s
+ *   The stage to check.
+ *
+ * @return
+ *   The number of entries currently available to be processed in this stage.
+ */
+uint32_t
+opdl_stage_available(struct opdl_stage *s);
+
+/**
+ * Check how many entries are available to be processed.
+ *
+ * NOTE : DOES NOT CHANGE ANY STATE WITHIN THE STAGE
+ *
+ * @param s
+ *   The stage to check.
+ *
+ * @param num_entries
+ *   The number of entries to check for availability.
+ *
+ * @return
+ *   The number of entries currently available to be processed in this stage.
+ */
+uint32_t
+opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
+
+/**
+ * Create an empty stage instance and return a pointer to it.
+ *
+ * @param t
+ *   The opdl_ring to create the stage in.
+ *
+ * @param threadsafe
+ *   Whether the stage can safely be accessed by multiple threads.
+ * @return
+ *   A pointer to the new stage instance, or NULL on error.
+ */
+struct opdl_stage *
+opdl_stage_create(struct opdl_ring *t,  bool threadsafe);
+
+/**
+ * Prints information on an opdl_ring instance and all its stages.
+ *
+ * @param t
+ *   The opdl_ring instance to print info on.
+ * @param f
+ *   Where to print the info.
+ */
+void
+opdl_ring_dump(const struct opdl_ring *t, FILE *f);
+
+/**
+ * Blocks until all entries in an opdl_ring have been processed by all stages.
+ *
+ * @param t
+ *   The opdl_ring instance to flush.
+ */
+void
+opdl_ring_flush(struct opdl_ring *t);
+
+/**
+ * Deallocates all resources used by an opdl_ring instance.
+ *
+ * @param t
+ *   The opdl_ring instance to free.
+ */
+void
+opdl_ring_free(struct opdl_ring *t);
+
+/**
+ * Search for an opdl_ring by its name.
+ *
+ * @param name
+ *   The name of the opdl_ring.
+ * @return
+ *   The pointer to the opdl_ring matching the name, or NULL if not found.
+ *
+ */
+struct opdl_ring *
+opdl_ring_lookup(const char *name);
+
+/**
+ * Set the threadsafe attribute of an opdl_stage.
+ *
+ * @param s
+ *   The opdl_stage.
+ * @param threadsafe
+ *   The threadsafe value to set.
+ */
+void
+opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  /* _OPDL_H_ */
diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
new file mode 100644
index 0000000..5352e7e
--- /dev/null
+++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
@@ -0,0 +1,3 @@
+DPDK_17.05 {
+	local: *;
+};
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [dpdk-dev] [PATCH 2/7] event/opdl: add the opdl pmd header and init helper function
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 3/7] event/opdl: add the opdl pmd main body and xstats " liang.j.ma
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

opdl_evdev.h includes the main data structures of the opdl device
and all the function prototypes that need to be exposed to support
the eventdev API.

opdl_evdev_init.c implements all the initialization helper functions.

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/opdl_evdev.h      | 353 +++++++++++++
 drivers/event/opdl/opdl_evdev_init.c | 945 +++++++++++++++++++++++++++++++++++
 2 files changed, 1298 insertions(+)
 create mode 100644 drivers/event/opdl/opdl_evdev.h
 create mode 100644 drivers/event/opdl/opdl_evdev_init.c

diff --git a/drivers/event/opdl/opdl_evdev.h b/drivers/event/opdl/opdl_evdev.h
new file mode 100644
index 0000000..e2657de
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev.h
@@ -0,0 +1,353 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _OPDL_EVDEV_H_
+#define _OPDL_EVDEV_H_
+
+#include <rte_eventdev.h>
+#include <rte_eventdev_pmd_vdev.h>
+#include <rte_atomic.h>
+#include "opdl_ring.h"
+
+#define OPDL_QID_NUM_FIDS 1024
+#define OPDL_IQS_MAX 1
+#define OPDL_Q_PRIORITY_MAX 1
+#define OPDL_PORTS_MAX 64
+#define MAX_OPDL_CONS_Q_DEPTH 128
+/* OPDL size */
+#define OPDL_INFLIGHT_EVENTS_TOTAL 4096
+/* allow for lots of over-provisioning */
+#define OPDL_FRAGMENTS_MAX 1
+
+/* report dequeue burst sizes in buckets */
+#define OPDL_DEQ_STAT_BUCKET_SHIFT 2
+/* how many packets pulled from port by sched */
+#define SCHED_DEQUEUE_BURST_SIZE 32
+
+/* size of our history list */
+#define OPDL_PORT_HIST_LIST (MAX_OPDL_PROD_Q_DEPTH)
+
+/* how many data points use for average stats */
+#define NUM_SAMPLES 64
+
+#define EVENTDEV_NAME_OPDL_PMD event_opdl
+#define OPDL_PMD_NAME RTE_STR(event_opdl)
+#define OPDL_PMD_NAME_MAX 64
+
+#define OPDL_INVALID_QID 255
+
+#define OPDL_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
+#define OPDL_NUM_POLL_BUCKETS  \
+	(MAX_OPDL_CONS_Q_DEPTH >> OPDL_DEQ_STAT_BUCKET_SHIFT)
+
+enum {
+	QE_FLAG_VALID_SHIFT = 0,
+	QE_FLAG_COMPLETE_SHIFT,
+	QE_FLAG_NOT_EOP_SHIFT,
+	_QE_FLAG_COUNT
+};
+
+enum port_type {
+	OPDL_INVALID_PORT = 0,
+	OPDL_REGULAR_PORT = 1,
+	OPDL_PURE_RX_PORT,
+	OPDL_PURE_TX_PORT,
+	OPDL_ASYNC_PORT
+};
+
+enum queue_type {
+	OPDL_Q_TYPE_INVALID = 0,
+	OPDL_Q_TYPE_SINGLE_LINK = 1,
+	OPDL_Q_TYPE_ATOMIC,
+	OPDL_Q_TYPE_ORDERED
+};
+
+enum queue_pos {
+	OPDL_Q_POS_START = 0,
+	OPDL_Q_POS_MIDDLE,
+	OPDL_Q_POS_END
+};
+
+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)    /* for NEW FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP  */
+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* set for FRAG only  */
+
+static const uint8_t opdl_qe_flag_map[] = {
+	QE_FLAG_VALID /* NEW Event */,
+	QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+	QE_FLAG_COMPLETE /* RELEASE Event */,
+
+	/* Values which can be used for future support for partial
+	 * events, i.e. where one event comes back to the scheduler
+	 * as multiple which need to be tracked together
+	 */
+	QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
+#define OPDL_LOG_INFO(fmt, args...) \
+	RTE_LOG(INFO, EVENTDEV, "[%s] line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__LINE__, ## args)
+
+#ifdef RTE_LIBRTE_PMD_EVDEV_OPDL_DEBUG
+#define OPDL_LOG_DBG(fmt, args...) \
+	RTE_LOG(DEBUG, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__func__, __LINE__, ## args)
+#else
+#define OPDL_LOG_DBG(fmt, args...)
+#endif
+
+#define OPDL_LOG_ERR(fmt, args...) \
+	RTE_LOG(ERR, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__func__, __LINE__, ## args)
+
+enum port_xstat_name {
+	claim_pkts_requested = 0,
+	claim_pkts_granted,
+	claim_non_empty,
+	claim_empty,
+	total_cycles,
+	max_num_port_xstat
+};
+
+#define OPDL_MAX_PORT_XSTAT_NUM (OPDL_PORTS_MAX * max_num_port_xstat)
+
+struct opdl_port;
+
+typedef uint16_t (*opdl_enq_operation)(struct opdl_port *port,
+		const struct rte_event ev[],
+		uint16_t num);
+
+typedef uint16_t (*opdl_deq_operation)(struct opdl_port *port,
+		struct rte_event ev[],
+		uint16_t num);
+
+struct opdl_evdev;
+
+struct opdl_stage_meta_data {
+	uint32_t num_claimed;	/* number of entries claimed by this stage */
+	uint32_t burst_sz;	/* Port claim burst size */
+};
+
+struct opdl_port {
+
+	/* back pointer */
+	struct opdl_evdev *opdl;
+
+	/* enq handler & stage instance */
+	opdl_enq_operation enq;
+	struct opdl_stage *enq_stage_inst;
+
+	/* deq handler & stage instance */
+	opdl_deq_operation deq;
+	struct opdl_stage *deq_stage_inst;
+
+	/* port id has correctly been set */
+	uint8_t configured;
+
+	/* set when the port is initialized */
+	uint8_t initialized;
+
+	/* A numeric ID for the port */
+	uint8_t id;
+
+	/* Space for claimed entries */
+	struct rte_event *entries[MAX_OPDL_CONS_Q_DEPTH];
+
+	/* RX/REGULAR/TX/ASYNC - determined on position in queue */
+	enum port_type p_type;
+
+	/* if the claim is static atomic type  */
+	bool atomic_claim;
+
+	/* Queue linked to this port - internal queue id*/
+	uint8_t queue_id;
+
+	/* Queue linked to this port - external queue id*/
+	uint8_t external_qid;
+
+	/* Next queue linked to this port - external queue id*/
+	uint8_t next_external_qid;
+
+	/* number of instances of this stage */
+	uint32_t num_instance;
+
+	/* instance ID of this stage*/
+	uint32_t instance_id;
+
+	/* track packets in and out of this port */
+	uint64_t port_stat[max_num_port_xstat];
+	uint64_t start_cycles;
+};
+
+struct opdl_queue_meta_data {
+	uint8_t         ext_id;
+	enum queue_type type;
+	int8_t          setup;
+};
+
+struct opdl_xstats_entry {
+	struct rte_event_dev_xstats_name stat;
+	unsigned int id;
+	uint64_t *value;
+};
+
+struct opdl_queue {
+
+	/* Turbine this queue is associated with */
+	uint32_t turbine_id;
+
+	/* type and position have correctly been set */
+	uint8_t configured;
+
+	/* port number and associated ports have been associated */
+	uint8_t initialized;
+
+	/* type of this queue (Atomic, Ordered, Parallel, Direct)*/
+	enum queue_type q_type;
+
+	/* position of queue (START, MIDDLE, END) */
+	enum queue_pos q_pos;
+
+	/* external queue id. It is mapped to the queue position */
+	uint8_t external_qid;
+
+	struct opdl_port *ports[OPDL_PORTS_MAX];
+	uint32_t nb_ports;
+
+	/* priority, reserved for future */
+	uint8_t priority;
+};
+
+
+#define OPDL_TUR_PER_DEV 12
+
+/* PMD needs an extra queue per turbine */
+#define OPDL_MAX_QUEUES (RTE_EVENT_MAX_QUEUES_PER_DEV - OPDL_TUR_PER_DEV)
+
+
+struct opdl_evdev {
+	struct rte_eventdev_data *data;
+
+	uint8_t started;
+
+	/* Max number of ports and queues*/
+	uint32_t max_port_nb;
+	uint32_t max_queue_nb;
+
+	/* slots in the turbine */
+	uint32_t nb_events_limit;
+
+	/*
+	 * Array holding all turbines for this device
+	 */
+	struct opdl_ring *turbine[OPDL_TUR_PER_DEV];
+	uint32_t nb_turbines;
+
+	struct opdl_queue_meta_data q_md[OPDL_MAX_QUEUES];
+	uint32_t nb_q_md;
+
+	/* Internal queues - one per logical queue */
+	struct opdl_queue
+		queue[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+
+	uint32_t nb_queues;
+
+	struct opdl_stage_meta_data s_md[OPDL_PORTS_MAX];
+
+	/* Contains all ports - load balanced and directed */
+	struct opdl_port ports[OPDL_PORTS_MAX] __rte_cache_aligned;
+	uint32_t nb_ports;
+
+	uint8_t q_map_ex_to_in[OPDL_INVALID_QID];
+
+	/* Stats */
+	struct opdl_xstats_entry port_xstat[OPDL_MAX_PORT_XSTAT_NUM];
+
+	char service_name[OPDL_PMD_NAME_MAX];
+	int socket;
+	int do_validation;
+};
+
+
+static inline struct opdl_evdev *
+opdl_pmd_priv(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+static inline const struct opdl_evdev *
+opdl_pmd_priv_const(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+uint16_t opdl_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t opdl_event_enqueue_burst(void *port, const struct rte_event ev[],
+		uint16_t num);
+
+uint16_t opdl_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t opdl_event_dequeue_burst(void *port, struct rte_event *ev,
+		uint16_t num, uint64_t wait);
+void opdl_event_schedule(struct rte_eventdev *dev);
+
+void opdl_xstats_init(struct rte_eventdev *dev);
+int opdl_xstats_uninit(struct rte_eventdev *dev);
+int opdl_xstats_get_names(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		struct rte_event_dev_xstats_name *xstats_names,
+		unsigned int *ids, unsigned int size);
+int opdl_xstats_get(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t opdl_xstats_get_by_name(const struct rte_eventdev *dev,
+		const char *name, unsigned int *id);
+int opdl_xstats_reset(struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		int16_t queue_port_id,
+		const uint32_t ids[],
+		uint32_t nb_ids);
+
+int opdl_add_event_handlers(struct rte_eventdev *dev);
+int build_all_dependencies(struct rte_eventdev *dev);
+int check_queues_linked(struct rte_eventdev *dev);
+int create_queues_and_rings(struct rte_eventdev *dev);
+int initialise_all_other_ports(struct rte_eventdev *dev);
+int initialise_queue_zero_ports(struct rte_eventdev *dev);
+int assign_internal_queue_ids(struct rte_eventdev *dev);
+void destroy_queues_and_rings(struct rte_eventdev *dev);
+
+
+#endif /* _OPDL_EVDEV_H_ */
diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
new file mode 100644
index 0000000..2699cb2
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -0,0 +1,945 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_bus_vdev.h>
+#include <rte_memzone.h>
+#include <rte_kvargs.h>
+#include <rte_ring.h>
+#include <rte_errno.h>
+#include <rte_cycles.h>
+
+#include "opdl_evdev.h"
+#include "opdl_ring.h"
+
+
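+/*
+ * Validate a burst about to be enqueued. In validation mode, every event's
+ * queue_id is checked against the port's expected next queue and the
+ * per-port claim/cycle statistics are updated; otherwise only the first
+ * event is checked. Returns num on success, or 0 with rte_errno set to
+ * -EINVAL on a queue_id mismatch.
+ */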
+static inline uint32_t __attribute__((always_inline))
+enqueue_check(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	uint16_t i;
+
+	if (p->opdl->do_validation) {
+
+		for (i = 0; i < num; i++) {
+			if (ev[i].queue_id != p->next_external_qid) {
+				OPDL_LOG_ERR("ERROR - port:[%u] - event wants"
+						" to enq to q_id[%u],"
+						" but should be [%u]\n",
+						p->id,
+						ev[i].queue_id,
+						p->next_external_qid);
+				rte_errno = -EINVAL;
+				return 0;
+			}
+		}
+
+		/* Stats */
+		if (p->p_type == OPDL_PURE_RX_PORT ||
+				p->p_type == OPDL_ASYNC_PORT) {
+			/* Stats */
+			if (num_events) {
+				p->port_stat[claim_pkts_requested] += num;
+				p->port_stat[claim_pkts_granted] += num_events;
+				p->port_stat[claim_non_empty]++;
+				p->start_cycles = rte_rdtsc();
+			} else {
+				p->port_stat[claim_empty]++;
+				p->start_cycles = 0;
+			}
+		} else {
+			if (p->start_cycles) {
+				uint64_t end_cycles = rte_rdtsc();
+				p->port_stat[total_cycles] +=
+					end_cycles - p->start_cycles;
+			}
+		}
+	} else {
+		if (num > 0 &&
+				ev[0].queue_id != p->next_external_qid) {
+			rte_errno = -EINVAL;
+			return 0;
+		}
+	}
+
+	return num;
+}
+
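+/*
+ * Post-dequeue fixup. Rewrite the queue_id of each dequeued event (all
+ * events in validation mode, only the first otherwise) to the external id
+ * of the queue this port dequeues from and, in validation mode, update the
+ * per-port claim statistics.
+ */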
+static inline void __attribute__((always_inline))
+update_on_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	if (p->opdl->do_validation) {
+		uint16_t i;
+		for (i = 0; i < num; i++)
+			ev[i].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+
+		/* Stats */
+		if (num_events) {
+			p->port_stat[claim_pkts_requested] += num;
+			p->port_stat[claim_pkts_granted] += num_events;
+			p->port_stat[claim_non_empty]++;
+			p->start_cycles = rte_rdtsc();
+		} else {
+			p->port_stat[claim_empty]++;
+			p->start_cycles = 0;
+		}
+	} else {
+		if (num > 0)
+			ev[0].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+	}
+}
+
+
+/*
+ * Error RX enqueue handler, used on ports that cannot enqueue.
+ */
+
+static uint16_t
+opdl_rx_error_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	RTE_SET_USED(p);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(num);
+
+	rte_errno = -ENOSPC;
+
+	return 0;
+}
+
+/*
+ * RX enqueue:
+ *
+ * This function handles enqueue for a single input stage_inst with
+ *	threadsafe disabled or enabled, e.g. one thread using a stage_inst
+ *	or multiple threads sharing a stage_inst.
+ */
+
+static uint16_t
+opdl_rx_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	uint16_t enqueued = 0;
+
+	enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
+				   ev,
+				   num,
+				   false);
+	if (!enqueue_check(p, ev, num, enqueued))
+		return 0;
+
+
+	if (enqueued < num)
+		rte_errno = -ENOSPC;
+
+	return enqueued;
+}
+
+/*
+ * Error TX dequeue handler, used on ports that cannot dequeue.
+ */
+
+static uint16_t
+opdl_tx_error_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num)
+{
+	RTE_SET_USED(p);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(num);
+
+	rte_errno = -ENOSPC;
+
+	return 0;
+}
+
+/*
+ * TX single threaded claim
+ *
+ * This function handles dequeue for a single worker stage_inst with
+ *	threadsafe disabled, e.g. one thread using a stage_inst.
+ */
+
+static uint16_t
+opdl_tx_dequeue_single_thread(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	uint16_t returned;
+
+	struct opdl_ring  *ring;
+
+	ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
+
+	returned = opdl_ring_copy_to_burst(ring,
+					   p->deq_stage_inst,
+					   ev,
+					   num,
+					   false);
+
+	update_on_dequeue(p, ev, num, returned);
+
+	return returned;
+}
+
+/*
+ * TX multi threaded claim
+ *
+ * This function handles dequeue for multiple worker stage_inst with
+ *	threadsafe disabled, e.g. multiple threads each with its own
+ *	stage_inst instance.
+ */
+
+static uint16_t
+opdl_tx_dequeue_multi_inst(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+				    (void *)ev,
+				    num,
+				    NULL,
+				    false,
+				    false);
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
+}
+
+
+/*
+ * Worker thread claim
+ *
+ */
+
+static uint16_t
+opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
+		OPDL_LOG_ERR(""
+				"Attempt to dequeue num of events larger than port (%d) max\n",
+				p->id);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+			(void *)ev,
+			num,
+			NULL,
+			false,
+			p->atomic_claim);
+
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return num_events;
+}
+
+/*
+ * Worker thread disclaim
+ */
+
+static uint16_t
+opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
+{
+	uint16_t enqueued = 0;
+
+	enqueued = opdl_stage_disclaim(p->enq_stage_inst,
+				       num,
+				       false);
+
+	return enqueue_check(p, ev, num, enqueued);
+}
+
+static inline struct opdl_stage *__attribute__((always_inline))
+stage_for_port(struct opdl_queue *q, unsigned int i)
+{
+	if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
+		return q->ports[i]->enq_stage_inst;
+	else
+		return q->ports[i]->deq_stage_inst;
+}
+
+static int opdl_add_deps(struct opdl_evdev *device,
+			 int q_id,
+			 int deps_q_id)
+{
+	unsigned int i, j;
+	int status;
+	struct opdl_ring  *ring;
+	struct opdl_queue *queue = &device->queue[q_id];
+	struct opdl_queue *queue_deps = &device->queue[deps_q_id];
+	struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
+
+	/* sanity check that all stages are for the same turbine */
+	for (i = 0; i < queue->nb_ports; i++) {
+		struct opdl_ring *r =
+			opdl_stage_get_opdl_ring(stage_for_port(queue, i));
+		for (j = 0; j < queue_deps->nb_ports; j++) {
+			struct opdl_ring *rj =
+				opdl_stage_get_opdl_ring(
+						stage_for_port(queue_deps, j));
+			if (r != rj) {
+				OPDL_LOG_ERR("Stages and dependents"
+						" are not for same turbine");
+				for (uint32_t k = 0;
+						k < device->nb_turbines; k++) {
+					opdl_ring_dump(device->turbine[k],
+							stdout);
+				}
+				return -EINVAL;
+			}
+		}
+	}
+
+	/* Gather all stages instance in deps */
+	for (i = 0; i < queue_deps->nb_ports; i++)
+		dep_stages[i] = stage_for_port(queue_deps, i);
+
+
+	/* Add all deps for each port->stage_inst in this queue */
+	for (i = 0; i < queue->nb_ports; i++) {
+
+		ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
+
+		status = opdl_stage_deps_add(ring,
+				stage_for_port(queue, i),
+				queue->ports[i]->num_instance,
+				queue->ports[i]->instance_id,
+				dep_stages,
+				queue_deps->nb_ports);
+		if (status < 0)
+			return -EINVAL;
+	}
+
+	return 0;
+}
+
+int
+opdl_add_event_handlers(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	unsigned int i;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+
+		struct opdl_port *port = &device->ports[i];
+
+		if (port->configured) {
+			if (port->p_type == OPDL_PURE_RX_PORT) {
+				port->enq = opdl_rx_enqueue;
+				port->deq = opdl_tx_error_dequeue;
+
+			} else if (port->p_type == OPDL_PURE_TX_PORT) {
+
+				port->enq = opdl_rx_error_enqueue;
+
+				if (port->num_instance == 1)
+					port->deq =
+						opdl_tx_dequeue_single_thread;
+				else
+					port->deq = opdl_tx_dequeue_multi_inst;
+
+			} else if (port->p_type == OPDL_REGULAR_PORT) {
+
+				port->enq = opdl_disclaim;
+				port->deq = opdl_claim;
+
+			} else if (port->p_type == OPDL_ASYNC_PORT) {
+
+				port->enq = opdl_rx_enqueue;
+
+				/* Always single instance */
+				port->deq = opdl_tx_dequeue_single_thread;
+			} else {
+				OPDL_LOG_ERR("port:[%u] has invalid port type - ",
+						port->id);
+				err = -EINVAL;
+				break;
+			}
+			port->initialized = 1;
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
+	return err;
+}
+
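+/*
+ * Illustrative dependency layout built below for an assumed three-queue
+ * pipeline:
+ *
+ *	Q0(START) -> Q1(MIDDLE) -> Q2(END)
+ *	 ^_____________________________|
+ *
+ * Each MIDDLE/END queue depends on its predecessor, and the START queue
+ * additionally depends on the END queue so that input slots are only
+ * reused once the final stage has disclaimed them.
+ */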
+int
+build_all_dependencies(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	uint8_t start_qid = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+		if (!queue->initialized)
+			break;
+
+		if (queue->q_pos == OPDL_Q_POS_START) {
+			start_qid = i;
+			continue;
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_END) {
+			/* Add this dependency */
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+			/* Add dependency for rx on tx */
+			err = opdl_add_deps(device, start_qid, i);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - dependencies built\n");
+
+	return err;
+}
+
+int
+check_queues_linked(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	uint32_t nb_iq = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+
+		if (!queue->initialized)
+			break;
+
+		if (queue->external_qid == OPDL_INVALID_QID)
+			nb_iq++;
+
+		if (queue->nb_ports == 0) {
+			OPDL_LOG_ERR("queue:[%u] has no associated ports",
+					i);
+			err = -EINVAL;
+			break;
+		}
+	}
+	if (!err) {
+		if ((i - nb_iq) != device->max_queue_nb) {
+			OPDL_LOG_ERR("%u queues counted but should be %u",
+					i - nb_iq,
+					device->max_queue_nb);
+			err = -1;
+		} else {
+			fprintf(stdout, "Success - %u queues (ex:%u + in:%u) validated\n",
+					i,
+					device->max_queue_nb,
+					nb_iq);
+		}
+
+	}
+	return err;
+}
+
+void
+destroy_queues_and_rings(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_turbines; i++) {
+		if (device->turbine[i])
+			opdl_ring_free(device->turbine[i]);
+	}
+
+	memset(&device->queue,
+			0,
+			sizeof(struct opdl_queue)
+			* RTE_EVENT_MAX_QUEUES_PER_DEV);
+}
+
+#define TURBINE_ID(d) ((d)->nb_turbines - 1)
+
+static inline void
+initialise_queue(struct opdl_evdev *device,
+		enum queue_pos pos,
+		int32_t i)
+{
+	struct opdl_queue *queue = &device->queue[device->nb_queues];
+
+	if (i == -1) {
+		queue->q_type = OPDL_Q_TYPE_ORDERED;
+		queue->external_qid = OPDL_INVALID_QID;
+	} else {
+		queue->q_type = device->q_md[i].type;
+		queue->external_qid = device->q_md[i].ext_id;
+		/* Add ex->in for queues setup */
+		device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
+	}
+	queue->turbine_id = TURBINE_ID(device);
+	queue->q_pos = pos;
+	queue->nb_ports = 0;
+	queue->configured = 1;
+
+	device->nb_queues++;
+}
+
+
+static inline int
+create_turbine(struct opdl_evdev *device)
+{
+	int err = 0;
+
+	char name[RTE_MEMZONE_NAMESIZE];
+
+	snprintf(name, sizeof(name), "%s_%u",
+			device->service_name, device->nb_turbines);
+
+	device->turbine[device->nb_turbines] =
+		opdl_ring_create(name,
+				device->nb_events_limit,
+				sizeof(struct rte_event),
+				device->max_port_nb * 2,
+				device->socket);
+
+	if (!device->turbine[device->nb_turbines]) {
+		OPDL_LOG_ERR("opdl ring %u creation - FAILED",
+				device->nb_turbines);
+		err = -EINVAL;
+	} else {
+		device->nb_turbines++;
+	}
+	return err;
+}
+
+static inline int
+create_link_turbine(struct opdl_evdev *device, uint32_t index)
+{
+
+	int err = 0;
+
+	if (device->q_md[index + 1].type !=
+			OPDL_Q_TYPE_SINGLE_LINK) {
+
+		/* async queue with regular
+		 * queue following it
+		 */
+
+		/* create a new turbine */
+		err = create_turbine(device);
+		if (!err) {
+			/* create an initial
+			 * dummy queue for new turbine
+			 */
+			initialise_queue(device,
+					OPDL_Q_POS_START,
+					-1);
+		} else {
+			err = -EINVAL;
+		}
+	} else {
+		OPDL_LOG_ERR("queue %u, 2"
+				" SINGLE_LINK queues, not allowed",
+				index);
+		err = -EINVAL;
+	}
+
+	return err;
+}
+
+int
+create_queues_and_rings(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	device->nb_queues = 0;
+
+	if (device->nb_ports != device->max_port_nb) {
+		OPDL_LOG_ERR("Number ports setup:%u NOT EQUAL to max port"
+				" number:%u for this device",
+				device->nb_ports,
+				device->max_port_nb);
+		err = -1;
+	}
+
+	if (!err) {
+		/* We will have at least one turbine so create it now */
+		err = create_turbine(device);
+	}
+
+	if (!err) {
+
+		/* Create 1st "dummy" queue */
+		initialise_queue(device,
+				 OPDL_Q_POS_START,
+				 -1);
+
+		for (uint32_t i = 0; i < device->nb_q_md; i++) {
+
+			/* Check */
+			if (!device->q_md[i].setup) {
+
+				OPDL_LOG_ERR("queue meta data slot %u"
+						" not setup - FAILING",
+						i);
+				err = -EINVAL;
+				break;
+			} else if (device->q_md[i].type !=
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				if (!device->q_md[i + 1].setup) {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue at the end
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_END,
+							i);
+
+				} else {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue in the middle
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_MIDDLE,
+							i);
+				}
+			} else if (device->q_md[i].type ==
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				/* create last queue for this turbine */
+				initialise_queue(device,
+						OPDL_Q_POS_END,
+						i);
+
+				err = create_link_turbine(device, i);
+
+				if (err)
+					break;
+
+
+			}
+		}
+	}
+	if (err)
+		destroy_queues_and_rings(dev);
+	else
+		fprintf(stdout, "Success - Created %u queues and %u turbines\n",
+				device->nb_queues,
+				device->nb_turbines);
+
+	return err;
+}
+
+
+int
+initialise_all_other_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_stage *stage_inst = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		struct opdl_queue *queue = &device->queue[port->queue_id];
+
+		if (port->queue_id == 0) {
+			continue;
+		} else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
+
+			if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+
+				/* Regular port with claim/disclaim */
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						false);
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = stage_inst;
+
+				if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
+					port->atomic_claim = true;
+				else
+					port->atomic_claim = false;
+
+				port->p_type =  OPDL_REGULAR_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else if (queue->q_pos == OPDL_Q_POS_END) {
+
+				/* tx port  */
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						false);
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = NULL;
+				port->p_type = OPDL_PURE_TX_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else {
+
+				OPDL_LOG_ERR("port %u:, linked incorrectly"
+						" to a q_pos START/INVALID %u",
+						port->id,
+						queue->q_pos);
+				err = -EINVAL;
+				break;
+			}
+
+		} else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
+
+			port->p_type = OPDL_ASYNC_PORT;
+
+			/* -- tx -- */
+			stage_inst = opdl_stage_add(
+				device->turbine[queue->turbine_id],
+					false,
+					false); /* First stage */
+			port->deq_stage_inst = stage_inst;
+
+			/* Add the port to the queue array of ports */
+			queue->ports[queue->nb_ports] = port;
+			port->instance_id = queue->nb_ports;
+			queue->nb_ports++;
+
+			if (queue->nb_ports > 1) {
+				OPDL_LOG_ERR("queue %u:, setup as SINGLE_LINK"
+					" but has more than one port linked",
+						queue->external_qid);
+				err = -EINVAL;
+				break;
+			}
+
+			/* -- single instance rx for next turbine -- */
+			uint8_t next_qid =
+				device->q_map_ex_to_in[queue->external_qid] + 1;
+			if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
+					device->queue[next_qid].configured) {
+
+				/* Remap the queue */
+				queue = &device->queue[next_qid];
+
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						true);
+				port->enq_stage_inst = stage_inst;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+				if (queue->nb_ports > 1) {
+					OPDL_LOG_ERR("dummy queue %u: for "
+						"port %u, "
+						"SINGLE_LINK but has more "
+						"than one port linked",
+							next_qid,
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+				/* Set this queue to initialized as it is never
+				 * referenced by any ports
+				 */
+				queue->initialized = 1;
+			}
+		}
+	}
+
+	/* Now that all ports are initialised we need to
+	 * setup the last bit of stage md
+	 */
+	if (!err) {
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+
+			if (port->configured &&
+					(port->queue_id != OPDL_INVALID_QID)) {
+				if (queue->nb_ports == 0) {
+					OPDL_LOG_ERR("queue:[%u] has no ports"
+							" linked to it",
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+
+				port->num_instance = queue->nb_ports;
+				port->initialized = 1;
+				queue->initialized = 1;
+			} else {
+				OPDL_LOG_ERR("Port:[%u] not configured  invalid"
+						" queue configuration",
+						port->id);
+				err = -EINVAL;
+				break;
+			}
+		}
+	}
+
+	if (!err) {
+		fprintf(stdout,
+				"Success - %u port(s) initialized\n",
+				device->nb_ports);
+	}
+	return err;
+}
+
+int
+initialise_queue_zero_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	uint8_t mt_rx = 0;
+	struct opdl_stage *stage_inst = NULL;
+	struct opdl_queue *queue = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	/* Assign queue zero and figure out how many Q0 ports we have */
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->queue_id == OPDL_INVALID_QID) {
+			port->queue_id = 0;
+			port->external_qid = OPDL_INVALID_QID;
+			port->p_type = OPDL_PURE_RX_PORT;
+			mt_rx++;
+		}
+	}
+
+	/* Create the stage */
+	stage_inst = opdl_stage_add(device->turbine[0],
+			(mt_rx > 1),
+			true);
+	if (stage_inst) {
+
+		/* Assign the new created input stage to all relevant ports */
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			if (port->queue_id == 0) {
+				queue = &device->queue[port->queue_id];
+				port->enq_stage_inst = stage_inst;
+				port->deq_stage_inst = NULL;
+				port->configured = 1;
+				port->initialized = 1;
+
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			}
+		}
+	} else {
+		err = -1;
+	}
+
+	if (!err) {
+		fprintf(stdout, "Success - (%u) \"Queue 0\" port(s) "
+				"initialized\n",
+				queue->nb_ports);
+	}
+	return err;
+}
+
+int
+assign_internal_queue_ids(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->external_qid != OPDL_INVALID_QID) {
+			port->queue_id =
+				device->q_map_ex_to_in[port->external_qid];
+
+			/* Now do the external_qid of the next queue */
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+			if (queue->q_pos == OPDL_Q_POS_END)
+				port->next_external_qid =
+				device->queue[port->queue_id + 2].external_qid;
+			else
+				port->next_external_qid =
+				device->queue[port->queue_id + 1].external_qid;
+		}
+	}
+	return err;
+}
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [dpdk-dev] [PATCH 3/7] event/opdl: add the opdl pmd main body and xstats helper function
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 2/7] event/opdl: add the opdl pmd header and init helper function liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 4/7] event/opdl: update the build system to enable compilation of pmd liang.j.ma
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

This commit adds an OPDL implementation of the eventdev API. The
implementation here is intended to enable the community to use
the OPDL infrastructure under the eventdev API.

The main components of the implementation are in two files:
  - opdl_evdev.c              Creation, configuration, etc
  - opdl_evdev_xstats.c       helper functions to support stats collection

This commit only adds the implementation, no existing DPDK files
are modified.

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/opdl_evdev.c        | 714 +++++++++++++++++++++++++++++++++
 drivers/event/opdl/opdl_evdev_xstats.c | 205 ++++++++++
 2 files changed, 919 insertions(+)
 create mode 100644 drivers/event/opdl/opdl_evdev.c
 create mode 100644 drivers/event/opdl/opdl_evdev_xstats.c

diff --git a/drivers/event/opdl/opdl_evdev.c b/drivers/event/opdl/opdl_evdev.c
new file mode 100644
index 0000000..be43fcf
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev.c
@@ -0,0 +1,714 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_bus_vdev.h>
+#include <rte_memzone.h>
+#include <rte_kvargs.h>
+#include <rte_ring.h>
+#include <rte_errno.h>
+#include <rte_event_ring.h>
+#include <rte_service_component.h>
+#include <rte_cycles.h>
+
+#include "opdl_evdev.h"
+#include "opdl_ring.h"
+
+#define EVENTDEV_NAME_OPDL_PMD event_opdl
+#define NUMA_NODE_ARG "numa_node"
+#define DO_VALIDATION_ARG "do_validation"
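+/*
+ * Illustrative device creation (assumed EAL vdev syntax):
+ *
+ *	--vdev="event_opdl0,numa_node=0,do_validation=1"
+ *
+ * numa_node selects the socket used for allocations; a non-zero
+ * do_validation enables queue-id checking and per-port statistics.
+ */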
+
+
+
+uint16_t
+opdl_event_enqueue_burst(void *port,
+			 const struct rte_event ev[],
+			 uint16_t num)
+{
+	struct opdl_port *p = port;
+
+	if (unlikely(!p->opdl->data->dev_started))
+		return 0;
+
+
+	/* either rx_enqueue or disclaim */
+	return p->enq(p, ev, num);
+}
+
+uint16_t
+opdl_event_enqueue(void *port, const struct rte_event *ev)
+{
+	struct opdl_port *p = port;
+
+	if (unlikely(!p->opdl->data->dev_started))
+		return 0;
+
+
+	return p->enq(p, ev, 1);
+}
+
+uint16_t
+opdl_event_dequeue_burst(void *port,
+			 struct rte_event *ev,
+			 uint16_t num,
+			 uint64_t wait)
+{
+	struct opdl_port *p = (void *)port;
+
+	RTE_SET_USED(wait);
+
+	if (unlikely(!p->opdl->data->dev_started))
+		return 0;
+
+	/* This function pointer can point to tx_dequeue or claim */
+	return p->deq(p, ev, num);
+}
+
+uint16_t
+opdl_event_dequeue(void *port,
+		   struct rte_event *ev,
+		   uint64_t wait)
+{
+	struct opdl_port *p = (void *)port;
+
+	if (unlikely(!p->opdl->data->dev_started))
+		return 0;
+
+	RTE_SET_USED(wait);
+
+	return p->deq(p, ev, 1);
+}
+
+static void
+opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
+
+
+static int
+opdl_port_link(struct rte_eventdev *dev,
+	       void *port,
+	       const uint8_t queues[],
+	       const uint8_t priorities[],
+	       uint16_t num)
+{
+	struct opdl_port *p = port;
+
+	RTE_SET_USED(priorities);
+
+	if (unlikely(dev->data->dev_started)) {
+		OPDL_LOG_ERR("Attempt to link queue (%u) to port %d while device started\n",
+				queues[0],
+				p->id);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+	/* Max of 1 queue per port */
+	if (num > 1) {
+		OPDL_LOG_ERR("Attempt to link more than one queue (%u) to port %d requested\n",
+				num,
+				p->id);
+		rte_errno = -EDQUOT;
+		return 0;
+	}
+
+	if (!p->configured) {
+		OPDL_LOG_ERR("port %d not configured, cannot link to %u\n",
+				p->id,
+				queues[0]);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+	if (p->external_qid != OPDL_INVALID_QID) {
+		OPDL_LOG_ERR("port %d already linked to queue %u, cannot link to %u\n",
+				p->id,
+				p->external_qid,
+				queues[0]);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+	p->external_qid = queues[0];
+
+	return 1;
+}
+
+static int
+opdl_port_unlink(struct rte_eventdev *dev,
+		 void *port,
+		 uint8_t queues[],
+		 uint16_t nb_unlinks)
+{
+	struct opdl_port *p = port;
+
+	RTE_SET_USED(nb_unlinks);
+
+	if (unlikely(dev->data->dev_started)) {
+		OPDL_LOG_ERR("Attempt to unlink queue (%u) to port %d while device started\n",
+				queues[0],
+				p->id);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+	/* Reset port state */
+	p->queue_id = OPDL_INVALID_QID;
+	p->p_type = OPDL_INVALID_PORT;
+	p->external_qid = OPDL_INVALID_QID;
+
+	return 1;
+}
+
+static int
+opdl_port_setup(struct rte_eventdev *dev,
+		uint8_t port_id,
+		const struct rte_event_port_conf *conf)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	struct opdl_port *p = &device->ports[port_id];
+
+	RTE_SET_USED(conf);
+
+	/* Check if port already configured */
+	if (p->configured) {
+		OPDL_LOG_ERR("Attempt to setup port %d which is already setup\n",
+				p->id);
+		return -EDQUOT;
+	}
+
+	*p = (struct opdl_port){0}; /* zero entire structure */
+	p->id = port_id;
+	p->opdl = device;
+	p->queue_id = OPDL_INVALID_QID;
+	p->external_qid = OPDL_INVALID_QID;
+	dev->data->ports[port_id] = p;
+	rte_smp_wmb();
+	p->configured = 1;
+	device->nb_ports++;
+	return 0;
+}
+
+static void
+opdl_port_release(void *port)
+{
+	struct opdl_port *p = (void *)port;
+
+	if (p == NULL ||
+	    p->opdl->data->dev_started) {
+		return;
+	}
+
+	p->configured = 0;
+	p->initialized = 0;
+}
+
+static int
+opdl_queue_setup(struct rte_eventdev *dev,
+		 uint8_t queue_id,
+		 const struct rte_event_queue_conf *conf)
+{
+	enum queue_type type;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	/* Extra sanity check, probably not needed */
+	if (queue_id == OPDL_INVALID_QID) {
+		OPDL_LOG_ERR("Invalid queue id %u requested\n",
+				queue_id);
+		return -EINVAL;
+	}
+
+	if (device->nb_q_md > device->max_queue_nb) {
+		OPDL_LOG_ERR("Max number of queues %u exceeded by request %u\n",
+			     device->max_queue_nb,
+			     device->nb_q_md);
+		return -EINVAL;
+	}
+
+	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
+	    & conf->event_queue_cfg) {
+		OPDL_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
+		return -ENOTSUP;
+	} else if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK
+		   & conf->event_queue_cfg) {
+		type = OPDL_Q_TYPE_SINGLE_LINK;
+	} else {
+		switch (conf->schedule_type) {
+		case RTE_SCHED_TYPE_ORDERED:
+			type = OPDL_Q_TYPE_ORDERED;
+			break;
+		case RTE_SCHED_TYPE_ATOMIC:
+			type = OPDL_Q_TYPE_ATOMIC;
+			break;
+		case RTE_SCHED_TYPE_PARALLEL:
+			type = OPDL_Q_TYPE_ORDERED;
+			break;
+		default:
+			OPDL_LOG_ERR("Unknown queue type %d requested\n",
+				     conf->event_queue_cfg);
+			return -EINVAL;
+		}
+	}
+	/* Check if queue id has been setup already */
+	for (uint32_t i = 0; i < device->nb_q_md; i++) {
+		if (device->q_md[i].ext_id == queue_id) {
+			OPDL_LOG_ERR("queue id %u already setup\n",
+					queue_id);
+			return -EINVAL;
+		}
+	}
+
+	device->q_md[device->nb_q_md].ext_id = queue_id;
+	device->q_md[device->nb_q_md].type = type;
+	device->q_md[device->nb_q_md].setup = 1;
+	device->nb_q_md++;
+
+	return 0;
+}
+
+static void
+opdl_queue_release(struct rte_eventdev *dev, uint8_t queue_id)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	RTE_SET_USED(queue_id);
+
+	if (device->data->dev_started)
+		return;
+
+}
+
+static void
+opdl_queue_def_conf(struct rte_eventdev *dev,
+		    uint8_t queue_id,
+		    struct rte_event_queue_conf *conf)
+{
+	RTE_SET_USED(dev);
+	RTE_SET_USED(queue_id);
+
+	static const struct rte_event_queue_conf default_conf = {
+		.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1,
+		.event_queue_cfg = 0,
+		.schedule_type = RTE_SCHED_TYPE_ORDERED,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+	};
+
+	*conf = default_conf;
+}
+
+static void
+opdl_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
+		struct rte_event_port_conf *port_conf)
+{
+	RTE_SET_USED(dev);
+	RTE_SET_USED(port_id);
+
+	port_conf->new_event_threshold = MAX_OPDL_CONS_Q_DEPTH;
+	port_conf->dequeue_depth = MAX_OPDL_CONS_Q_DEPTH;
+	port_conf->enqueue_depth = MAX_OPDL_CONS_Q_DEPTH;
+}
+
+static int
+opdl_dev_configure(const struct rte_eventdev *dev)
+{
+	struct opdl_evdev *opdl = opdl_pmd_priv(dev);
+	const struct rte_eventdev_data *data = dev->data;
+	const struct rte_event_dev_config *conf = &data->dev_conf;
+
+	opdl->max_queue_nb = conf->nb_event_queues;
+	opdl->max_port_nb = conf->nb_event_ports;
+	opdl->nb_events_limit = conf->nb_events_limit;
+
+	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) {
+		OPDL_LOG_ERR("DEQUEUE_TIMEOUT not supported\n");
+		return -ENOTSUP;
+	}
+
+	return 0;
+}
+
+static void
+opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
+{
+	RTE_SET_USED(dev);
+
+	static const struct rte_event_dev_info evdev_opdl_info = {
+		.driver_name = OPDL_PMD_NAME,
+		.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
+		.max_event_queue_flows = OPDL_QID_NUM_FIDS,
+		.max_event_queue_priority_levels = OPDL_Q_PRIORITY_MAX,
+		.max_event_priority_levels = OPDL_IQS_MAX,
+		.max_event_ports = OPDL_PORTS_MAX,
+		.max_event_port_dequeue_depth = MAX_OPDL_CONS_Q_DEPTH,
+		.max_event_port_enqueue_depth = MAX_OPDL_CONS_Q_DEPTH,
+		.max_num_events = OPDL_INFLIGHT_EVENTS_TOTAL,
+		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE,
+	};
+
+	*info = evdev_opdl_info;
+}
+
+static void
+opdl_dump(struct rte_eventdev *dev, FILE *f)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return;
+
+	fprintf(f,
+		"\n\n -- RING STATISTICS --\n");
+
+	for (uint32_t i = 0; i < device->nb_turbines; i++)
+		opdl_ring_dump(device->turbine[i], f);
+
+	fprintf(f,
+		"\n\n -- PORT STATISTICS --\n"
+		"Type Port Index  Port Id  Queue Id     Av. Req Size  "
+		"Av. Grant Size     Av. Cycles PP"
+		"      Empty DEQs   Non Empty DEQs   Pkts Processed\n");
+
+	for (uint32_t i = 0; i < device->max_port_nb; i++) {
+		char queue_id[64];
+		char total_cyc[64];
+		const char *p_type;
+
+		uint64_t cne, cpg;
+		struct opdl_port *port = &device->ports[i];
+
+		if (port->initialized) {
+			cne = port->port_stat[claim_non_empty];
+			cpg = port->port_stat[claim_pkts_granted];
+			if (port->p_type == OPDL_REGULAR_PORT)
+				p_type = "REG";
+			else if (port->p_type == OPDL_PURE_RX_PORT)
+				p_type = "  RX";
+			else if (port->p_type == OPDL_PURE_TX_PORT)
+				p_type = "  TX";
+			else if (port->p_type == OPDL_ASYNC_PORT)
+				p_type = "SYNC";
+			else
+				p_type = "????";
+
+			sprintf(queue_id, "%02u", port->external_qid);
+			if (port->p_type == OPDL_REGULAR_PORT ||
+					port->p_type == OPDL_ASYNC_PORT)
+				sprintf(total_cyc,
+					" %'16"PRIu64"",
+					(cpg != 0 ?
+					 port->port_stat[total_cycles] / cpg
+					 : 0));
+			else
+				sprintf(total_cyc,
+					"             ----");
+			fprintf(f,
+				"%4s %10u %8u %9s %'16"PRIu64" %'16"PRIu64" %s "
+				"%'16"PRIu64" %'16"PRIu64" %'16"PRIu64"\n",
+				p_type,
+				i,
+				port->id,
+				(port->external_qid == OPDL_INVALID_QID ? "---"
+				 : queue_id),
+				(cne != 0 ?
+				 port->port_stat[claim_pkts_requested] / cne
+				 : 0),
+				(cne != 0 ?
+				 port->port_stat[claim_pkts_granted] / cne
+				 : 0),
+				total_cyc,
+				port->port_stat[claim_empty],
+				port->port_stat[claim_non_empty],
+				port->port_stat[claim_pkts_granted]);
+		}
+	}
+	fprintf(f, "\n");
+}
+
+
+static void
+opdl_stop(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	opdl_xstats_uninit(dev);
+
+	destroy_queues_and_rings(dev);
+
+
+	device->started = 0;
+
+	rte_smp_wmb();
+}
+
+static int
+opdl_start(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	err = create_queues_and_rings(dev);
+
+
+	if (!err)
+		err = assign_internal_queue_ids(dev);
+
+
+	if (!err)
+		err = initialise_queue_zero_ports(dev);
+
+
+	if (!err)
+		err = initialise_all_other_ports(dev);
+
+
+	if (!err)
+		err = check_queues_linked(dev);
+
+
+	if (!err)
+		err = opdl_add_event_handlers(dev);
+
+
+	if (!err)
+		err = build_all_dependencies(dev);
+
+	if (!err)
+		opdl_xstats_init(dev);
+	else
+		opdl_stop(dev);
+
+	return err;
+}
+
+static int
+opdl_close(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	uint32_t i;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+		memset(&device->ports[i],
+		       0,
+		       sizeof(struct opdl_port));
+	}
+
+	memset(&device->s_md,
+			0x0,
+			sizeof(struct opdl_stage_meta_data)*OPDL_PORTS_MAX);
+
+	memset(&device->q_md,
+			0xFF,
+			sizeof(struct opdl_queue_meta_data)*OPDL_MAX_QUEUES);
+
+
+	memset(device->q_map_ex_to_in,
+			0,
+			sizeof(uint8_t)*OPDL_INVALID_QID);
+
+	opdl_xstats_uninit(dev);
+
+	device->max_port_nb = 0;
+
+	device->max_queue_nb = 0;
+
+	device->nb_turbines = 0;
+
+	device->nb_queues   = 0;
+
+	device->nb_ports    = 0;
+
+	device->nb_q_md     = 0;
+
+	dev->data->nb_queues = 0;
+
+	dev->data->nb_ports = 0;
+
+
+	return 0;
+}
+
+static int
+assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *socket_id = opaque;
+	*socket_id = atoi(value);
+	if (*socket_id >= RTE_MAX_NUMA_NODES)
+		return -1;
+	return 0;
+}
+
+static int
+set_do_validation(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *do_val = opaque;
+	*do_val = atoi(value);
+	if (*do_val != 0)
+		*do_val = 1;
+
+	return 0;
+}
+
+static int
+opdl_probe(struct rte_vdev_device *vdev)
+{
+	static const struct rte_eventdev_ops evdev_opdl_ops = {
+		.dev_configure = opdl_dev_configure,
+		.dev_infos_get = opdl_info_get,
+		.dev_close = opdl_close,
+		.dev_start = opdl_start,
+		.dev_stop = opdl_stop,
+		.dump = opdl_dump,
+
+		.queue_def_conf = opdl_queue_def_conf,
+		.queue_setup = opdl_queue_setup,
+		.queue_release = opdl_queue_release,
+		.port_def_conf = opdl_port_def_conf,
+		.port_setup = opdl_port_setup,
+		.port_release = opdl_port_release,
+		.port_link = opdl_port_link,
+		.port_unlink = opdl_port_unlink,
+
+		.xstats_get = opdl_xstats_get,
+		.xstats_get_names = opdl_xstats_get_names,
+		.xstats_get_by_name = opdl_xstats_get_by_name,
+		.xstats_reset = opdl_xstats_reset,
+	};
+
+	static const char *const args[] = {
+		NUMA_NODE_ARG,
+		DO_VALIDATION_ARG,
+		NULL
+	};
+	const char *name;
+	const char *params;
+	struct rte_eventdev *dev;
+	struct opdl_evdev *opdl;
+	int socket_id = rte_socket_id();
+	int do_validation = 0;
+	int str_len;
+
+	name = rte_vdev_device_name(vdev);
+	params = rte_vdev_device_args(vdev);
+	if (params != NULL && params[0] != '\0') {
+		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
+
+		if (!kvlist) {
+			OPDL_LOG_INFO(
+					"Ignoring unsupported parameters when creating device '%s'\n",
+					name);
+		} else {
+			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
+					assign_numa_node, &socket_id);
+			if (ret != 0) {
+				OPDL_LOG_ERR(
+						"%s: Error parsing numa node parameter",
+						name);
+
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			ret = rte_kvargs_process(kvlist, DO_VALIDATION_ARG,
+					set_do_validation, &do_validation);
+			if (ret != 0) {
+				OPDL_LOG_ERR(
+					"%s: Error parsing do validation parameter",
+					name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			rte_kvargs_free(kvlist);
+		}
+	}
+
+	OPDL_LOG_INFO("\tSuccess - creating eventdev device %s, numa_node:[%d], do_validation:[%s]",
+		      name,
+		      socket_id,
+		      (do_validation ? "true" : "false"));
+
+	dev = rte_event_pmd_vdev_init(name,
+			sizeof(struct opdl_evdev), socket_id);
+
+	if (dev == NULL) {
+		OPDL_LOG_ERR("eventdev vdev init() failed");
+		return -EFAULT;
+	}
+	dev->dev_ops = &evdev_opdl_ops;
+	dev->enqueue = opdl_event_enqueue;
+	dev->enqueue_burst = opdl_event_enqueue_burst;
+	dev->enqueue_new_burst = opdl_event_enqueue_burst;
+	dev->enqueue_forward_burst = opdl_event_enqueue_burst;
+	dev->dequeue = opdl_event_dequeue;
+	dev->dequeue_burst = opdl_event_dequeue_burst;
+
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return 0;
+
+	opdl = dev->data->dev_private;
+	opdl->data = dev->data;
+	opdl->socket = socket_id;
+	opdl->do_validation = do_validation;
+	str_len = strlen(name);
+	memcpy(opdl->service_name, name, str_len + 1);
+
+	return 0;
+}
+
+static int
+opdl_remove(struct rte_vdev_device *vdev)
+{
+	const char *name;
+
+	name = rte_vdev_device_name(vdev);
+	if (name == NULL)
+		return -EINVAL;
+
+	OPDL_LOG_INFO("Closing eventdev opdl device %s\n", name);
+
+	return rte_event_pmd_vdev_uninit(name);
+}
+
+static struct rte_vdev_driver evdev_opdl_pmd_drv = {
+	.probe = opdl_probe,
+	.remove = opdl_remove
+};
+
+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_OPDL_PMD, evdev_opdl_pmd_drv);
+RTE_PMD_REGISTER_PARAM_STRING(event_opdl, NUMA_NODE_ARG "=<int> "
+			      DO_VALIDATION_ARG "=<int> ");
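
For reference, a minimal sketch of creating the device programmatically
with both vdev parameters (assuming NUMA_NODE_ARG and DO_VALIDATION_ARG
expand to "numa_node" and "do_validation"; the unit tests below only
exercise "do_validation=1"). The equivalent EAL command-line form would
be --vdev="event_opdl0,numa_node=0,do_validation=1":

    #include <rte_bus_vdev.h>
    #include <rte_eventdev.h>

    static int
    create_opdl_dev(void)
    {
        /* Instantiate the opdl PMD with both optional parameters */
        int ret = rte_vdev_init("event_opdl0",
                                "numa_node=0,do_validation=1");
        if (ret < 0)
            return ret;

        /* Resolve the eventdev id for later configure/start calls */
        return rte_event_dev_get_dev_id("event_opdl0");
    }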
diff --git a/drivers/event/opdl/opdl_evdev_xstats.c b/drivers/event/opdl/opdl_evdev_xstats.c
new file mode 100644
index 0000000..a2abc76
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev_xstats.c
@@ -0,0 +1,205 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_event_ring.h>
+#include "opdl_evdev.h"
+
+static const char * const port_xstat_str[] = {
+
+	"claim_pkts_requested",
+	"claim_pkts_granted",
+	"claim_non_empty",
+	"claim_empty",
+	"total_cycles",
+};
+
+
+void
+opdl_xstats_init(struct rte_eventdev *dev)
+{
+	uint32_t i, j;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+		struct opdl_port *port = &device->ports[i];
+
+		for (j = 0; j < max_num_port_xstat; j++) {
+			uint32_t index = (i * max_num_port_xstat) + j;
+
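+			/* Flat id layout: port i owns stat ids
+			 * [i * max_num_port_xstat ..
+			 *  ((i + 1) * max_num_port_xstat) - 1]
+			 */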
+			/* Name */
+			sprintf(device->port_xstat[index].stat.name,
+			       "port_%02u_%s",
+			       i,
+			       port_xstat_str[j]);
+
+			/* ID */
+			device->port_xstat[index].id = index;
+
+			/* Stats ptr */
+			device->port_xstat[index].value = &port->port_stat[j];
+		}
+	}
+}
+
+int
+opdl_xstats_uninit(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return 0;
+
+	memset(device->port_xstat,
+	       0,
+	       sizeof(device->port_xstat));
+
+	return 0;
+}
+
+int
+opdl_xstats_get_names(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		uint8_t queue_port_id,
+		struct rte_event_dev_xstats_name *xstats_names,
+		unsigned int *ids, unsigned int size)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	if (mode == RTE_EVENT_DEV_XSTATS_DEVICE ||
+			mode == RTE_EVENT_DEV_XSTATS_QUEUE)
+		return -EINVAL;
+
+	if (queue_port_id >= device->max_port_nb)
+		return -EINVAL;
+
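+	/* Buffer too small: return the number of entries required */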
+	if (size < max_num_port_xstat)
+		return max_num_port_xstat;
+
+	uint32_t port_idx = queue_port_id * max_num_port_xstat;
+
+	for (uint32_t j = 0; j < max_num_port_xstat; j++) {
+
+		strcpy(xstats_names[j].name,
+				device->port_xstat[j + port_idx].stat.name);
+		ids[j] = device->port_xstat[j + port_idx].id;
+	}
+
+	return max_num_port_xstat;
+}
+
+int
+opdl_xstats_get(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		uint8_t queue_port_id,
+		const unsigned int ids[],
+		uint64_t values[], unsigned int n)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	if (mode == RTE_EVENT_DEV_XSTATS_DEVICE ||
+			mode == RTE_EVENT_DEV_XSTATS_QUEUE)
+		return -EINVAL;
+
+	if (queue_port_id >= device->max_port_nb)
+		return -EINVAL;
+
+	if (n > max_num_port_xstat)
+		return -EINVAL;
+
+	uint32_t p_start = queue_port_id * max_num_port_xstat;
+	uint32_t p_finish = p_start + max_num_port_xstat;
+
+	for (uint32_t i = 0; i < n; i++) {
+		if (ids[i] < p_start || ids[i] >= p_finish)
+			return -EINVAL;
+
+		values[i] = *(device->port_xstat[ids[i]].value);
+	}
+
+	return n;
+}
+
+uint64_t
+opdl_xstats_get_by_name(const struct rte_eventdev *dev,
+		const char *name, unsigned int *id)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	uint32_t max_index = device->max_port_nb * max_num_port_xstat;
+
+	for (uint32_t i = 0; i < max_index; i++) {
+
+		if (strncmp(name,
+			   device->port_xstat[i].stat.name,
+			   RTE_EVENT_DEV_XSTATS_NAME_SIZE) == 0) {
+			if (id != NULL)
+				*id = i;
+			if (device->port_xstat[i].value)
+				return *(device->port_xstat[i].value);
+			break;
+		}
+	}
+	return -EINVAL;
+}
+
+int
+opdl_xstats_reset(struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		int16_t queue_port_id, const uint32_t ids[],
+		uint32_t nb_ids)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	RTE_SET_USED(dev);
+	RTE_SET_USED(mode);
+	RTE_SET_USED(queue_port_id);
+	RTE_SET_USED(ids);
+	RTE_SET_USED(nb_ids);
+
+	return -ENOTSUP;
+}
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [dpdk-dev] [PATCH 4/7] event/opdl: update the build system to enable compilation of pmd
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
                   ` (2 preceding siblings ...)
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 3/7] event/opdl: add the opdl pmd main body and xstats " liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 5/7] test/eventdev: opdl eventdev pmd unit test func and makefiles liang.j.ma
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

update the base config to add the OPDL eventdev flags
update the drivers/event Makefile to add the opdl subdirectory
update mk/rte.app.mk to allow applications to link against the PMD library

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
---
 config/common_base     | 6 ++++++
 drivers/event/Makefile | 1 +
 mk/rte.app.mk          | 1 +
 3 files changed, 8 insertions(+)

diff --git a/config/common_base b/config/common_base
index e74febe..67adaba 100644
--- a/config/common_base
+++ b/config/common_base
@@ -594,6 +594,12 @@ CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF=y
 CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF_DEBUG=n
 
 #
+# Compile PMD for OPDL event device
+#
+CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=y
+CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV_DEBUG=n
+
+#
 # Compile librte_ring
 #
 CONFIG_RTE_LIBRTE_RING=y
diff --git a/drivers/event/Makefile b/drivers/event/Makefile
index 1f9c0ba..d626666 100644
--- a/drivers/event/Makefile
+++ b/drivers/event/Makefile
@@ -35,5 +35,6 @@ DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_EVENTDEV) += dpaa2
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl
 
 include $(RTE_SDK)/mk/rte.subdir.mk
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 6a6a745..a55a21d 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -200,6 +200,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_EVENTDEV) += -lrte_pmd_dpaa2_event
 _LDLIBS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += -lrte_mempool_octeontx
 _LDLIBS-$(CONFIG_RTE_LIBRTE_OCTEONTX_PMD) += -lrte_pmd_octeontx
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += -lrte_pmd_opdl_event
 endif # CONFIG_RTE_LIBRTE_EVENTDEV
 
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA2_PMD),y)
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [dpdk-dev] [PATCH 5/7] test/eventdev: opdl eventdev pmd unit test func and makefiles
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
                   ` (3 preceding siblings ...)
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 4/7] event/opdl: update the build system to enable compilation of pmd liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example liang.j.ma
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

This adds the minimal changes to allow an OPDL eventdev implementation to
be compiled, linked and created at run time. The opdl eventdev does
nothing by itself, but can be created via vdev on the command line, e.g.

  sudo ./x86_64-native-linuxapp-gcc/app/test --vdev=event_opdl0
  ...
  PMD: Creating eventdev opdl device event_opdl0
  RTE>> eventdev_opdl_autotest

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
---
 test/test/Makefile             |    1 +
 test/test/test_eventdev_opdl.c | 1089 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 1090 insertions(+)
 create mode 100644 test/test/test_eventdev_opdl.c

diff --git a/test/test/Makefile b/test/test/Makefile
index bb54c98..4d385ce 100644
--- a/test/test/Makefile
+++ b/test/test/Makefile
@@ -212,6 +212,7 @@ SRCS-y += test_event_ring.c
 SRCS-y += test_event_eth_rx_adapter.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += test_eventdev_sw.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += test_eventdev_octeontx.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += test_eventdev_opdl.c
 endif
 
 SRCS-$(CONFIG_RTE_LIBRTE_KVARGS) += test_kvargs.c
diff --git a/test/test/test_eventdev_opdl.c b/test/test/test_eventdev_opdl.c
new file mode 100644
index 0000000..00b1e23
--- /dev/null
+++ b/test/test/test_eventdev_opdl.c
@@ -0,0 +1,1089 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/queue.h>
+
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_debug.h>
+#include <rte_ethdev.h>
+#include <rte_cycles.h>
+#include <rte_eventdev.h>
+#include <rte_bus_vdev.h>
+#include <rte_pause.h>
+
+#include "test.h"
+
+#define MAX_PORTS 16
+#define MAX_QIDS 16
+#define NUM_PACKETS (1<<18)
+#define NUM_EVENTS 256
+#define BURST_SIZE 32
+
+
+
+static int evdev;
+
+struct test {
+	struct rte_mempool *mbuf_pool;
+	uint8_t port[MAX_PORTS];
+	uint8_t qid[MAX_QIDS];
+	int nb_qids;
+};
+
+enum queue_type {
+	OPDL_Q_TYPE_INVALID = 0,
+	OPDL_Q_TYPE_SINGLE_LINK = 1,
+	OPDL_Q_TYPE_ATOMIC,
+	OPDL_Q_TYPE_ORDERED
+};
+
+
+static struct rte_mempool *eventdev_func_mempool;
+
+static inline struct rte_mbuf *
+rte_gen_arp(int portid, struct rte_mempool *mp)
+{
+	/*
+	 * len = 14 + 46
+	 * ARP, Request who-has 10.0.0.1 tell 10.0.0.2, length 46
+	 */
+	static const uint8_t arp_request[] = {
+		/*0x0000:*/ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xec, 0xa8,
+		0x6b, 0xfd, 0x02, 0x29, 0x08, 0x06, 0x00, 0x01,
+		/*0x0010:*/ 0x08, 0x00, 0x06, 0x04, 0x00, 0x01, 0xec, 0xa8,
+		0x6b, 0xfd, 0x02, 0x29, 0x0a, 0x00, 0x00, 0x01,
+		/*0x0020:*/ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00,
+		0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		/*0x0030:*/ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00
+	};
+	struct rte_mbuf *m;
+	int pkt_len = sizeof(arp_request) - 1;
+
+	m = rte_pktmbuf_alloc(mp);
+	if (!m)
+		return 0;
+
+	memcpy((void *)((uintptr_t)m->buf_addr + m->data_off),
+		arp_request, pkt_len);
+	rte_pktmbuf_pkt_len(m) = pkt_len;
+	rte_pktmbuf_data_len(m) = pkt_len;
+
+	RTE_SET_USED(portid);
+
+	return m;
+}
+
+/* initialization and config */
+static inline int
+init(struct test *t, int nb_queues, int nb_ports)
+{
+	struct rte_event_dev_config config = {
+			.nb_event_queues = nb_queues,
+			.nb_event_ports = nb_ports,
+			.nb_event_queue_flows = 1024,
+			.nb_events_limit = 4096,
+			.nb_event_port_dequeue_depth = 128,
+			.nb_event_port_enqueue_depth = 128,
+	};
+	int ret;
+
+	void *temp = t->mbuf_pool; /* save and restore mbuf pool */
+
+	memset(t, 0, sizeof(*t));
+	t->mbuf_pool = temp;
+
+	ret = rte_event_dev_configure(evdev, &config);
+	if (ret < 0)
+		printf("%d: Error configuring device\n", __LINE__);
+	return ret;
+}
+
+static inline int
+create_ports(struct test *t, int num_ports)
+{
+	int i;
+	static const struct rte_event_port_conf conf = {
+			.new_event_threshold = 1024,
+			.dequeue_depth = 32,
+			.enqueue_depth = 32,
+	};
+	if (num_ports > MAX_PORTS)
+		return -1;
+
+	for (i = 0; i < num_ports; i++) {
+		if (rte_event_port_setup(evdev, i, &conf) < 0) {
+			printf("Error setting up port %d\n", i);
+			return -1;
+		}
+		t->port[i] = i;
+	}
+
+	return 0;
+}
+
+static inline int
+create_queues_type(struct test *t, int num_qids, enum queue_type flags)
+{
+	int i;
+	uint8_t type;
+
+	switch (flags) {
+	case OPDL_Q_TYPE_ORDERED:
+		type = RTE_SCHED_TYPE_ORDERED;
+		break;
+	case OPDL_Q_TYPE_ATOMIC:
+		type = RTE_SCHED_TYPE_ATOMIC;
+		break;
+	default:
+		type = 0;
+	}
+
+	/* Q creation */
+	const struct rte_event_queue_conf conf = {
+		.event_queue_cfg =
+		(flags == OPDL_Q_TYPE_SINGLE_LINK ?
+		 RTE_EVENT_QUEUE_CFG_SINGLE_LINK : 0),
+		.schedule_type = type,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+		.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1024,
+	};
+
+	for (i = t->nb_qids ; i < t->nb_qids + num_qids; i++) {
+		if (rte_event_queue_setup(evdev, i, &conf) < 0) {
+			printf("%d: error creating qid %d\n ", __LINE__, i);
+			return -1;
+		}
+		t->qid[i] = i;
+	}
+
+	t->nb_qids += num_qids;
+
+	if (t->nb_qids > MAX_QIDS)
+		return -1;
+
+	return 0;
+}
+
+
+/* destruction */
+static inline int
+cleanup(struct test *t __rte_unused)
+{
+	rte_event_dev_stop(evdev);
+	rte_event_dev_close(evdev);
+	printf("cleanup for test done\n\n");
+	return 0;
+}
+
+static int
+ordered_basic(struct test *t)
+{
+	const uint8_t rx_port = 0;
+	const uint8_t w1_port = 1;
+	const uint8_t w3_port = 3;
+	const uint8_t tx_port = 4;
+	int err;
+	uint32_t i;
+	uint32_t deq_pkts;
+	struct rte_mbuf *mbufs[3];
+
+	const uint32_t MAGIC_SEQN = 1234;
+
+	/* Create instance with 5 ports */
+	if (init(t, 2, tx_port+1) < 0 ||
+	    create_ports(t, tx_port+1) < 0 ||
+	    create_queues_type(t, 2, OPDL_Q_TYPE_ORDERED)) {
+		printf("%d: Error initializing device\n", __LINE__);
+		return -1;
+	}
+
+	/*
+	 * CQ mapping to QID
+	 * We need three ports, all mapped to the same ordered qid0. Then we'll
+	 * take a packet out to each port, re-enqueue in reverse order,
+	 * then make sure the reordering has taken place properly when we
+	 * dequeue from the tx_port.
+	 *
+	 * Simplified test setup diagram:
+	 *
+	 * rx_port        w1_port
+	 *        \     /         \
+	 *         qid0 - w2_port - qid1
+	 *              \         /     \
+	 *                w3_port        tx_port
+	 */
+	/* CQ mapping to QID for LB ports (directed mapped on create) */
+	for (i = w1_port; i <= w3_port; i++) {
+		err = rte_event_port_link(evdev, t->port[i], &t->qid[0], NULL,
+				1);
+		if (err != 1) {
+			printf("%d: error mapping lb qid\n", __LINE__);
+			cleanup(t);
+			return -1;
+		}
+	}
+
+	err = rte_event_port_link(evdev, t->port[tx_port], &t->qid[1], NULL,
+			1);
+	if (err != 1) {
+		printf("%d: error mapping TX  qid\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	if (rte_event_dev_start(evdev) < 0) {
+		printf("%d: Error with start call\n", __LINE__);
+		return -1;
+	}
+	/* Enqueue 3 packets to the rx port */
+	for (i = 0; i < 3; i++) {
+		struct rte_event ev;
+		mbufs[i] = rte_gen_arp(0, t->mbuf_pool);
+		if (!mbufs[i]) {
+			printf("%d: gen of pkt failed\n", __LINE__);
+			return -1;
+		}
+
+		ev.queue_id = t->qid[0];
+		ev.op = RTE_EVENT_OP_NEW;
+		ev.mbuf = mbufs[i];
+		mbufs[i]->seqn = MAGIC_SEQN + i;
+
+		/* generate pkt and enqueue */
+		err = rte_event_enqueue_burst(evdev, t->port[rx_port], &ev, 1);
+		if (err != 1) {
+			printf("%d: Failed to enqueue pkt %u, retval = %u\n",
+					__LINE__, i, err);
+			return -1;
+		}
+	}
+
+	/* use extra slot to make logic in loops easier */
+	struct rte_event deq_ev[w3_port + 1];
+
+	uint32_t  seq  = 0;
+
+	/* Dequeue the 3 packets, one from each worker port */
+	for (i = w1_port; i <= w3_port; i++) {
+		deq_pkts = rte_event_dequeue_burst(evdev, t->port[i],
+				&deq_ev[i], 1, 0);
+		if (deq_pkts != 1) {
+			printf("%d: Failed to deq\n", __LINE__);
+			rte_event_dev_dump(evdev, stdout);
+			return -1;
+		}
+		seq = deq_ev[i].mbuf->seqn  - MAGIC_SEQN;
+
+		if (seq != (i-1)) {
+			printf(" seq test failed! seq is %u, "
+					"port number is %u\n", seq, i);
+			return -1;
+		}
+	}
+
+	/* Enqueue each packet in reverse order, flushing after each one */
+	for (i = w3_port; i >= w1_port; i--) {
+
+		deq_ev[i].op = RTE_EVENT_OP_FORWARD;
+		deq_ev[i].queue_id = t->qid[1];
+		err = rte_event_enqueue_burst(evdev, t->port[i], &deq_ev[i], 1);
+		if (err != 1) {
+			printf("%d: Failed to enqueue\n", __LINE__);
+			return -1;
+		}
+	}
+
+	/* dequeue from the tx ports, we should get 3 packets */
+	deq_pkts = rte_event_dequeue_burst(evdev, t->port[tx_port], deq_ev,
+			3, 0);
+
+	/* Check to see if we've got all 3 packets */
+	if (deq_pkts != 3) {
+		printf("%d: expected 3 pkts at tx port got %d from port %d\n",
+			__LINE__, deq_pkts, tx_port);
+		rte_event_dev_dump(evdev, stdout);
+		return 1;
+	}
+
+	/* Destroy the instance */
+	cleanup(t);
+
+	return 0;
+}
+
+
+static int
+atomic_basic(struct test *t)
+{
+	const uint8_t rx_port = 0;
+	const uint8_t w1_port = 1;
+	const uint8_t w3_port = 3;
+	const uint8_t tx_port = 4;
+	int err;
+	int i;
+	uint32_t deq_pkts;
+	struct rte_mbuf *mbufs[3];
+	const uint32_t MAGIC_SEQN = 1234;
+
+	/* Create instance with 5 ports */
+	if (init(t, 2, tx_port+1) < 0 ||
+	    create_ports(t, tx_port+1) < 0 ||
+	    create_queues_type(t, 2, OPDL_Q_TYPE_ATOMIC)) {
+		printf("%d: Error initializing device\n", __LINE__);
+		return -1;
+	}
+
+
+	/*
+	 * CQ mapping to QID
+	 * We need three ports, all mapped to the same atomic qid0. With a
+	 * single flow id, all packets should be scheduled to the same
+	 * worker port; forward them on and make sure they all arrive when
+	 * we dequeue from the tx_port.
+	 *
+	 * Simplified test setup diagram:
+	 *
+	 * rx_port        w1_port
+	 *        \     /         \
+	 *         qid0 - w2_port - qid1
+	 *              \         /     \
+	 *                w3_port        tx_port
+	 */
+	/* CQ mapping to QID for Atomic  ports (directed mapped on create) */
+	for (i = w1_port; i <= w3_port; i++) {
+		err = rte_event_port_link(evdev, t->port[i], &t->qid[0], NULL,
+				1);
+		if (err != 1) {
+			printf("%d: error mapping lb qid\n", __LINE__);
+			cleanup(t);
+			return -1;
+		}
+	}
+
+	err = rte_event_port_link(evdev, t->port[tx_port], &t->qid[1], NULL,
+			1);
+	if (err != 1) {
+		printf("%d: error mapping TX  qid\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	if (rte_event_dev_start(evdev) < 0) {
+		printf("%d: Error with start call\n", __LINE__);
+		return -1;
+	}
+
+	/* Enqueue 3 packets to the rx port */
+	for (i = 0; i < 3; i++) {
+		struct rte_event ev;
+		mbufs[i] = rte_gen_arp(0, t->mbuf_pool);
+		if (!mbufs[i]) {
+			printf("%d: gen of pkt failed\n", __LINE__);
+			return -1;
+		}
+
+		ev.queue_id = t->qid[0];
+		ev.op = RTE_EVENT_OP_NEW;
+		ev.flow_id = 1;
+		ev.mbuf = mbufs[i];
+		mbufs[i]->seqn = MAGIC_SEQN + i;
+
+		/* generate pkt and enqueue */
+		err = rte_event_enqueue_burst(evdev, t->port[rx_port], &ev, 1);
+		if (err != 1) {
+			printf("%d: Failed to enqueue pkt %u, retval = %u\n",
+					__LINE__, i, err);
+			return -1;
+		}
+	}
+
+	/* use extra slot to make logic in loops easier */
+	struct rte_event deq_ev[w3_port + 1];
+
+	/* Dequeue the 3 packets, one from each worker port */
+	for (i = w1_port; i <= w3_port; i++) {
+
+		deq_pkts = rte_event_dequeue_burst(evdev, t->port[i],
+				deq_ev, 3, 0);
+
+		if (t->port[i] != 2) {
+			if (deq_pkts != 0) {
+				printf("%d: deq non-zero!\n", __LINE__);
+				rte_event_dev_dump(evdev, stdout);
+				return -1;
+			}
+		} else {
+
+			if (deq_pkts != 3) {
+				printf("%d: deq not equal to 3 (%u)!\n",
+						__LINE__, deq_pkts);
+				rte_event_dev_dump(evdev, stdout);
+				return -1;
+			}
+
+			for (int j = 0; j < 3; j++) {
+				deq_ev[j].op = RTE_EVENT_OP_FORWARD;
+				deq_ev[j].queue_id = t->qid[1];
+			}
+
+			err = rte_event_enqueue_burst(evdev, t->port[i],
+					deq_ev, 3);
+
+			if (err != 3) {
+				printf("port %d: Failed to enqueue pkt %u, "
+						"retval = %u\n",
+						t->port[i], 3, err);
+				return -1;
+			}
+
+		}
+
+	}
+
+
+	/* dequeue from the tx ports, we should get 3 packets */
+	deq_pkts = rte_event_dequeue_burst(evdev, t->port[tx_port], deq_ev,
+			3, 0);
+
+	/* Check to see if we've got all 3 packets */
+	if (deq_pkts != 3) {
+		printf("%d: expected 3 pkts at tx port got %d from port %d\n",
+			__LINE__, deq_pkts, tx_port);
+		rte_event_dev_dump(evdev, stdout);
+		return 1;
+	}
+
+	cleanup(t);
+
+	return 0;
+}
+
+static inline int
+check_qid_stats(uint64_t values[], int index)
+{
+	/* Expected values of the first three port xstats
+	 * (claim_pkts_requested, claim_pkts_granted, claim_non_empty)
+	 * for each port after the single_link_w_stats() traffic pattern.
+	 */
+	if (index == 0) {
+		if (values[0] != 3 || values[1] != 3
+				|| values[2] != 3)
+			return -1;
+	} else if (index == 1) {
+		if (values[0] != 5 || values[1] != 5
+				|| values[2] != 2)
+			return -1;
+	} else if (index == 2) {
+		if (values[0] != 3 || values[1] != 1
+				|| values[2] != 1)
+			return -1;
+	}
+
+	return 0;
+}
+
+
+static int
+check_statistics(void)
+{
+	int num_ports = 3; /* Hard-coded for this app */
+
+	for (int i = 0; i < num_ports; i++) {
+		int num_stats, num_stats_returned;
+
+		num_stats = rte_event_dev_xstats_names_get(0,
+				RTE_EVENT_DEV_XSTATS_PORT,
+				i,
+				NULL,
+				NULL,
+				0);
+		if (num_stats > 0) {
+
+			uint32_t id[num_stats];
+			struct rte_event_dev_xstats_name names[num_stats];
+			uint64_t values[num_stats];
+
+			num_stats_returned = rte_event_dev_xstats_names_get(0,
+					RTE_EVENT_DEV_XSTATS_PORT,
+					i,
+					names,
+					id,
+					num_stats);
+
+			if (num_stats == num_stats_returned) {
+				num_stats_returned = rte_event_dev_xstats_get(0,
+						RTE_EVENT_DEV_XSTATS_PORT,
+						i,
+						id,
+						values,
+						num_stats);
+
+				if (num_stats == num_stats_returned) {
+					int err;
+
+					err = check_qid_stats(values, i);
+					if (err)
+						return err;
+				} else {
+					return -1;
+				}
+			} else {
+				return -1;
+			}
+		} else {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+#define OLD_NUM_PACKETS 3
+#define NEW_NUM_PACKETS 2
+static int
+single_link_w_stats(struct test *t)
+{
+	const uint8_t rx_port = 0;
+	const uint8_t w1_port = 1;
+	const uint8_t tx_port = 2;
+	int err;
+	int i;
+	uint32_t deq_pkts;
+	struct rte_mbuf *mbufs[3];
+	RTE_SET_USED(mbufs);
+	RTE_SET_USED(i);
+
+	/* Create instance with 3 ports */
+	if (init(t, 2, tx_port + 1) < 0 ||
+	    create_ports(t, 3) < 0 || /* 0,1,2 */
+	    create_queues_type(t, 1, OPDL_Q_TYPE_SINGLE_LINK) < 0 ||
+	    create_queues_type(t, 1, OPDL_Q_TYPE_ORDERED) < 0) {
+		printf("%d: Error initializing device\n", __LINE__);
+		return -1;
+	}
+
+
+	/*
+	 *
+	 * Simplified test setup diagram:
+	 *
+	 * rx_port(0)
+	 *           \
+	 *            qid0 - w1_port(1) - qid1
+	 *                                    \
+	 *                                     tx_port(2)
+	 */
+
+	err = rte_event_port_link(evdev, t->port[1], &t->qid[0], NULL,
+				  1);
+	if (err != 1) {
+		printf("%d: error linking port:[%u] to queue:[%u]\n",
+		       __LINE__,
+		       t->port[1],
+		       t->qid[0]);
+		cleanup(t);
+		return -1;
+	}
+
+	err = rte_event_port_link(evdev, t->port[2], &t->qid[1], NULL,
+				  1);
+	if (err != 1) {
+		printf("%d: error linking port:[%u] to queue:[%u]\n",
+		       __LINE__,
+		       t->port[2],
+		       t->qid[1]);
+		cleanup(t);
+		return -1;
+	}
+
+	if (rte_event_dev_start(evdev) != 0) {
+		printf("%d: failed to start device\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	/*
+	 * Enqueue 3 packets to the rx port
+	 */
+	for (i = 0; i < 3; i++) {
+		struct rte_event ev;
+		mbufs[i] = rte_gen_arp(0, t->mbuf_pool);
+		if (!mbufs[i]) {
+			printf("%d: gen of pkt failed\n", __LINE__);
+			return -1;
+		}
+
+		ev.queue_id = t->qid[0];
+		ev.op = RTE_EVENT_OP_NEW;
+		ev.mbuf = mbufs[i];
+		mbufs[i]->seqn = 1234 + i;
+
+		/* generate pkt and enqueue */
+		err = rte_event_enqueue_burst(evdev, t->port[rx_port], &ev, 1);
+		if (err != 1) {
+			printf("%d: Failed to enqueue pkt on port %u, "
+			       "retval = %u\n",
+			       __LINE__,
+			       t->port[rx_port],
+			       err);
+			return -1;
+		}
+	}
+
+	/* Dequeue the 3 packets, from SINGLE_LINK worker port */
+	struct rte_event deq_ev[3];
+
+	deq_pkts = rte_event_dequeue_burst(evdev,
+					   t->port[w1_port],
+					   deq_ev, 3, 0);
+
+	if (deq_pkts != 3) {
+		printf("%d: deq not 3 !\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	/* Just enqueue 2 onto new ring */
+	for (i = 0; i < NEW_NUM_PACKETS; i++)
+		deq_ev[i].queue_id = t->qid[1];
+
+	deq_pkts = rte_event_enqueue_burst(evdev,
+					   t->port[w1_port],
+					   deq_ev,
+					   NEW_NUM_PACKETS);
+
+	if (deq_pkts != 2) {
+		printf("%d: enq not 2 but %u!\n", __LINE__, deq_pkts);
+		cleanup(t);
+		return -1;
+	}
+
+	/* dequeue from the tx ports, we should get 2 packets */
+	deq_pkts = rte_event_dequeue_burst(evdev,
+					   t->port[tx_port],
+					   deq_ev,
+					   3,
+					   0);
+
+	/* Check to see if we've got both packets */
+	if (deq_pkts != 2) {
+		printf("%d: expected 2 pkts at tx port got %d from port %d\n",
+			__LINE__, deq_pkts, tx_port);
+		cleanup(t);
+		return -1;
+	}
+
+	if (check_statistics() != 0) {
+		printf("xstats check failed\n");
+		cleanup(t);
+		return -1;
+	}
+
+	cleanup(t);
+
+	return 0;
+}
+
+static int
+single_link(struct test *t)
+{
+	/* const uint8_t rx_port = 0; */
+	/* const uint8_t w1_port = 1; */
+	/* const uint8_t w3_port = 3; */
+	const uint8_t tx_port = 2;
+	int err;
+	int i;
+	struct rte_mbuf *mbufs[3];
+	RTE_SET_USED(mbufs);
+	RTE_SET_USED(i);
+
+	/* Create instance with 3 ports */
+	if (init(t, 2, tx_port+1) < 0 ||
+	    create_ports(t, 3) < 0 || /* 0,1,2 */
+	    create_queues_type(t, 1, OPDL_Q_TYPE_SINGLE_LINK) < 0 ||
+	    create_queues_type(t, 1, OPDL_Q_TYPE_ORDERED) < 0) {
+		printf("%d: Error initializing device\n", __LINE__);
+		return -1;
+	}
+
+
+	/*
+	 *
+	 * Simplified test setup diagram:
+	 *
+	 * rx_port(0)
+	 *           \
+	 *            qid0 - w1_port(1) - qid1
+	 *                                    \
+	 *                                     tx_port(2)
+	 */
+
+	err = rte_event_port_link(evdev, t->port[1], &t->qid[0], NULL,
+				  1);
+	if (err != 1) {
+		printf("%d: error mapping lb qid\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	err = rte_event_port_link(evdev, t->port[2], &t->qid[0], NULL,
+				  1);
+	if (err != 1) {
+		printf("%d: error mapping lb qid\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	if (rte_event_dev_start(evdev) == 0) {
+		printf("%d: start DIDN'T FAIL with more than 1 "
+				"SINGLE_LINK PORT\n", __LINE__);
+		cleanup(t);
+		return -1;
+	}
+
+	cleanup(t);
+
+	return 0;
+}
+
+
+static inline void
+populate_event_burst(struct rte_event ev[],
+		     uint8_t qid,
+		     uint16_t num_events) {
+	uint16_t i;
+	for (i = 0; i < num_events; i++) {
+		ev[i].flow_id = 1;
+		ev[i].op = RTE_EVENT_OP_NEW;
+		ev[i].sched_type = RTE_SCHED_TYPE_ORDERED;
+		ev[i].queue_id = qid;
+		ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
+		ev[i].sub_event_type = 0;
+		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
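+		/* Dummy mbuf pointer: qid_basic() never dereferences it */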
+		ev[i].mbuf = (struct rte_mbuf *)0xdead0000;
+	}
+}
+
+#define NUM_QUEUES 3
+#define BATCH_SIZE 32
+
+static int
+qid_basic(struct test *t)
+{
+	int err = 0;
+
+	uint8_t q_id = 0;
+	uint8_t p_id = 0;
+
+	uint32_t num_events;
+	uint32_t i;
+
+	struct rte_event ev[BATCH_SIZE];
+
+	/* Create instance with 4 ports */
+	if (init(t, NUM_QUEUES, NUM_QUEUES+1) < 0 ||
+	    create_ports(t, NUM_QUEUES+1) < 0 ||
+	    create_queues_type(t, NUM_QUEUES, OPDL_Q_TYPE_ORDERED)) {
+		printf("%d: Error initializing device\n", __LINE__);
+		return -1;
+	}
+
+	for (i = 0; i < NUM_QUEUES; i++) {
+		int nb_linked;
+		q_id = i;
+
+		nb_linked = rte_event_port_link(evdev,
+				i+1, /* port = q_id + 1*/
+				&q_id,
+				NULL,
+				1);
+
+		if (nb_linked != 1) {
+
+			printf("%s:%d: error mapping port:%u to queue:%u\n",
+					__FILE__,
+					__LINE__,
+					i + 1,
+					q_id);
+
+			err = -1;
+			break;
+		}
+
+	}
+
+
+	/* Try and link to the same port again */
+	if (!err) {
+		uint8_t t_qid = 0;
+		if (rte_event_port_link(evdev,
+					1,
+					&t_qid,
+					NULL,
+					1) > 0) {
+			printf("%s:%d: Second call to port link on same port DID NOT fail\n",
+					__FILE__,
+					__LINE__);
+			err = -1;
+		}
+
+		uint32_t test_num_events;
+
+		if (!err) {
+			test_num_events = rte_event_dequeue_burst(evdev,
+					p_id,
+					ev,
+					BATCH_SIZE,
+					0);
+			if (test_num_events != 0) {
+				printf("%s:%d: Error: dequeue from port %u on stopped device returned events\n",
+						__FILE__,
+						__LINE__,
+						p_id);
+				err = -1;
+			}
+		}
+
+		if (!err) {
+			test_num_events = rte_event_enqueue_burst(evdev,
+					p_id,
+					ev,
+					BATCH_SIZE);
+			if (test_num_events != 0) {
+				printf("%s:%d: Error: enqueue to port %u on stopped device accepted events\n",
+						__FILE__,
+						__LINE__,
+						p_id);
+				err = -1;
+			}
+		}
+	}
+
+
+	/* Start the device */
+	if (!err) {
+		if (rte_event_dev_start(evdev) < 0) {
+			printf("%s:%d: Error with start call\n",
+					__FILE__,
+					__LINE__);
+			err = -1;
+		}
+	}
+
+
+	/* Check we can't do any more links now that device is started.*/
+	if (!err) {
+		uint8_t t_qid = 0;
+		if (rte_event_port_link(evdev,
+					1,
+					&t_qid,
+					NULL,
+					1) > 0) {
+			printf("%s:%d: Call to port link on started device DID NOT fail\n",
+					__FILE__,
+					__LINE__);
+			err = -1;
+		}
+	}
+
+	if (!err) {
+
+		q_id = 0;
+
+		populate_event_burst(ev,
+				q_id,
+				BATCH_SIZE);
+
+		num_events = rte_event_enqueue_burst(evdev,
+				p_id,
+				ev,
+				BATCH_SIZE);
+		if (num_events != BATCH_SIZE) {
+			printf("%s:%d: Error enqueuing rx packets\n",
+					__FILE__,
+					__LINE__);
+			err = -1;
+		}
+	}
+
+	if (!err) {
+		while (++p_id < NUM_QUEUES) {
+
+			num_events = rte_event_dequeue_burst(evdev,
+					p_id,
+					ev,
+					BATCH_SIZE,
+					0);
+
+			if (num_events != BATCH_SIZE) {
+				printf("%s:%d: Error dequeuing packets from port %u\n",
+						__FILE__,
+						__LINE__,
+						p_id);
+				err = -1;
+				break;
+			}
+
+			if (ev[0].queue_id != q_id) {
+				printf("%s:%d: Error event portid[%u] q_id:[%u] does not match expected:[%u]\n",
+						__FILE__,
+						__LINE__,
+						p_id,
+						ev[0].queue_id,
+						q_id);
+				err = -1;
+				break;
+			}
+
+			populate_event_burst(ev,
+					++q_id,
+					BATCH_SIZE);
+
+			num_events = rte_event_enqueue_burst(evdev,
+					p_id,
+					ev,
+					BATCH_SIZE);
+			if (num_events != BATCH_SIZE) {
+				printf("%s:%d: Error enqueuing packets from port:%u to queue:%u\n",
+						__FILE__,
+						__LINE__,
+						p_id,
+						q_id);
+				err = -1;
+				break;
+			}
+		}
+	}
+
+	if (!err) {
+		num_events = rte_event_dequeue_burst(evdev,
+				p_id,
+				ev,
+				BATCH_SIZE,
+				0);
+		if (num_events != BATCH_SIZE) {
+			printf("%s:%d: Error dequeuing packets from tx port %u\n",
+					__FILE__,
+					__LINE__,
+					p_id);
+			err = -1;
+		}
+	}
+
+	cleanup(t);
+
+	return err;
+}
+
+
+
+static int
+test_opdl_eventdev(void)
+{
+	struct test *t = malloc(sizeof(struct test));
+	int ret;
+
+	const char *eventdev_name = "event_opdl0";
+
+	if (t == NULL)
+		return -1;
+
+	evdev = rte_event_dev_get_dev_id(eventdev_name);
+
+	if (evdev < 0) {
+		printf("%d: Eventdev %s not found - creating.\n",
+				__LINE__, eventdev_name);
+		/* turn on stats by default */
+		if (rte_vdev_init(eventdev_name, "do_validation=1") < 0) {
+			printf("Error creating eventdev\n");
+			return -1;
+		}
+		evdev = rte_event_dev_get_dev_id(eventdev_name);
+		if (evdev < 0) {
+			printf("Error finding newly created eventdev\n");
+			return -1;
+		}
+	}
+
+	/* Only create mbuf pool once, reuse for each test run */
+	if (!eventdev_func_mempool) {
+		eventdev_func_mempool = rte_pktmbuf_pool_create(
+				"EVENTDEV_SW_SA_MBUF_POOL",
+				(1<<12), /* 4k buffers */
+				32 /*MBUF_CACHE_SIZE*/,
+				0,
+				512, /* use very small mbufs */
+				rte_socket_id());
+		if (!eventdev_func_mempool) {
+			printf("ERROR creating mempool\n");
+			return -1;
+		}
+	}
+	t->mbuf_pool = eventdev_func_mempool;
+
+	printf("*** Running Ordered Basic test...\n");
+	ret = ordered_basic(t);
+
+	printf("*** Running Atomic Basic test...\n");
+	ret |= atomic_basic(t);
+
+	printf("*** Running QID Basic test...\n");
+	ret |= qid_basic(t);
+
+	printf("*** Running SINGLE LINK failure test...\n");
+	ret |= single_link(t);
+
+	printf("*** Running SINGLE LINK w stats test...\n");
+	ret |= single_link_w_stats(t);
+
+	/*
+	 * Free test instance, leaving mempool initialized, and a pointer to it
+	 * in static eventdev_func_mempool, as it is re-used on re-runs
+	 */
+	free(t);
+
+	if (ret != 0)
+		return ret;
+	return 0;
+
+}
+
+
+
+REGISTER_TEST_COMMAND(eventdev_opdl_autotest, test_opdl_eventdev);
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
                   ` (4 preceding siblings ...)
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 5/7] test/eventdev: opdl eventdev pmd unit test func and makefiles liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 7/7] doc: add eventdev opdl pmd docuement and example usage document liang.j.ma
  2017-11-24 20:55 ` [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD Jerin Jacob
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

This patch adds a sample app to the examples/ directory, which can
be used as a reference application and for general testing.
The application requires two ethdev ports and expects traffic to be
flowing. The application must be run with the --vdev flag as
follows to indicate to EAL that a virtual eventdev device called
"event_opdl0" is available to be used:

    ./build/eventdev_pipeline_opdl_pmd  --vdev=event_opdl0

The general flow of the traffic is as follows:

    Rx core => Ordered Queue (4 workers) =>
    Ordered Queue (4 workers) => TX core

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
---
 examples/eventdev_pipeline_opdl_pmd/Makefile |  49 ++
 examples/eventdev_pipeline_opdl_pmd/main.c   | 766 +++++++++++++++++++++++++++
 2 files changed, 815 insertions(+)
 create mode 100644 examples/eventdev_pipeline_opdl_pmd/Makefile
 create mode 100644 examples/eventdev_pipeline_opdl_pmd/main.c

diff --git a/examples/eventdev_pipeline_opdl_pmd/Makefile b/examples/eventdev_pipeline_opdl_pmd/Makefile
new file mode 100644
index 0000000..63aea65
--- /dev/null
+++ b/examples/eventdev_pipeline_opdl_pmd/Makefile
@@ -0,0 +1,49 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = eventdev_pipeline_opdl_pmd
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline_opdl_pmd/main.c b/examples/eventdev_pipeline_opdl_pmd/main.c
new file mode 100644
index 0000000..6b9e48a
--- /dev/null
+++ b/examples/eventdev_pipeline_opdl_pmd/main.c
@@ -0,0 +1,766 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+#include <sched.h>
+#include <stdbool.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+#include <rte_service.h>
+
+#define BATCH_SIZE 32
+
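+/*
+ * Pipeline dimensions: 3 queues (two ordered worker stages plus a
+ * final tx stage) and 10 event ports (1 rx + 8 workers + 1 tx).
+ */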
+static struct rte_event_dev_config config = {
+		.nb_event_queues = 3,
+		.nb_event_ports = 10,
+		.nb_events_limit  = 4096,
+		.nb_event_queue_flows = 1024,
+		.nb_event_port_dequeue_depth = 128,
+		.nb_event_port_enqueue_depth = 128,
+};
+
+static struct rte_event_port_conf wkr_p_conf = {
+		.dequeue_depth = 128,
+		.enqueue_depth = 128,
+		.new_event_threshold = 1024,
+};
+
+static struct rte_event_queue_conf wkr_q_conf = {
+		.event_queue_cfg = 0,
+		.schedule_type = RTE_SCHED_TYPE_ORDERED,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+		.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1024,
+};
+
+static struct rte_event_port_conf tx_p_conf = {
+		.dequeue_depth = 32,
+		.enqueue_depth = 32,
+		.new_event_threshold = 32,
+};
+
+static const struct rte_event_queue_conf tx_q_conf = {
+		.event_queue_cfg = 0,
+		.schedule_type = RTE_SCHED_TYPE_ORDERED,
+		.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1024,
+};
+
+static struct rte_event_port_conf rx_p_conf = {
+		.dequeue_depth = 32,
+		.enqueue_depth = 32,
+		.new_event_threshold = 1024,
+};
+
+struct prod_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	int32_t qid;
+	unsigned int num_nic_ports;
+} __rte_cache_aligned;
+
+static struct prod_data producer_data;
+
+struct cons_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+} __rte_cache_aligned;
+
+static struct cons_data consumer_data;
+
+struct worker_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	uint8_t thread_id;
+};
+
+#define QUEUE_0_ID 0
+#define QUEUE_1_ID 1
+#define QUEUE_2_ID 2
+
+
+#define NUM_WORKERS 8
+static struct worker_data worker_data[NUM_WORKERS];
+
+volatile int done;
+
+static uint8_t port_map_4[4] = { 1, 2, 3, 0 };
+static uint8_t port_map_3[3] = { 1, 2, 0 };
+static uint8_t port_map_2[2] = { 1, 0 };
+static uint8_t port_map_1[1] = { 0 };
+static uint8_t *port_map;
+
+__thread  long long  packet_num;
+
+static void
+eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
+			void *userdata)
+{
+	int port_id = (uintptr_t) userdata;
+	unsigned int _sent = 0;
+
+	do {
+		/* Note: hard-coded TX queue */
+		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
+					  unsent - _sent);
+	} while (_sent != unsent);
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IP |
+					  ETH_RSS_TCP |
+					  ETH_RSS_UDP,
+			}
+		}
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), NULL);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Start the Ethernet port. */
+	retval = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	/* Display the port MAC address. */
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned int)port,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+static int
+init_ports(unsigned int num_ports)
+{
+	uint8_t portid;
+	unsigned int i;
+
+	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ 16384 * num_ports,
+			/* cache_size */ 512,
+			/* priv_size*/ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+
+	if (mp == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
+
+	for (portid = 0; portid < num_ports; portid++)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+					portid);
+
+	for (i = 0; i < num_ports; i++) {
+		void *userdata = (void *)(uintptr_t) i;
+		consumer_data.tx_buf[i] =
+				rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+
+		rte_eth_promiscuous_enable(i);
+		if (consumer_data.tx_buf[i] == NULL)
+			rte_panic("Out of memory\n");
+		rte_eth_tx_buffer_init(consumer_data.tx_buf[i], 32);
+		rte_eth_tx_buffer_set_err_callback(consumer_data.tx_buf[i],
+				eth_tx_buffer_retry,
+				userdata);
+	}
+
+	return 0;
+}
+
+static int
+rx_thread(void *arg)
+{
+	uint16_t nb_rx;
+
+	struct rte_mbuf *mbufs[BATCH_SIZE*3];
+	struct rte_event ev[BATCH_SIZE*3] = {0};
+	uint32_t i, j;
+	uint16_t nb_enqueue = 0;
+
+	int32_t qid = producer_data.qid;
+	uint8_t dev_id = producer_data.dev_id;
+	uint8_t port_id = producer_data.port_id;
+	uint32_t prio_idx = 0;
+	uint32_t num_ports = rte_eth_dev_count();
+	RTE_SET_USED(arg);
+
+	printf("Rx thread port_id %d started\n", port_id);
+
+	packet_num  = 0;
+
+	while (!done) {
+
+		for (j = 0; j < num_ports; j++) {
+
+			nb_rx = rte_eth_rx_burst(j, 0, mbufs, BATCH_SIZE);
+
+			for (i = 0; i < nb_rx; i++) {
+				ev[i].flow_id = 1;
+				ev[i].op = RTE_EVENT_OP_NEW;
+				ev[i].sched_type = RTE_SCHED_TYPE_ORDERED;
+				ev[i].queue_id = qid;
+				ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
+				ev[i].sub_event_type = 0;
+				ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
+				ev[i].mbuf = mbufs[i];
+				RTE_SET_USED(prio_idx);
+			}
+
+			nb_enqueue = rte_event_enqueue_burst(dev_id,
+					port_id, ev, nb_rx);
+
+			packet_num += nb_enqueue;
+
+			if (nb_enqueue != nb_rx) {
+				for (i = nb_enqueue; i < nb_rx; i++)
+					rte_pktmbuf_free(mbufs[i]);
+			}
+
+
+		}
+
+	}
+
+	printf("Rx done,  total packet num is %lld\n", packet_num);
+
+	return 0;
+}
+
+static inline void
+work(struct rte_mbuf *m)
+{
+	m->port = port_map[m->port];
+}
+
+static int
+worker_thread(void *arg)
+{
+	unsigned int i;
+	struct rte_event events[BATCH_SIZE] = {0};
+	struct worker_data *w_data = (struct worker_data *)arg;
+	uint8_t dev_id = w_data->dev_id;
+	uint8_t port_id = w_data->port_id;
+	RTE_SET_USED(arg);
+	uint16_t nb_event_deq;
+
+	printf("Worker thread %d port_id %d started\n",
+			w_data->thread_id, port_id);
+
+	packet_num =  0;
+
+	while (!done) {
+		nb_event_deq = rte_event_dequeue_burst(dev_id,
+				port_id,
+				events,
+				RTE_DIM(events),
+				0);
+
+		if (nb_event_deq == 0) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_event_deq; i++) {
+			events[i].queue_id += 1;
+			events[i].op = RTE_EVENT_OP_FORWARD;
+			events[i].sched_type = RTE_SCHED_TYPE_ORDERED;
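+			/* Only second-stage workers touch the mbuf */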
+			if (w_data->thread_id > 3)
+				work(events[i].mbuf);
+		}
+
+		packet_num += nb_event_deq;
+
+		rte_event_enqueue_burst(dev_id, port_id,
+				events, nb_event_deq);
+	}
+
+	return 0;
+}
+
+static int
+tx_thread(void *arg)
+{
+	int j;
+	struct rte_event events[BATCH_SIZE];
+	uint8_t dev_id = consumer_data.dev_id;
+	uint8_t eth_dev_count, port_id = consumer_data.port_id;
+
+	RTE_SET_USED(arg);
+
+	eth_dev_count = rte_eth_dev_count();
+
+	packet_num = 0;
+
+	while (!done) {
+
+		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+				events, RTE_DIM(events), 0);
+
+		packet_num += n;
+
+		int i;
+
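+		/* Nothing dequeued: flush buffered packets on all ports */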
+		if (n == 0) {
+			for (j = 0; j < eth_dev_count; j++)
+				rte_eth_tx_buffer_flush(j,
+						0,
+						consumer_data.tx_buf[j]);
+			continue;
+		}
+
+		if (events[0].queue_id != QUEUE_2_ID)
+			printf("ERROR TX expected q_id:[%u], "
+					"but received:[%u]\n",
+					QUEUE_2_ID, events[0].queue_id);
+
+		for (i = 0; i < n; i++) {
+			uint8_t outport = events[i].mbuf->port;
+			rte_eth_tx_buffer(outport,
+					0,
+					consumer_data.tx_buf[outport],
+					events[i].mbuf);
+		}
+	}
+
+	return 0;
+}
+
+static int
+setup_eventdev(uint8_t dev_id)
+{
+	uint8_t i, num_wrk_port = 8;
+	uint8_t ev_port = 0, queue0 = QUEUE_0_ID;
+	uint8_t queue1 = QUEUE_1_ID, queue2 = QUEUE_2_ID;
+
+	int ret, ndev = rte_event_dev_count();
+	if (ndev < 1) {
+		printf("%d: No Eventdev Devices Found\n", __LINE__);
+		return -1;
+	}
+
+	struct rte_event_dev_info dev_info;
+	ret = rte_event_dev_info_get(dev_id, &dev_info);
+	if (ret < 0) {
+		printf("%s%d: Error getting device info\n", __FILE__, __LINE__);
+		return -1;
+	}
+
+	if (dev_info.max_event_port_dequeue_depth <
+			config.nb_event_port_dequeue_depth)
+		config.nb_event_port_dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (dev_info.max_event_port_enqueue_depth <
+			config.nb_event_port_enqueue_depth)
+		config.nb_event_port_enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;
+
+	ret = rte_event_dev_configure(dev_id, &config);
+	if (ret < 0) {
+		printf("%s%d: Error configuring device\n", __FILE__, __LINE__);
+		return -1;
+	}
+
+	/* Create Queues */
+	if (rte_event_queue_setup(dev_id, queue0, &wkr_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, queue0);
+		return -1;
+	}
+
+	if (rte_event_queue_setup(dev_id, queue1, &wkr_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, queue1);
+		return -1;
+	}
+
+	if (rte_event_queue_setup(dev_id, queue2, &tx_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, queue2);
+		return -1;
+	}
+
+	/* Check that port configs are valid for this device */
+
+	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		rx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (rx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		rx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	/* Create rx_port */
+	if (rte_event_port_setup(dev_id, ev_port, &rx_p_conf) < 0) {
+		printf("Error setting up rx port\n");
+		return -1;
+	}
+	ev_port++;
+
+	/* Create worker ports */
+	for (i = 0; i < num_wrk_port; i++) {
+		if (rte_event_port_setup(dev_id, ev_port, &wkr_p_conf) < 0) {
+			printf("Error setting up worker port %d\n", i);
+			return -1;
+		}
+		ev_port++;
+	}
+
+	/* Create tx_port */
+	if (rte_event_port_setup(dev_id, ev_port, &tx_p_conf) < 0) {
+		printf("Error setting up tx port\n");
+		return -1;
+	}
+	ev_port++;
+
+	/* Link worker ports 1-4 to queue 0 */
+	for (i = 1; i < 5; i++) {
+		if (rte_event_port_link(dev_id,
+					i,
+					&queue0,
+					&wkr_q_conf.priority,
+					1)
+				!= 1) {
+			printf("%d: error creating link for port %d\n",
+					__LINE__, i);
+			return -1;
+		}
+	}
+	/* Link worker ports 5-8 to queue 1 */
+	for (i = 5; i < 9; i++) {
+		if (rte_event_port_link(dev_id,
+					i,
+					&queue1,
+					&wkr_q_conf.priority,
+					1)
+				!= 1) {
+			printf("%d: error creating link for port %d\n",
+					__LINE__, i);
+			return -1;
+		}
+	}
+	/* Link the tx port (port 9) to queue 2 */
+	if (rte_event_port_link(dev_id,
+				i,
+				&queue2,
+				&tx_q_conf.priority,
+				1) != 1) {
+		printf("%d: error creating link for port %d\n", __LINE__, i);
+		return -1;
+	}
+
+	if (rte_event_dev_start(dev_id) < 0) {
+		printf("Error starting eventdev\n");
+		return -1;
+	}
+
+	rte_event_dev_dump(dev_id, stdout);
+
+	return 0;
+}
+
+static void print_statistics(FILE *f)
+{
+	int num_ports = 10; /* Hard-coded for this app */
+
+	for (int i = 0; i < num_ports; i++) {
+		int num_stats, num_stats_returned;
+
+		num_stats = rte_event_dev_xstats_names_get(0,
+				RTE_EVENT_DEV_XSTATS_PORT,
+				i,
+				NULL,
+				NULL,
+				0);
+		if (num_stats > 0) {
+
+			uint32_t ids[num_stats];
+			struct rte_event_dev_xstats_name names[num_stats];
+			uint64_t values[num_stats];
+
+			num_stats_returned = rte_event_dev_xstats_names_get(0,
+					RTE_EVENT_DEV_XSTATS_PORT,
+					i,
+					names,
+					ids,
+					num_stats);
+			if (num_stats == num_stats_returned) {
+				num_stats_returned = rte_event_dev_xstats_get(0,
+						RTE_EVENT_DEV_XSTATS_PORT,
+						i,
+						ids,
+						values,
+						num_stats);
+				if (num_stats == num_stats_returned) {
+					fprintf(f,
+						"Port : [%02u] Statistics\n",
+						i);
+					for (int j = 0; j < num_stats; j++) {
+						fprintf(f,
+							"\t%30s = %16lu\n",
+							names[j].name,
+							values[j]);
+					}
+				}
+			}
+		}
+	}
+}
+
+static void
+signal_handler(int signum)
+{
+	if (signum == SIGINT || signum == SIGTERM) {
+		printf("\n\nSignal %d received, preparing to exit...\n",
+				signum);
+		done = 1;
+	}
+}
+
+static int
+scheduler_thread(void *arg)
+{
+	uint8_t dev_id = *(uint8_t *)arg;
+	uint32_t evdev_service_id;
+
+	if (rte_event_dev_service_id_get(dev_id, &evdev_service_id))
+		return -1;
+
+	while (!done) {
+
+		/* run one iteration of the service on the calling core */
+		rte_service_run_iter_on_app_lcore(evdev_service_id, true);
+	}
+	return 0;
+}
+
+
+int
+main(int argc, char **argv)
+{
+	unsigned int num_ports;
+	uint8_t dev_id = 0;
+	int err, lcore_id, this_lcore;
+	int i, worker_nb = 0;
+	int rx_th = 0, sch_th = 0, tx_th = 0;
+
+	signal(SIGINT, signal_handler);
+	signal(SIGTERM, signal_handler);
+	signal(SIGTSTP, signal_handler);
+
+	err = rte_eal_init(argc, argv);
+	if (err < 0)
+		rte_panic("Invalid EAL arguments\n");
+
+	num_ports = rte_eth_dev_count();
+	if (num_ports == 4) {
+		port_map = port_map_4;
+	} else if (num_ports == 3) {
+		port_map = port_map_3;
+	} else if (num_ports == 2) {
+		port_map = port_map_2;
+	} else if (num_ports == 1) {
+		port_map = port_map_1;
+	} else {
+		rte_panic("Incorrect number of ethernet ports found:[%u]\n",
+				num_ports);
+	}
+
+	const unsigned int ndevs = rte_event_dev_count();
+	if (ndevs == 0)
+		rte_panic("No dev_id devs found. Pass in a --vdev eventdev.\n");
+	if (ndevs > 1)
+		fprintf(stderr, "Warning: More than one eventdev, using idx 0");
+
+	err = setup_eventdev(dev_id);
+
+	if (err < 0) {
+		fprintf(stderr, "Error: setup_eventdev failed\n");
+		return -1;
+	}
+
+	struct rte_event_dev_info dev_info;
+	err = rte_event_dev_info_get(dev_id, &dev_info);
+
+	if (err < 0) {
+		printf("%s%d: Error getting device info\n", __FILE__, __LINE__);
+		return -1;
+	}
+	producer_data.dev_id = dev_id;
+	producer_data.num_nic_ports = num_ports;
+	producer_data.qid = QUEUE_0_ID;
+	producer_data.port_id = 0;
+
+	for (i = 0; i < NUM_WORKERS; i++) {
+		worker_data[i].dev_id = dev_id;
+		worker_data[i].port_id = i+1;
+		worker_data[i].thread_id = i;
+	}
+
+	consumer_data.dev_id = dev_id;
+	consumer_data.port_id = 9;
+
+	init_ports(num_ports);
+
+	this_lcore = rte_lcore_id();
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+
+		if ((rx_th == 0) && (this_lcore != lcore_id)) {
+			err = rte_eal_remote_launch(rx_thread, NULL, lcore_id);
+			printf("Start rx thread\n");
+			if (err) {
+				printf("Failed to launch rx on core %d\n",
+						lcore_id);
+			} else {
+				rx_th = 1;
+				continue;
+			}
+		}
+
+		if ((worker_nb < NUM_WORKERS) && (this_lcore != lcore_id)) {
+			err = rte_eal_remote_launch(worker_thread,
+					&worker_data[worker_nb],
+					lcore_id);
+			if (err) {
+				printf("Failed to launch worker on core %d\n",
+						lcore_id);
+			} else {
+				worker_nb++;
+				continue;
+			}
+		}
+
+		if ((tx_th == 0) && (this_lcore != lcore_id)) {
+			printf("Start tx thread\n");
+			err = rte_eal_remote_launch(tx_thread, NULL, lcore_id);
+			if (err) {
+				printf("Failed to launch tx on core %d\n",
+						lcore_id);
+			} else {
+
+				tx_th = 1;
+				continue;
+			}
+		}
+
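+		/* The OPDL PMD schedules during enqueue/dequeue itself, so a
+		 * dedicated scheduler service thread is only needed for other
+		 * software PMDs such as event_sw.
+		 */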
+		if (strcmp(dev_info.driver_name, "event_opdl")) {
+			if ((sch_th == 0) && (this_lcore != lcore_id)) {
+				printf("Start SW scheduling thread\n");
+				err = rte_eal_remote_launch(scheduler_thread,
+						&dev_id,
+						lcore_id);
+				if (err) {
+					printf("Failed to launch scheduler on core %d\n",
+					       lcore_id);
+				} else {
+
+					sch_th = 1;
+					continue;
+				}
+			}
+		}
+	}
+
+	rte_eal_mp_wait_lcore();
+
+	print_statistics(stdout);
+
+	rte_event_dev_dump(0, stdout);
+
+	rte_event_dev_stop(dev_id);
+
+	rte_event_dev_close(dev_id);
+
+	return 0;
+}
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [dpdk-dev] [PATCH 7/7] doc: add eventdev opdl pmd docuement and example usage document
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
                   ` (5 preceding siblings ...)
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example liang.j.ma
@ 2017-11-24 11:23 ` liang.j.ma
  2017-11-24 20:55 ` [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD Jerin Jacob
  7 siblings, 0 replies; 13+ messages in thread
From: liang.j.ma @ 2017-11-24 11:23 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

From: Liang Ma <liang.j.ma@intel.com>

Add the description of the opdl pmd and example usage/description.

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 doc/guides/eventdevs/index.rst                     |   1 +
 doc/guides/eventdevs/opdl.rst                      | 162 +++++++++++++++++++++
 .../sample_app_ug/eventdev_pipeline_opdl_pmd.rst   | 128 ++++++++++++++++
 doc/guides/sample_app_ug/index.rst                 |   1 +
 4 files changed, 292 insertions(+)
 create mode 100644 doc/guides/eventdevs/opdl.rst
 create mode 100644 doc/guides/sample_app_ug/eventdev_pipeline_opdl_pmd.rst

diff --git a/doc/guides/eventdevs/index.rst b/doc/guides/eventdevs/index.rst
index ba2048c..07a41bc 100644
--- a/doc/guides/eventdevs/index.rst
+++ b/doc/guides/eventdevs/index.rst
@@ -40,3 +40,4 @@ application trough the eventdev API.
     dpaa2
     sw
     octeontx
+    opdl
diff --git a/doc/guides/eventdevs/opdl.rst b/doc/guides/eventdevs/opdl.rst
new file mode 100644
index 0000000..fffd673
--- /dev/null
+++ b/doc/guides/eventdevs/opdl.rst
@@ -0,0 +1,162 @@
+..  BSD LICENSE
+    Copyright(c) 2017 Intel Corporation. All rights reserved.
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions
+    are met:
+
+    * Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution.
+    * Neither the name of Intel Corporation nor the names of its
+    contributors may be used to endorse or promote products derived
+    from this software without specific prior written permission.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+OPDL Eventdev Poll Mode Driver
+==================================
+
+The OPDL (Ordered Packet Distribution Library) eventdev is a specific
+implementation of the eventdev API. It is particularly suited to packet
+processing workloads that have high throughput and low latency requirements.
+All packets follow the same path through the device. The order which
+packets follow is determined by the order in which queues are set up.
+Events are left on the ring until they are transmitted. As a result packets
+do not go out of order.
+
+
+Features
+--------
+
+The OPDL eventdev implements a subset of the features of the eventdev API:
+
+Queues
+ * Atomic
+ * Ordered (Parallel is supported as parallel is a subset of Ordered)
+ * Single-Link
+
+Ports
+ * Load balanced (for Atomic, Ordered, Parallel queues)
+ * Single Link (for single-link queues)
+
+
+Configuration and Options
+-------------------------
+
+The OPDL eventdev is a vdev device, and as such can be created from the
+application code, or from the EAL command line:
+
+* Call ``rte_vdev_init("event_opdl0")`` from the application
+
+* Use ``--vdev="event_opdl0"`` in the EAL options, which will call
+  rte_vdev_init() internally
+
+Example:
+
+.. code-block:: console
+
+    ./your_eventdev_application --vdev="event_opdl0"
+
+
+Single Port Queue
+~~~~~~~~~~~~~~~~~
+
+It is possible to create a Single Port Queue using ``RTE_EVENT_QUEUE_CFG_SINGLE_LINK``.
+Packets dequeued from this queue do not need to be re-enqueued (as is the
+case with an ordered queue). The purpose of this queue is to allow for
+asynchronous handling of packets in the middle of a pipeline. Ordered
+queues in the middle of a pipeline cannot delete packets.
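+
+As a minimal sketch (the queue id and config values here are illustrative
+assumptions, not taken from the driver), such a queue could be set up as
+follows:
+
+.. code-block:: c
+
+    struct rte_event_queue_conf conf = {
+        .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
+        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+    };
+
+    /* Packets dequeued from this queue do not need to be re-enqueued. */
+    rte_event_queue_setup(dev_id, queue_id, &conf);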
+
+
+Queue Dependencies
+~~~~~~~~~~~~~~~~~~
+
+As stated, the order in which packets travel through queues is static in
+nature. They go through the queues in the order the queues are set up at
+initialisation with ``rte_event_queue_setup()``. For example, if an
+application sets up 3 queues, Q0, Q1 and Q2, and has 4 associated ports
+P0, P1, P2 and P3, then packets must be:
+
+ * Enqueued onto Q0 (typically through P0), then
+
+ * Dequeued from Q0 (typically through P1), then
+
+ * Enqueued onto Q1 (also through P1), then
+
+ * Dequeued from Q1 (typically through P2), then
+
+ * Enqueued onto Q2 (also through P2), then
+
+ * Dequeued from Q2 (typically through P3) and then transmitted on the
+   relevant eth port.
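+
+A rough sketch of the corresponding setup (standard eventdev API calls;
+``queue_conf`` is an assumed, previously initialised configuration):
+
+.. code-block:: c
+
+    /* The stage order is fixed by the setup order: Q0 -> Q1 -> Q2. */
+    rte_event_queue_setup(dev_id, 0, &queue_conf); /* first stage  */
+    rte_event_queue_setup(dev_id, 1, &queue_conf); /* second stage */
+    rte_event_queue_setup(dev_id, 2, &queue_conf); /* final stage  */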
+
+
+Limitations
+-----------
+
+The opdl implementation has a number of limitations. These limitations are
+due to the static nature of the underlying queues. It is because of this
+that the implementation can achieve such high throughput and low latency.
+
+The following list is a comprehensive outline of what is supported and
+of the limitations / restrictions imposed by the opdl pmd:
+
+ - The order in which packets move between queues is static and fixed
+   (dynamic scheduling is not supported).
+
+ - NEW and RELEASE op types are not explicitly supported. RX (first enqueue)
+   implicitly adds NEW event types, and TX (last dequeue) implicitly does
+   RELEASE event types.
+
+ - All packets follow the same path through device queues.
+
+ - Flows within queues are NOT supported.
+
+ - Event priority is NOT supported.
+
+ - Once the device is stopped, all inflight events are lost. Applications
+   should clear all inflight events before stopping it.
+
+ - Each port can only be associated with one queue.
+
+ - Each queue can have multiple ports associated with it.
+
+ - Each worker core has to dequeue the maximum burst size for that port.
+
+ - For performance, the rte_event flow_id should not be updated once a
+   packet is enqueued on RX.
+
+
+
+Validation & Statistics
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Validation can be turned on through a command line parameter:
+
+.. code-block:: console
+
+    --vdev="event_opdl0,do_validation=1"
+
+If validation is turned on, every packet (as opposed to just the first in
+each burst) is validated to have come from the right queue. Statistics
+are also produced in this mode. The statistics are available through the
+eventdev xstats API. Statistics are per port, as follows:
+
+ - claim_pkts_requested
+ - claim_pkts_granted
+ - claim_non_empty
+ - claim_empty
+ - total_cycles
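+
+As a brief sketch of reading these counters through the eventdev xstats
+API (error handling omitted; the fixed buffer size is an illustrative
+assumption):
+
+.. code-block:: c
+
+    struct rte_event_dev_xstats_name names[64];
+    unsigned int ids[64];
+    uint64_t values[64];
+
+    int n = rte_event_dev_xstats_names_get(dev_id,
+            RTE_EVENT_DEV_XSTATS_PORT, port_id, names, ids, 64);
+    if (n > 0 && n <= 64)
+        rte_event_dev_xstats_get(dev_id, RTE_EVENT_DEV_XSTATS_PORT,
+                port_id, ids, values, n);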
diff --git a/doc/guides/sample_app_ug/eventdev_pipeline_opdl_pmd.rst b/doc/guides/sample_app_ug/eventdev_pipeline_opdl_pmd.rst
new file mode 100644
index 0000000..3465a8e
--- /dev/null
+++ b/doc/guides/sample_app_ug/eventdev_pipeline_opdl_pmd.rst
@@ -0,0 +1,128 @@
+
+..  BSD LICENSE
+    Copyright(c) 2017 Intel Corporation. All rights reserved.
+    All rights reserved.
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions
+    are met:
+
+    * Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution.
+    * Neither the name of Intel Corporation nor the names of its
+    contributors may be used to endorse or promote products derived
+    from this software without specific prior written permission.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Eventdev Pipeline OPDL PMD Sample Application
+=============================================
+
+The eventdev pipeline sample application demonstrates the usage of the
+eventdev API using the OPDL PMD. It shows how an application can configure
+a pipeline and assign a set of worker cores to perform the processing
+required.
+
+The application has a range of command line arguments allowing it to be
+configured for various numbers of worker cores, stages, queue depths and
+cycles per stage of work. This is useful for performance testing as well as
+quickly testing a particular pipeline configuration.
+
+
+Compiling the Application
+-------------------------
+
+To compile the sample application see :doc:`compiling`.
+
+The application is located in the ``examples`` sub-directory.
+
+Sample Application
+~~~~~~~~~~~~~~~~~~
+
+A sample application has been produced to help illustrate how to use the
+opdl implementation of the eventdev API. It is available from the following
+directory:
+
+.. code-block:: console
+
+         $(DPDK_ROOT)/examples/eventdev_pipeline_opdl_pmd/main.c
+
+It implements the following pipeline::
+
+                            +------+     +------+
+                            |worker|     |worker|
+                      +-----+(null)+-----+(l2fd)+-----+
+                      |     |   1  |     |   5  |     |
+                      |     +------+     +------+     |
+                      |                               |
+                      |     +------+     +------+     |
+                      |     |worker|     |worker|     |
+                      +-----+(null)+-----+(l2fd)+-----+
+        +------+      |     |   2  |     |   6  |     |     +------+
+        |      |      |     +------+     +------+     |     |      |
+        | RX(0)+------+                               +-----+ TX(9)|
+        |      |      |     +------+     +------+     |     |      |
+        +------+      |     |worker|     |worker|     |     +------+
+                      +-----+(null)+-----+(l2fd)+-----+
+                      |     |   3  |     |   7  |     |
+                      |     +------+     +------+     |
+                      |                               |
+                      |     +------+     +------+     |
+                      |     |worker|     |worker|     |
+                      +-----+(null)+-----+(l2fd)+-----+
+                            |   4  |     |   8  |
+                            +------+     +------+
+
+
+
+- The application creates 10 ports (0-9), and 3 queues (0-2).
+
+- Packets are received on the RX thread and enqueued onto P0-Q0.
+
+- Packets are then evenly distributed across ports 1-4
+
+  + dequeue P1,P2,P3,P4 Q0 (null work)
+  + enqueue P1,P2,P3,P4 Q1
+
+- Packets are then evenly distributed across ports 5-8
+
+  + dequeue P5,P6,P7,P8 Q1 (l2fd work)
+  + enqueue P5,P6,P7,P8 Q2
+
+- Finally, packets are dequeued from P9-Q2 and transmitted on the relevant
+  ethernet port by the TX thread.
+
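+A typical invocation might look like the following (the binary path and
+coremask are illustrative; enough lcores are needed for the RX thread,
+the 8 workers and the TX thread):
+
+.. code-block:: console
+
+    ./build/eventdev_pipeline_opdl_pmd -c 0xfff --vdev="event_opdl0"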
+
+Validation & Statistics
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Validation can be turned on through a command line parameter:
+
+.. code-block:: console
+
+    --vdev="event_opdl0,do_validation=1"
+
+If validation is turned on, every packet (as opposed to just the first in
+each burst) is validated to have come from the right queue. Statistics
+are also produced in this mode. The statistics are available through the
+eventdev xstats API. Statistics are per port, as follows:
+
+ - claim_pkts_requested
+ - claim_pkts_granted
+ - claim_non_empty
+ - claim_empty
+ - total_cycles
diff --git a/doc/guides/sample_app_ug/index.rst b/doc/guides/sample_app_ug/index.rst
index db68ef7..376dbb2 100644
--- a/doc/guides/sample_app_ug/index.rst
+++ b/doc/guides/sample_app_ug/index.rst
@@ -75,6 +75,7 @@ Sample Applications User Guides
     ip_pipeline
     test_pipeline
     eventdev_pipeline_sw_pmd
+    eventdev_pipeline_opdl_pmd
     dist_app
     vm_power_management
     tep_termination
-- 
2.7.5

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD
  2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
                   ` (6 preceding siblings ...)
  2017-11-24 11:23 ` [dpdk-dev] [PATCH 7/7] doc: add eventdev opdl pmd docuement and example usage document liang.j.ma
@ 2017-11-24 20:55 ` Jerin Jacob
  2017-11-29 12:19   ` Ma, Liang
  7 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-24 20:55 UTC (permalink / raw)
  To: liang.j.ma
  Cc: dev, harry.van.haaren, bruce.richardson, deepak.k.jain, john.geary

-----Original Message-----
> Date: Fri, 24 Nov 2017 11:23:45 +0000
> From: liang.j.ma@intel.com
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com,
>  deepak.k.jain@intel.com, john.geary@intel.com
> Subject: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> X-Mailer: git-send-email 2.7.5
> 
> From: Liang Ma <liang.j.ma@intel.com>


Thanks Liang Ma for the RFC.


Some top-level comments,

# How does the application know this PMD has the above limitations?

I think we need to add more RTE_EVENT_DEV_CAP_* capability flags
to depict these constraints. On the same note, I believe this
PMD is "radically" different than the other SW/HW PMDs, so anyway
we cannot write a portable application using this PMD. So there
is no point in abstracting it as an eventdev PMD. Could you please
work out the new capabilities required to enable this PMD.
If it needs more capability flags to express this PMD's capability,
we might have a different library for this as it defeats the
purpose of portable eventdev applications.

# We should not add yet another "PMD" specific example application
in the examples area like "examples/eventdev_pipeline_opdl_pmd". We are
working on making examples/eventdev/pipeline_sw_pmd work
on both HW and SW.

# We should not add new PMD specific test cases in the
test/test/test_eventdev_opdl.c area. I think the existing PMD specific
test cases can be moved to the respective driver area, and each can do
the self-test by passing some command line arguments to the vdev.

# Do you have relative performance numbers with the existing SW PMD?
Meaning, how much does it improve any specific use case WRT the existing
SW PMD. That should be a metric to define the need for a new PMD.

# There could be another SW driver from another vendor like ARM.
So, I think, it is important to define the need for another SW
PMD and how many limitations/new capabilities it needs to define to
fit into the eventdev framework.

Jerin

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD
  2017-11-24 20:55 ` [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD Jerin Jacob
@ 2017-11-29 12:19   ` Ma, Liang
  2017-11-29 12:56     ` Jerin Jacob
  0 siblings, 1 reply; 13+ messages in thread
From: Ma, Liang @ 2017-11-29 12:19 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dev, Van Haaren, Harry, Richardson, Bruce, Jain, Deepak K,
	Mccarthy, Peter

Hi Jerin,
   Many thanks for your comments. Please check my comment below. 

On 25 Nov 02:25, Jerin Jacob wrote:
> -----Original Message-----
> > Date: Fri, 24 Nov 2017 11:23:45 +0000
> > From: liang.j.ma@intel.com
> > To: jerin.jacob@caviumnetworks.com
> > CC: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com,
> >  deepak.k.jain@intel.com, john.geary@intel.com
> > Subject: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> > X-Mailer: git-send-email 2.7.5
> > 
> > From: Liang Ma <liang.j.ma@intel.com>
> 
> 
> Thanks Liang Ma for the RFC.
> 
> Some top-level comments,
> 
> # How does application knows this PMD has above limitations?
> 
> I think, We need to add more capability RTE_EVENT_DEV_CAP_*
> to depict these constraints. On the same note, I believe this
> PMD is "radically" different than other SW/HW PMD then anyway
> we cannot write the portable application using this PMD. So there
> is no point in abstracting it as eventdev PMD. Could you please
> work on the new capabilities are required to enable this PMD.
> If it needs more capability flags to express this PMD capability,
> we might have a different library for this as it defects the
> purpose of portable eventdev applications.
>
Agree on improving the capability information by adding more detail via
RTE_EVENT_DEV_CAP_*. While the OPDL is designed around a different
load-balancing architecture, that of load-balancing across pipeline
stages where a consumer is only working on a single stage, this does not
necessarily mean that it is completely incompatible with other eventdev
implementations. Although it is true that an application written to use
one of the existing eventdevs probably won't work nicely with the OPDL
eventdev, the converse situation should work ok. That is, an application
written as a pipeline using the OPDL eventdev for load balancing should
work without changes with the generic SW implementation, and there should
be no reason why it should not also work with other HW implementations
in DPDK too.
The OPDL PMD implements a subset of the eventdev API's functionality. I
demonstrated OPDL at this year's PRC DPDK summit and got some early
feedback from potential users. Most of them would like to use it under the
existing API (aka eventdev) rather than another new API/lib. That lets
potential users more easily swap to an existing SW/HW eventdev PMD.
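
As an illustration, an application can query what a given PMD reports
before relying on it (standard eventdev API; the particular CAP flag
checked below is only an example, and which new flags would encode the
OPDL constraints is exactly the open question here):

    struct rte_event_dev_info info;

    if (rte_event_dev_info_get(dev_id, &info) == 0 &&
        !(info.event_dev_cap & RTE_EVENT_DEV_CAP_QUEUE_QOS)) {
        /* This PMD does not support per-queue QoS; adapt accordingly. */
    }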

> # We should not add yet another "PMD" specific example application
> in example area like "examples/eventdev_pipeline_opdl_pmd". We are
> working on making examples/eventdev/pipeline_sw_pmd to make work
> on both HW and SW.
> 
We would agree here that we don't need a proliferation of example
applications. However, this is a different architecture (not a dynamic packet
scheduler but rather a static pipeline work distributor), and as such perhaps
we should have a sample app that demonstrates each contrasting architecture.

> # We should not add new PMD specific test cases in
> test/test/test_eventdev_opdl.c area.I think existing PMD specific
> test case can be moved to respective driver area, and it can do 
> the self-test by passing some command line arguments to vdev.
> 
We simply followed the existing test structure here. Would it be confusing to
have another variant of example test code; is this done anywhere else?
Also, would there be a chance that DTS would miss running tests, or not like
having to run them using a different method? However, we would defer to the
consensus here. Could you elaborate on your concerns with having another test
file in the test area?

> # Do you have relative performance number with exiting SW PMD?
> Meaning how much it improves for any specific use case WRT exiting
> SW PMD. That should a metric to define the need for new PMD.
> 
Yes, we definitely have the numbers. Given the limitations (ref the cover
letter), OPDL can achieve a 3X-5X schedule rate (on a Xeon 2699 v4 platform)
compared with the standard SW PMD, with no need for a schedule core. This is
the core value of the OPDL PMD. For certain use cases, "static pipeline" and
"strong order", OPDL is very useful, efficient and generic across processor
architectures.

> # There could be another SW driver from another vendor like ARM.
> So, I think, it is important to define the need for another SW
> PMD and how much limitation/new capabilities it needs to define to
> fit into the eventdev framework,
>
To put a summary here: OPDL is designed for certain use cases, where
performance increases dramatically. Also, OPDL can fall back to the standard
SW PMD seamlessly. That definitely fits into the eventdev API.

Liang

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD
  2017-11-29 12:19   ` Ma, Liang
@ 2017-11-29 12:56     ` Jerin Jacob
  2017-11-29 17:15       ` Ma, Liang
  0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-29 12:56 UTC (permalink / raw)
  To: Ma, Liang
  Cc: dev, Van Haaren, Harry, Richardson, Bruce, Jain, Deepak K,
	Mccarthy, Peter

-----Original Message-----
> Date: Wed, 29 Nov 2017 12:19:54 +0000
> From: "Ma, Liang" <liang.j.ma@intel.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> CC: dev@dpdk.org, "Van Haaren, Harry" <harry.van.haaren@intel.com>,
>  "Richardson, Bruce" <bruce.richardson@intel.com>, "Jain, Deepak K"
>  <deepak.k.jain@intel.com>, "Mccarthy, Peter" <peter.mccarthy@intel.com>
> Subject: Re: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> User-Agent: Mutt/1.5.20 (2009-06-14)
> 
> Hi Jerin,
>    Many thanks for your comments. Please check my comment below. 
> 
> On 25 Nov 02:25, Jerin Jacob wrote:
> > -----Original Message-----
> > > Date: Fri, 24 Nov 2017 11:23:45 +0000
> > > From: liang.j.ma@intel.com
> > > To: jerin.jacob@caviumnetworks.com
> > > CC: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com,
> > >  deepak.k.jain@intel.com, john.geary@intel.com
> > > Subject: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> > > X-Mailer: git-send-email 2.7.5
> > > 
> > > From: Liang Ma <liang.j.ma@intel.com>
> > 
> > 
> > Thanks Liang Ma for the RFC.
> > 
> > Some top-level comments,
> > 
> > # How does application knows this PMD has above limitations?
> > 
> > I think, We need to add more capability RTE_EVENT_DEV_CAP_*
> > to depict these constraints. On the same note, I believe this
> > PMD is "radically" different than other SW/HW PMD then anyway
> > we cannot write the portable application using this PMD. So there
> > is no point in abstracting it as eventdev PMD. Could you please
> > work on the new capabilities are required to enable this PMD.
> > If it needs more capability flags to express this PMD capability,
> > we might have a different library for this as it defects the
> > purpose of portable eventdev applications.
> >
> Agree with improve capability information with add more details with 
> RTE_EVENT_DEV_CAP_*. While the OPDL is designed around a different 

Please submit the patches for the new caps required by this PMD to
depict the constraints. That is the only way an application can know
the constraints of a given PMD.

> load-balancing architecture, that of load-balancing across pipeline 
> stages where a consumer is only working on a single stage, this does not 
> necessarily mean that it is completely incompatible with other eventdev 
> implementations. Although, it is true that an application written to use 
> one of the existing eventdevs probably won't work nicely with the OPDL
> eventdev, the converse situation should work ok. That is, an application
> written as a pipeline using the OPDL eventdev for load balancing should 
> work without changes with the generic SW implementation, and there should 
> be no reason why it should not also work with other HW implementations 
> in DPDK too. 
> OPDL PMD implement a subset functionality of eventdev API. I demonstrate 
> OPDL on this year PRC DPDK summit,  got some early feedback from potential
> users. Most of them would like to use that under existing API(aka eventdev) 
> rather than another new API/lib. That let potential user easier to swap to 
> exist SW/HW eventdev PMD.

Perfect. Let's have one application then, so it will make it easy to swap
SW/HW eventdev PMDs.

> 
> > # We should not add yet another "PMD" specific example application
> > in example area like "examples/eventdev_pipeline_opdl_pmd". We are
> > working on making examples/eventdev/pipeline_sw_pmd to make work
> > on both HW and SW.
> > 
> We would agree here that we don't need a proliferation of example applications.
> However this is a different architecture (not a dynamic packet scheduler rather
> a static pipeline work distributer), and as such perhaps we should have a 
> sample app that demonstrates each contrasting architecture.

I agree. We need a sample application. Why not change the existing
examples/eventdev/pipeline_sw_pmd to make it work, as we are addressing
the pipelining here. Let's write the application based on THE USE CASE,
not specific to a PMD. PMD specific applications won't scale.

> 
> > # We should not add new PMD specific test cases in
> > test/test/test_eventdev_opdl.c area.I think existing PMD specific
> > test case can be moved to respective driver area, and it can do 
> > the self-test by passing some command line arguments to vdev.
> > 
> We simply followed the existing test structure here. Would it be confusing to 
> have another variant of example test code, is this done anywhere else? 
> Also would there be a chance that DTS would miss running tests or not like 
> having to run them using a different method. However we would defer to the consensus here.
> Could you elaborate on your concerns with having another test file in the test area ?

PMD specific test cases won't scale. It defeats the purpose of the common
framework. Cryptodev fell into that trap earlier, then they fixed it.
For the DTS case, I think it can still be verified through vdev command line
arguments to the new PMD. What do you think?
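
For example, something along these lines (the "self_test" devargs name
here is hypothetical; whatever devargs the PMD ends up exposing):

    ./your_eventdev_application --vdev="event_opdl0,self_test=1"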

> 
> > # Do you have relative performance number with exiting SW PMD?
> > Meaning how much it improves for any specific use case WRT exiting
> > SW PMD. That should a metric to define the need for new PMD.
> > 
> Yes, we definitely has the number. Given the limitation(Ref cover letter), OPDL 
> can achieve 3X-5X times schedule rate(on Xeon 2699 v4 platform) compare with the 
> standard SW PMD and no need of schedule core. This is the core value of OPDL PMD.
> For certain user case, "static pipeline"  "strong order ",  OPDL is very useful 
> and efficient and generic to processor arch.

Sounds good.

> 
> > # There could be another SW driver from another vendor like ARM.
> > So, I think, it is important to define the need for another SW
> > PMD and how much limitation/new capabilities it needs to define to
> > fit into the eventdev framework,
> >
> Put a summary here, OPDL is designed for certain user case, performance is increase 
> dramatically. Also OPDL can fallback to standard SW PMD seamless.
> That definitely fit into the eventdev API
> 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD
  2017-11-29 12:56     ` Jerin Jacob
@ 2017-11-29 17:15       ` Ma, Liang
  2017-11-30 17:41         ` Jerin Jacob
  0 siblings, 1 reply; 13+ messages in thread
From: Ma, Liang @ 2017-11-29 17:15 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dev, Van Haaren, Harry, Richardson, Bruce, Jain, Deepak K,
	Mccarthy, Peter

On 29 Nov 04:56, Jerin Jacob wrote:
> -----Original Message-----
> > Date: Wed, 29 Nov 2017 12:19:54 +0000
> > From: "Ma, Liang" <liang.j.ma@intel.com>
> > To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> > CC: dev@dpdk.org, "Van Haaren, Harry" <harry.van.haaren@intel.com>,
> >  "Richardson, Bruce" <bruce.richardson@intel.com>, "Jain, Deepak K"
> >  <deepak.k.jain@intel.com>, "Mccarthy, Peter" <peter.mccarthy@intel.com>
> > Subject: Re: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> > User-Agent: Mutt/1.5.20 (2009-06-14)
> > 
> > Hi Jerin,
> >    Many thanks for your comments. Please check my comment below. 
> > 
> > On 25 Nov 02:25, Jerin Jacob wrote:
> > > -----Original Message-----
> > > > Date: Fri, 24 Nov 2017 11:23:45 +0000
> > > > From: liang.j.ma@intel.com
> > > > To: jerin.jacob@caviumnetworks.com
> > > > CC: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com,
> > > >  deepak.k.jain@intel.com, john.geary@intel.com
> > > > Subject: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> > > > X-Mailer: git-send-email 2.7.5
> > > > 
> > > > From: Liang Ma <liang.j.ma@intel.com>
> > > 
> > > 
> > > Thanks Liang Ma for the RFC.
> > > 
> > > > 
> > > > The OPDL (Ordered Packet Distribution Library) eventdev is a specific
> > > > implementation of the eventdev API. It is particularly suited to packet
> > > > processing workloads that have high throughput and low latency 
> > > > requirements. All packets follow the same path through the device.
> > > > The order which packets  follow is determinted by the order in which
> > > > queues are set up. Packets are left on the ring until they are transmitted.
> > > > As a result packets do not go out of order.
> > > > 
> > > > Features:
> > > > 
> > > > The OPDL eventdev implements a subset of features of the eventdev API;
> > > > 
> > > > Queues
> > > >  * Atomic
> > > >  * Ordered (Parallel is supported as parallel is a subset of Ordered)
> > > >  * Single-Link
> > > > 
> > > > Ports
> > > >  * Load balanced (for Atomic, Ordered, Parallel queues)
> > > >  * Single Link (for single-link queues)
> > > > 
> > > > Single Port Queue
> > > > 
> > > > It is possible to create a Single Port Queue 
> > > > RTE_EVENT_QUEUE_CFG_SINGLE_LINK. Packets dequeued from this queue do
> > > > not need to be re-enqueued (as is the case with an ordered queue). The 
> > > > purpose of this queue is to allow for asynchronous handling of packets in 
> > > > the middle of a pipeline. Ordered queues in the middle of a pipeline 
> > > > cannot delete packets.
> > > > 
> > > > 
> > > > Queue Dependencies
> > > > 
> > > > As stated the order in which packets travel through queues is static in
> > > > nature. They go through the queues in the order the queues are setup at
> > > > initialisation rte_event_queue_setup(). For example if an application
> > > > sets up 3 queues, Q0, Q1, Q2 and has 3 assoicated ports P0, P1, P2 and 
> > > > P3 then packets must be
> > > > 
> > > >  * Enqueued onto Q0 (typically through P0), then
> > > > 
> > > >  * Dequeued from Q0 (typically through P1), then
> > > > 
> > > >  * Enqueued onto Q1 (also through P1), then
> > > > 
> > > >  * Dequeued from Q2 (typically through P2),  then
> > > > 
> > > >  * Enqueued onto Q3 (also through P2), then
> > > > 
> > > >  * Dequeued from Q3 (typically through P3) and then transmitted on the 
> > > >    relevant eth port
> > > > 
> > > > 
> > > > Limitations
> > > > 
> > > > The opdl implementation has a number of limitations. These limitations are
> > > > due to the static nature of the underlying queues. It is because of this
> > > > that the implementation can achieve such high throughput and low latency.
> > > > 
> > > > The following list is a comprehensive outline of what is supported and
> > > > the limitations / restrictions imposed by the opdl pmd:
> > > > 
> > > >  - The order in which packets move between queues is static and fixed 
> > > >    (dynamic scheduling is not supported).
> > > > 
> > > >  - The NEW and RELEASE op types are not explicitly supported. RX (first 
> > > >    enqueue) implicitly adds the NEW event type, and TX (last dequeue) 
> > > >    implicitly performs the RELEASE event type.
> > > > 
> > > >  - All packets follow the same path through device queues.
> > > > 
> > > >  - Flows within queues are NOT supported.
> > > > 
> > > >  - Event priority is NOT supported.
> > > > 
> > > >  - Once the device is stopped, all inflight events are lost. Applications 
> > > >    should clear all inflight events before stopping the device.
> > > > 
> > > >  - Each port can only be associated with one queue.
> > > > 
> > > >  - Each queue can have multiple ports associated with it.
> > > > 
> > > >  - Each worker core has to dequeue the maximum burst size for that port.
> > > > 
> > > >  - For performance, the rte_event flow_id should not be updated once 
> > > >     a packet is enqueued on RX.
> > > 
> > > Some top-level comments,
> > > 
> > > # How does an application know this PMD has the above limitations?
> > > 
> > > I think we need to add more RTE_EVENT_DEV_CAP_* capability flags
> > > to depict these constraints. On the same note, if this PMD is
> > > "radically" different from the other SW/HW PMDs, then we cannot
> > > write a portable application using this PMD anyway, so there
> > > is no point in abstracting it as an eventdev PMD. Could you please
> > > work out the new capabilities required to enable this PMD?
> > > If it needs too many capability flags to express this PMD's
> > > behaviour, we might need a different library for this, as it defeats
> > > the purpose of portable eventdev applications.
> > >
> > Agreed on improving the capability information by adding more detail via 
> > RTE_EVENT_DEV_CAP_*. While OPDL is designed around a different 
> 
> Please submit the patches for the new caps required by this PMD to
> depict the constraints. That is the only way an application can know 
> the constraints of the given PMD.
> 
I will work on the capability issue and submit V2 patches when they are ready.
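
To make the intent concrete, here is a minimal sketch of how an application
might probe the reported capabilities before committing to a pipeline design
(the two flags checked are just illustrative members of the existing
RTE_EVENT_DEV_CAP_* set):

#include <stdio.h>
#include <rte_eventdev.h>

/* Sketch only: inspect device capabilities up front so the application
 * can detect constraints such as the lack of distributed scheduling. */
static int
check_event_dev_caps(uint8_t dev_id)
{
	struct rte_event_dev_info info;

	if (rte_event_dev_info_get(dev_id, &info) < 0)
		return -1;

	if (!(info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
		printf("eventdev %u needs a dedicated scheduling core\n", dev_id);

	if (!(info.event_dev_cap & RTE_EVENT_DEV_CAP_QUEUE_QOS))
		printf("eventdev %u has no queue QoS/priority support\n", dev_id);

	return 0;
}
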
> > load-balancing architecture, that of load-balancing across pipeline 
> > stages where a consumer is only working on a single stage, this does not 
> > necessarily mean that it is completely incompatible with other eventdev 
> > implementations. Although it is true that an application written to use 
> > one of the existing eventdevs probably won't work nicely with the OPDL
> > eventdev, the converse situation should work ok. That is, an application
> > written as a pipeline using the OPDL eventdev for load balancing should 
> > work without changes with the generic SW implementation, and there should 
> > be no reason why it should not also work with other HW implementations 
> > in DPDK too. 
> > The OPDL PMD implements a subset of the eventdev API functionality. I 
> > demonstrated OPDL at this year's PRC DPDK Summit and got some early feedback 
> > from potential users. Most of them would like to use it under the existing 
> > API (aka eventdev) rather than under another new API/lib. That makes it 
> > easier for potential users to swap to an existing SW/HW eventdev PMD.
> 
> Perfect. Let's have one application then, so it will make it easy to swap
> between SW/HW eventdev PMDs.
> 
> > 
> > > # We should not add yet another PMD-specific example application
> > > in the examples area, like "examples/eventdev_pipeline_opdl_pmd". We are
> > > working on making examples/eventdev/pipeline_sw_pmd work on both HW
> > > and SW.
> > > 
> > We would agree here that we don't need a proliferation of example applications.
> > However, this is a different architecture (not a dynamic packet scheduler but 
> > rather a static pipeline work distributor), and as such perhaps we should have 
> > a sample app that demonstrates each contrasting architecture.
> 
> I agree. We need a sample application. Why not change the existing
> examples/eventdev/pipeline_sw_pmd to make it work, as we are addressing
> pipelining here. Let's write the application based on THE USE CASE, not
> specific to a PMD. PMD-specific applications won't scale.
> 
I prefer to hold the OPDL example code back from this patch set; 
it's better to upstream/merge the example code in another track.
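
For reference, the static-pipeline use case such an example would demonstrate
boils down to roughly the sketch below (illustrative only: three queues, four
ports, default configs, most error handling trimmed):

#include <string.h>
#include <rte_eventdev.h>

/* Sketch: a 3-stage static pipeline Q0 -> Q1 -> Q2 with port P0 doing the
 * RX enqueue and worker ports P1..P3 each serving one stage. For OPDL the
 * queue setup order defines the packet path. */
static int
setup_static_pipeline(uint8_t dev_id)
{
	struct rte_event_dev_info info;
	struct rte_event_dev_config cfg;
	struct rte_event_queue_conf qconf;
	struct rte_event_port_conf pconf;
	uint8_t q, p;

	if (rte_event_dev_info_get(dev_id, &info) < 0)
		return -1;

	memset(&cfg, 0, sizeof(cfg));
	cfg.nb_event_queues = 3;
	cfg.nb_event_ports = 4;
	cfg.nb_events_limit = info.max_num_events;
	cfg.nb_event_queue_flows = info.max_event_queue_flows;
	cfg.nb_event_port_dequeue_depth = info.max_event_port_dequeue_depth;
	cfg.nb_event_port_enqueue_depth = info.max_event_port_enqueue_depth;
	cfg.dequeue_timeout_ns = info.min_dequeue_timeout_ns;
	if (rte_event_dev_configure(dev_id, &cfg) < 0)
		return -1;

	/* Queues are set up in pipeline order. */
	for (q = 0; q < 3; q++) {
		rte_event_queue_default_conf_get(dev_id, q, &qconf);
		if (rte_event_queue_setup(dev_id, q, &qconf) < 0)
			return -1;
	}

	for (p = 0; p < 4; p++) {
		rte_event_port_default_conf_get(dev_id, p, &pconf);
		if (rte_event_port_setup(dev_id, p, &pconf) < 0)
			return -1;
	}

	/* Each worker port dequeues from exactly one queue:
	 * P1 <- Q0, P2 <- Q1, P3 <- Q2. P0 only enqueues onto Q0. */
	for (p = 1; p < 4; p++) {
		q = p - 1;
		if (rte_event_port_link(dev_id, p, &q, NULL, 1) != 1)
			return -1;
	}
	return rte_event_dev_start(dev_id);
}
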
> > 
> > > # We should not add new PMD-specific test cases in the
> > > test/test/test_eventdev_opdl.c area. I think the existing PMD-specific
> > > test cases can be moved to the respective driver area, and each can do 
> > > the self-test by passing some command-line arguments to the vdev.
> > > 
> > We simply followed the existing test structure here. Would it be confusing to 
> > have another variant of example test code? Is this done anywhere else? 
> > Also, is there a chance that DTS would miss running the tests, or dislike 
> > having to run them using a different method? However, we would defer to the
> > consensus here. Could you elaborate on your concerns with having another test
> > file in the test area?
> 
> PMD-specific test cases won't scale. They defeat the purpose of the common
> framework. Cryptodev fell into that trap earlier, then fixed it.
> For the DTS case, I think it can still be verified through vdev command-line
> arguments to the new PMD. What do you think?
> 
Agreed, I would like to integrate the test code with the PMD, but is any API 
available for self-test purposes? I didn't find an existing API that supports 
self-test. Any hints?

> > 
> > > # Do you have relative performance numbers against the existing SW PMD?
> > > Meaning, how much does it improve any specific use case WRT the existing
> > > SW PMD? That should be a metric to justify the need for a new PMD.
> > > 
> > Yes, we definitely have the numbers. Given the limitations (see the cover 
> > letter), OPDL can achieve a 3x-5x schedule rate (on a Xeon 2699 v4 platform) 
> > compared with the standard SW PMD, and needs no scheduling core. This is the 
> > core value of the OPDL PMD. For certain use cases, "static pipeline" and 
> > "strong order", OPDL is very useful, efficient, and generic across processor 
> > architectures.
> 
> Sounds good.
> 
> > 
> > > # There could be another SW driver from another vendor like ARM.
> > > So, I think, it is important to define the need for another SW
> > > PMD, and how many limitations/new capabilities it needs to define
> > > to fit into the eventdev framework.
> > >
> > To summarize: OPDL is designed for certain use cases where performance 
> > increases dramatically. Also, OPDL can fall back to the standard SW PMD 
> > seamlessly. That definitely fits into the eventdev API.
> > 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD
  2017-11-29 17:15       ` Ma, Liang
@ 2017-11-30 17:41         ` Jerin Jacob
  0 siblings, 0 replies; 13+ messages in thread
From: Jerin Jacob @ 2017-11-30 17:41 UTC (permalink / raw)
  To: Ma, Liang
  Cc: dev, Van Haaren, Harry, Richardson, Bruce, Jain, Deepak K,
	Mccarthy, Peter

-----Original Message-----
> Date: Wed, 29 Nov 2017 17:15:12 +0000
> From: "Ma, Liang" <liang.j.ma@intel.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> CC: "dev@dpdk.org" <dev@dpdk.org>, "Van Haaren, Harry"
>  <harry.van.haaren@intel.com>, "Richardson, Bruce"
>  <bruce.richardson@intel.com>, "Jain, Deepak K" <deepak.k.jain@intel.com>,
>  "Mccarthy, Peter" <peter.mccarthy@intel.com>
> Subject: Re: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> User-Agent: Mutt/1.9.1 (2017-09-22)
> 
> On 29 Nov 04:56, Jerin Jacob wrote:
> > -----Original Message-----
> > > Date: Wed, 29 Nov 2017 12:19:54 +0000
> > > From: "Ma, Liang" <liang.j.ma@intel.com>
> > > To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> > > CC: dev@dpdk.org, "Van Haaren, Harry" <harry.van.haaren@intel.com>,
> > >  "Richardson, Bruce" <bruce.richardson@intel.com>, "Jain, Deepak K"
> > >  <deepak.k.jain@intel.com>, "Mccarthy, Peter" <peter.mccarthy@intel.com>
> > > Subject: Re: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> > > User-Agent: Mutt/1.5.20 (2009-06-14)
> > > 
> > > Hi Jerin,
> > >    Many thanks for your comments. Please check my comment below. 
> > > 
> > > On 25 Nov 02:25, Jerin Jacob wrote:
> > > > -----Original Message-----
> > > > > Date: Fri, 24 Nov 2017 11:23:45 +0000
> > > > > From: liang.j.ma@intel.com
> > > > > To: jerin.jacob@caviumnetworks.com
> > > > > CC: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com,
> > > > >  deepak.k.jain@intel.com, john.geary@intel.com
> > > > > Subject: [RFC PATCH 0/7] RFC:EventDev OPDL PMD
> > > > > X-Mailer: git-send-email 2.7.5
> > > > > 
> > > > > From: Liang Ma <liang.j.ma@intel.com>
> > > > 
> > > > 
> > > > 
> > > > # How does an application know this PMD has the above limitations?
> > > > 
> > > > I think we need to add more RTE_EVENT_DEV_CAP_* capability flags
> > > > to depict these constraints. On the same note, if this PMD is
> > > > "radically" different from the other SW/HW PMDs, then we cannot
> > > > write a portable application using this PMD anyway, so there
> > > > is no point in abstracting it as an eventdev PMD. Could you please
> > > > work out the new capabilities required to enable this PMD?
> > > > If it needs too many capability flags to express this PMD's
> > > > behaviour, we might need a different library for this, as it defeats
> > > > the purpose of portable eventdev applications.
> > > >
> > > Agreed on improving the capability information by adding more detail via 
> > > RTE_EVENT_DEV_CAP_*. While OPDL is designed around a different 
> > 
> > Please submit the patches for the new caps required by this PMD to
> > depict the constraints. That is the only way an application can know 
> > the constraints of the given PMD.
> > 
> I will work on the capability issue and submit V2 patches when they are ready.

OK

> > > load-balancing architecture, that of load-balancing across pipeline 
> > > stages where a consumer is only working on a single stage, this does not 
> > > necessarily mean that it is completely incompatible with other eventdev 
> > > implementations. Although it is true that an application written to use 
> > > one of the existing eventdevs probably won't work nicely with the OPDL
> > > eventdev, the converse situation should work ok. That is, an application
> > > written as a pipeline using the OPDL eventdev for load balancing should 
> > > work without changes with the generic SW implementation, and there should 
> > > be no reason why it should not also work with other HW implementations 
> > > in DPDK too. 
> > > The OPDL PMD implements a subset of the eventdev API functionality. I 
> > > demonstrated OPDL at this year's PRC DPDK Summit and got some early feedback 
> > > from potential users. Most of them would like to use it under the existing 
> > > API (aka eventdev) rather than under another new API/lib. That makes it 
> > > easier for potential users to swap to an existing SW/HW eventdev PMD.
> > 
> > Perfect. Let's have one application then, so it will make it easy to swap
> > between SW/HW eventdev PMDs.
> > 
> > > 
> > > > # We should not add yet another PMD-specific example application
> > > > in the examples area, like "examples/eventdev_pipeline_opdl_pmd". We are
> > > > working on making examples/eventdev/pipeline_sw_pmd work on both HW
> > > > and SW.
> > > > 
> > > We would agree here that we don't need a proliferation of example applications.
> > > However, this is a different architecture (not a dynamic packet scheduler but 
> > > rather a static pipeline work distributor), and as such perhaps we should have 
> > > a sample app that demonstrates each contrasting architecture.
> > 
> > I agree. We need a sample application. Why not change the existing
> > examples/eventdev/pipeline_sw_pmd to make it work, as we are addressing
> > pipelining here. Let's write the application based on THE USE CASE, not
> > specific to a PMD. PMD-specific applications won't scale.
> > 
> I prefer to hold the OPDL example code back from this patch set; 
> it's better to upstream/merge the example code in another track.

OK. The example code can be added later, once examples/eventdev/pipeline_sw_pmd
is cleaned up. The static pipeline (aka OPDL PMD) use case can be a separate
file inside examples/eventdev_pipeline.

> > > 
> > > > # We should not add new PMD-specific test cases in the
> > > > test/test/test_eventdev_opdl.c area. I think the existing PMD-specific
> > > > test cases can be moved to the respective driver area, and each can do 
> > > > the self-test by passing some command-line arguments to the vdev.
> > > > 
> > > We simply followed the existing test structure here. Would it be confusing to 
> > > have another variant of example test code? Is this done anywhere else? 
> > > Also, is there a chance that DTS would miss running the tests, or dislike 
> > > having to run them using a different method? However, we would defer to the
> > > consensus here. Could you elaborate on your concerns with having another test
> > > file in the test area?
> > 
> > PMD-specific test cases won't scale. They defeat the purpose of the common
> > framework. Cryptodev fell into that trap earlier, then fixed it.
> > For the DTS case, I think it can still be verified through vdev command-line
> > arguments to the new PMD. What do you think?
> > 
> Agreed, I would like to integrate the test code with the PMD, but is any API 
> available for self-test purposes? I didn't find an existing API that supports 
> self-test. Any hints?

We may not need any special API for that.
I was thinking of invoking the self-test with a vdev argument, something like
--vdev=event_<your_driver>,selftest=1. At the end of the driver probe, you can
invoke the driver-specific test case if selftest == 1.
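
Something along these lines, as a sketch (kvargs-based; the helper names and
the opdl_selftest() entry point are placeholders, not existing API):

#include <stdlib.h>
#include <rte_common.h>
#include <rte_kvargs.h>

/* Sketch of the suggested selftest devarg handling in the vdev probe. */
static int
set_int_flag(const char *key __rte_unused, const char *value, void *opaque)
{
	*(int *)opaque = atoi(value);
	return 0;
}

static int
probe_parse_selftest(const char *params, int *do_selftest)
{
	static const char * const valid[] = { "selftest", NULL };
	struct rte_kvargs *kvlist;

	*do_selftest = 0;
	if (params == NULL)
		return 0;
	kvlist = rte_kvargs_parse(params, valid);
	if (kvlist == NULL)
		return -1;
	rte_kvargs_process(kvlist, "selftest", set_int_flag, do_selftest);
	rte_kvargs_free(kvlist);
	return 0;
}

/* ...and at the end of probe():
 *	if (do_selftest)
 *		opdl_selftest();	(driver-local test entry, hypothetical)
 */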


> 
> > > 
> > > > # Do you have relative performance numbers against the existing SW PMD?
> > > > Meaning, how much does it improve any specific use case WRT the existing
> > > > SW PMD? That should be a metric to justify the need for a new PMD.
> > > > 
> > > Yes, we definitely have the numbers. Given the limitations (see the cover 
> > > letter), OPDL can achieve a 3x-5x schedule rate (on a Xeon 2699 v4 platform) 
> > > compared with the standard SW PMD, and needs no scheduling core. This is the 
> > > core value of the OPDL PMD. For certain use cases, "static pipeline" and 
> > > "strong order", OPDL is very useful, efficient, and generic across processor 
> > > architectures.
> > 
> > Sounds good.
> > 
> > > 
> > > > # There could be another SW driver from another vendor like ARM.
> > > > So, I think, it is important to define the need for another SW
> > > > PMD, and how many limitations/new capabilities it needs to define
> > > > to fit into the eventdev framework.
> > > >
> > > To summarize: OPDL is designed for certain use cases where performance 
> > > increases dramatically. Also, OPDL can fall back to the standard SW PMD 
> > > seamlessly. That definitely fits into the eventdev API.
> > > 

^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2017-11-30 17:42 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 2/7] event/opdl: add the opdl pmd header and init helper function liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 3/7] event/opdl: add the opdl pmd main body and xstats " liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 4/7] event/opdl: update the build system to enable compilation of pmd liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 5/7] test/eventdev: opdl eventdev pmd unit test func and makefiles liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 7/7] doc: add eventdev opdl pmd document and example usage document liang.j.ma
2017-11-24 20:55 ` [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD Jerin Jacob
2017-11-29 12:19   ` Ma, Liang
2017-11-29 12:56     ` Jerin Jacob
2017-11-29 17:15       ` Ma, Liang
2017-11-30 17:41         ` Jerin Jacob
