From: liang.j.ma@intel.com
To: jerin.jacob@caviumnetworks.com
Cc: dev@dpdk.org, harry.van.haaren@intel.com,
bruce.richardson@intel.com, deepak.k.jain@intel.com,
john.geary@intel.com
Subject: [dpdk-dev] [PATCH 1/7] event/opdl: add the opdl ring infrastructure library
Date: Fri, 24 Nov 2017 11:23:46 +0000 [thread overview]
Message-ID: <1511522632-139652-2-git-send-email-liang.j.ma@intel.com> (raw)
In-Reply-To: <1511522632-139652-1-git-send-email-liang.j.ma@intel.com>
From: Liang Ma <liang.j.ma@intel.com>
OPDL ring is the core infrastructure of the OPDL PMD. The OPDL ring library
provides the core data structures and the core set of helper functions. The Ring
implements a single ring multi-port/stage pipelined packet distribution
mechanism. This mechanism has the following characteristics:
• No multiple-queue cost; therefore, latency is significantly reduced.
• Fixed dependencies between queues/ports are more suitable for complex,
fixed pipelines of stateless packet processing (static pipeline).
• Has decentralized distribution (no scheduling core).
• Packets remain in order (no reorder core(s)).
Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
---
drivers/event/opdl/Makefile | 65 ++
drivers/event/opdl/opdl_ring.c | 1170 +++++++++++++++++++++
drivers/event/opdl/opdl_ring.h | 578 ++++++++++
drivers/event/opdl/rte_pmd_evdev_opdl_version.map | 3 +
4 files changed, 1816 insertions(+)
create mode 100644 drivers/event/opdl/Makefile
create mode 100644 drivers/event/opdl/opdl_ring.c
create mode 100644 drivers/event/opdl/opdl_ring.h
create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile
new file mode 100644
index 0000000..5c85139
--- /dev/null
+++ b/drivers/event/opdl/Makefile
@@ -0,0 +1,65 @@
+# BSD LICENSE
+#
+# Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# Pull in the common DPDK build variables before anything else is set.
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_pmd_opdl_event.a
+
+# build flags
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+# for older GCC versions, allow us to initialize an event using
+# designated initializers.
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+ifeq ($(shell test $(GCC_VERSION) -le 50 && echo 1), 1)
+CFLAGS += -Wno-missing-field-initializers
+endif
+endif
+
+# libraries this PMD links against
+LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs -lrte_ring
+LDLIBS += -lrte_bus_vdev
+
+# library version
+LIBABIVER := 1
+
+# versioning export map
+EXPORT_MAP := rte_pmd_evdev_opdl_version.map
+
+# library source files
+# NOTE(review): only opdl_ring.c is created by this patch; opdl_evdev.c,
+# opdl_evdev_init.c and opdl_evdev_xstats.c are presumably added by later
+# patches in the series - confirm the PMD cannot be enabled before then.
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_init.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_xstats.c
+
+# export include files
+# (nothing is appended here, so no headers are exported)
+SYMLINK-y-include +=
+
+# Library build rules; must come after all variables above are set.
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
new file mode 100644
index 0000000..f07bf3a
--- /dev/null
+++ b/drivers/event/opdl/opdl_ring.c
@@ -0,0 +1,1170 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include <rte_branch_prediction.h>
+#include <rte_debug.h>
+#include <rte_lcore.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_eal_memconfig.h>
+
+#include "opdl_ring.h"
+
+/* Prefix for log lines and memzone names; also the type tag passed to
+ * rte_zmalloc_socket().
+ */
+#define LIB_NAME "opdl_ring"
+
+/* Size in bytes of the name buffer held in struct opdl_ring */
+#define OPDL_NAME_SIZE 64
+
+/* Log type used by this file (maps onto the USER8 log type) */
+#define RTE_LOGTYPE_OPDL RTE_LOGTYPE_USER8
+/* Emit one log line: LIB_NAME prefix, formatted message, trailing newline */
+#define log(level, fmt, ...) \
+ RTE_LOG(level, OPDL, LIB_NAME": " fmt "\n", ##__VA_ARGS__)
+
+/* Debug logging expands to nothing unless OPDL_DEBUG is defined */
+#ifdef OPDL_DEBUG
+#define log_debug(...) log(DEBUG, __VA_ARGS__)
+#else
+#define log_debug(...)
+#endif
+
+/* Non-zero when n is a power of two. The argument is evaluated more than
+ * once, so it must not have side effects.
+ */
+#define POWER_OF_2(n) ((n) && !((n) & ((n) - 1)))
+
+/* Types of dependency between stages.
+ * DEP_NONE must stay 0: the per-stage dep_tracking array is allocated with
+ * rte_zmalloc_socket() in init_stage() and add_dep() compares against DEP_NONE
+ * to detect "not yet recorded".
+ */
+enum dep_type {
+ DEP_NONE = 0, /* no dependency */
+ DEP_DIRECT, /* stage has direct dependency */
+ DEP_INDIRECT, /* in-direct dependency through other stage(s) */
+ DEP_SELF, /* stage dependency on itself, used to detect loops */
+};
+
+/* Shared section of stage state.
+ * Care is needed when accessing and the layout is important, especially to
+ * limit the adjacent cache-line HW prefetcher from impacting performance.
+ * The _padN members only space the fields apart. opdl_ring_create() checks at
+ * build time that the structure size is a multiple of the cache-line size.
+ */
+struct shared_state {
+ /* Last known minimum sequence number of dependencies, used for multi
+ * thread operation
+ */
+ uint32_t available_seq;
+ char _pad1[RTE_CACHE_LINE_SIZE * 3];
+ uint32_t head; /* Head sequence number (for multi thread operation) */
+ char _pad2[RTE_CACHE_LINE_SIZE * 3];
+ struct opdl_stage *stage; /* back pointer */
+ /* Dependent stages load this value in update_available_seq() */
+ uint32_t tail; /* Tail sequence number */
+ char _pad3[RTE_CACHE_LINE_SIZE * 2];
+} __rte_cache_aligned;
+
+/* A structure to keep track of "unfinished" claims. This is only used for
+ * stages that are threadsafe. Each lcore accesses its own instance of this
+ * structure to record the entries it has claimed. This allows one lcore to make
+ * multiple claims without being blocked by another. When disclaiming it moves
+ * forward the shared tail when the shared tail matches the tail value recorded
+ * here.
+ *
+ * claims[] is used as a circular buffer: mgr_head and mgr_tail are free-running
+ * counters that claim_mgr_index() reduces to an array index.
+ */
+struct claim_manager {
+ /* Entries still to be disclaimed after the last disclaim attempt */
+ uint32_t num_to_disclaim;
+ /* Total number of entries covered by the recorded claims */
+ uint32_t num_claimed;
+ /* Counter advanced each time a new claim record is stored */
+ uint32_t mgr_head;
+ /* Counter advanced each time the oldest claim record is removed */
+ uint32_t mgr_tail;
+ /* One record per claim: sequence range [tail, head) */
+ struct {
+ uint32_t head;
+ uint32_t tail;
+ } claims[OPDL_DISCLAIMS_PER_LCORE];
+} __rte_cache_aligned;
+
+/* Context for each stage of opdl_ring.
+ * Calculations on sequence numbers need to be done with other uint32_t values
+ * so that results are modulus 2^32, and not undefined.
+ */
+struct opdl_stage {
+ struct opdl_ring *t; /* back pointer, set at init */
+ uint32_t num_slots; /* Number of slots for entries, set at init */
+ uint32_t index; /* ID for this stage, set at init */
+ bool threadsafe; /* Set to 1 if this stage supports threadsafe use */
+ /* Last known min seq number of dependencies, used for single thread
+ * operation
+ */
+ uint32_t available_seq;
+ uint32_t head; /* Current head for single-thread operation */
+ uint32_t nb_instance; /* Number of instances */
+ uint32_t instance_id; /* ID of this stage instance */
+ uint16_t num_claimed; /* Number of slots claimed */
+ uint16_t num_event; /* Number of events */
+ uint32_t seq; /* sequence number */
+ uint32_t num_deps; /* Number of direct dependencies */
+ /* Keep track of all dependencies, used during init only */
+ enum dep_type *dep_tracking;
+ /* Direct dependencies of this stage */
+ struct shared_state **deps;
+ /* Other stages read this! */
+ struct shared_state shared __rte_cache_aligned;
+ /* For managing disclaims in multi-threaded processing stages,
+ * indexed by rte_lcore_id()
+ */
+ struct claim_manager pending_disclaims[RTE_MAX_LCORE]
+ __rte_cache_aligned;
+} __rte_cache_aligned;
+
+/* Context for opdl_ring.
+ * The structure is the header of a single memzone allocation; the slot storage
+ * follows it directly (see opdl_ring_create()).
+ */
+struct opdl_ring {
+ char name[OPDL_NAME_SIZE]; /* opdl_ring instance name */
+ int socket; /* NUMA socket that memory is allocated on */
+ uint32_t num_slots; /* Number of slots for entries */
+ uint32_t mask; /* Mask for sequence numbers (num_slots - 1) */
+ uint32_t slot_size; /* Size of each slot in bytes */
+ uint32_t num_stages; /* Number of stages that have been added */
+ uint32_t max_num_stages; /* Max number of stages */
+ /* Stages indexed by ID */
+ struct opdl_stage *stages;
+ /* Memory for storing slot data (num_slots * slot_size bytes) */
+ uint8_t slots[0] __rte_cache_aligned;
+};
+
+
+/* The input stage is always the first element of the ring's stage array */
+static inline struct opdl_stage *__attribute__((always_inline))
+input_stage(const struct opdl_ring *t)
+{
+	struct opdl_stage *first = t->stages;
+
+	return first;
+}
+
+/* A stage is the input stage exactly when its index is zero */
+static inline bool __attribute__((always_inline))
+is_input_stage(const struct opdl_stage *s)
+{
+	const uint32_t idx = s->index;
+
+	return !idx;
+}
+
+/* Get slot pointer from sequence number.
+ *
+ * The sequence number is reduced to a slot index with the ring mask and then
+ * scaled by the slot size. The byte offset is computed as size_t so that it
+ * cannot wrap at 32 bits when num_slots * slot_size is 4 GiB or more.
+ */
+static inline void *__attribute__((always_inline))
+get_slot(const struct opdl_ring *t, uint32_t n)
+{
+	const size_t byte_offset = (size_t)(n & t->mask) * t->slot_size;
+
+	/* Cast through uintptr_t to drop the const inherited from t */
+	return (void *)(uintptr_t)&t->slots[byte_offset];
+}
+
+/* Find how many entries are available for processing.
+ *
+ * Uses the cached available_seq/head pair only; it never reads the tails of
+ * the dependencies (update_available_seq() does that). Subtraction is done on
+ * uint32_t so the result is correct modulo 2^32.
+ */
+static inline uint32_t __attribute__((always_inline))
+available(const struct opdl_stage *s)
+{
+ if (s->threadsafe == true) {
+ /* Threadsafe stages keep both values in the shared section */
+ uint32_t n = __atomic_load_n(&s->shared.available_seq,
+ __ATOMIC_ACQUIRE) -
+ __atomic_load_n(&s->shared.head,
+ __ATOMIC_ACQUIRE);
+
+ /* Return 0 if available_seq needs to be updated */
+ return (n <= s->num_slots) ? n : 0;
+ }
+
+ /* Single threaded */
+ return s->available_seq - s->head;
+}
+
+/* Read sequence number of dependencies and find minimum.
+ *
+ * The result is stored in s->available_seq (single thread) or
+ * s->shared.available_seq (threadsafe).
+ * deps[0] is dereferenced unconditionally, so the stage must already have at
+ * least one dependency configured when this is called.
+ */
+static inline void __attribute__((always_inline))
+update_available_seq(struct opdl_stage *s)
+{
+ uint32_t i;
+ uint32_t this_tail = s->shared.tail;
+ uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
+ /* Input stage sequence numbers are greater than the sequence numbers of
+ * its dependencies so an offset of t->num_slots is needed when
+ * calculating available slots and also the condition which is used to
+ * determine the dependencies minimum sequence number must be reverted.
+ */
+ uint32_t wrap;
+
+ if (is_input_stage(s)) {
+ wrap = s->num_slots;
+ /* Keep the dependency tail that is furthest behind this tail */
+ for (i = 1; i < s->num_deps; i++) {
+ uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
+ __ATOMIC_ACQUIRE);
+ if ((this_tail - seq) > (this_tail - min_seq))
+ min_seq = seq;
+ }
+ } else {
+ wrap = 0;
+ /* Keep the dependency tail that is least far ahead of this tail */
+ for (i = 1; i < s->num_deps; i++) {
+ uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
+ __ATOMIC_ACQUIRE);
+ if ((seq - this_tail) < (min_seq - this_tail))
+ min_seq = seq;
+ }
+ }
+
+ if (s->threadsafe == false)
+ s->available_seq = min_seq + wrap;
+ else
+ __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
+ __ATOMIC_RELEASE);
+}
+
+/* Spin until at least n slots are available to this stage, refreshing the
+ * cached available sequence number after each pause.
+ */
+static inline void __attribute__((always_inline))
+wait_for_available(struct opdl_stage *s, uint32_t n)
+{
+	for (;;) {
+		if (available(s) >= n)
+			return;
+		rte_pause();
+		update_available_seq(s);
+	}
+}
+
+/* Work out how many slots the caller may process.
+ *
+ * s:     stage doing the processing
+ * n:     number of slots requested
+ * block: when true, wait until all n slots are available
+ *
+ * Returns n when enough slots are available (immediately or after waiting),
+ * a smaller non-zero count in non-blocking mode when fewer are available, or
+ * 0 when nothing is available or a blocking request exceeds the ring size.
+ */
+static inline uint32_t __attribute__((always_inline))
+num_to_process(struct opdl_stage *s, uint32_t n, bool block)
+{
+	uint32_t avail;
+
+	/* Fast path: the cached values already cover the request, so the
+	 * tails of the dependencies do not need to be read.
+	 */
+	if (available(s) >= n)
+		return n;
+
+	update_available_seq(s);
+
+	if (block != false) {
+		if (unlikely(n > s->num_slots)) {
+			log(ERR, "%u entries is more than max (%u)", n, s->num_slots);
+			return 0; /* Waiting could never succeed */
+		}
+		wait_for_available(s, n);
+		return n;
+	}
+
+	/* Non-blocking: hand back whatever is there, capped at n */
+	avail = available(s);
+	if (avail == 0) {
+		rte_pause();
+		return 0;
+	}
+	if (avail > n)
+		avail = n;
+	return avail;
+}
+
+/* Copy entries in to slots with wrap-around.
+ *
+ * t:           ring to copy into
+ * start:       sequence number of the first slot to write
+ * entries:     source buffer holding num_entries * slot_size bytes
+ * num_entries: number of entries to copy
+ *
+ * Byte counts are computed as size_t so they cannot wrap at 32 bits, and the
+ * wrap test is written as a subtraction so that it cannot overflow either.
+ */
+static inline void __attribute__((always_inline))
+copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
+		uint32_t num_entries)
+{
+	const size_t slot_size = t->slot_size;
+	const uint32_t slot_index = start & t->mask;
+	/* Slots left before the end of the slot array (always >= 1) */
+	const uint32_t until_wrap = t->num_slots - slot_index;
+
+	if (num_entries <= until_wrap) {
+		rte_memcpy(get_slot(t, start), entries,
+				num_entries * slot_size);
+		return;
+	}
+
+	/* Fill up to the end of the array, then continue from slot 0 */
+	rte_memcpy(get_slot(t, start), entries, until_wrap * slot_size);
+	rte_memcpy(get_slot(t, 0),
+			RTE_PTR_ADD(entries, until_wrap * slot_size),
+			(num_entries - until_wrap) * slot_size);
+}
+
+/* Copy entries out from slots with wrap-around.
+ *
+ * t:           ring to copy from
+ * start:       sequence number of the first slot to read
+ * entries:     destination buffer with room for num_entries * slot_size bytes
+ * num_entries: number of entries to copy
+ *
+ * Byte counts are computed as size_t so they cannot wrap at 32 bits, and the
+ * wrap test is written as a subtraction so that it cannot overflow either.
+ */
+static inline void __attribute__((always_inline))
+copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
+		uint32_t num_entries)
+{
+	const size_t slot_size = t->slot_size;
+	const uint32_t slot_index = start & t->mask;
+	/* Slots left before the end of the slot array (always >= 1) */
+	const uint32_t until_wrap = t->num_slots - slot_index;
+
+	if (num_entries <= until_wrap) {
+		rte_memcpy(entries, get_slot(t, start),
+				num_entries * slot_size);
+		return;
+	}
+
+	/* Read up to the end of the array, then continue from slot 0 */
+	rte_memcpy(entries, get_slot(t, start), until_wrap * slot_size);
+	rte_memcpy(RTE_PTR_ADD(entries, until_wrap * slot_size),
+			get_slot(t, 0),
+			(num_entries - until_wrap) * slot_size);
+}
+
+/* Enqueue entries on the input stage when only one thread feeds the ring.
+ * Returns the number of entries actually copied in (0 if none could be).
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
+		uint32_t num_entries, bool block)
+{
+	struct opdl_stage *in = input_stage(t);
+	const uint32_t start = in->head;
+	const uint32_t accepted = num_to_process(in, num_entries, block);
+
+	if (accepted != 0) {
+		copy_entries_in(t, start, entries, accepted);
+
+		/* Publish the new entries by moving the shared tail */
+		in->head += accepted;
+		__atomic_store_n(&in->shared.tail, in->head,
+				__ATOMIC_RELEASE);
+	}
+
+	return accepted;
+}
+
+/* Reduce a free-running claim_manager counter to an index into claims[] */
+static inline uint32_t __attribute__((always_inline))
+claim_mgr_index(uint32_t n)
+{
+	const uint32_t index_mask = OPDL_DISCLAIMS_PER_LCORE - 1;
+
+	return n & index_mask;
+}
+
+/* Check if there are available slots in claim_manager.
+ *
+ * mgr_head and mgr_tail are free-running uint32_t counters, so the number of
+ * records in use is their difference modulo 2^32. Comparing that difference
+ * with the capacity stays correct when the counters wrap, whereas comparing
+ * mgr_head with (mgr_tail + capacity) does not.
+ */
+static inline bool __attribute__((always_inline))
+claim_mgr_available(struct claim_manager *mgr)
+{
+	const uint32_t in_use = mgr->mgr_head - mgr->mgr_tail;
+
+	return in_use < OPDL_DISCLAIMS_PER_LCORE;
+}
+
+/* Record the claim [tail, head). The caller must first have checked with
+ * claim_mgr_available() that a record is free.
+ */
+static inline void __attribute__((always_inline))
+claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
+{
+	const bool has_records = (mgr->mgr_head != mgr->mgr_tail);
+	const uint32_t newest = claim_mgr_index(mgr->mgr_head - 1);
+
+	if (has_records && (mgr->claims[newest].head == tail)) {
+		/* New claim starts where the newest one ends: extend it */
+		mgr->claims[newest].head = head;
+	} else {
+		const uint32_t next = claim_mgr_index(mgr->mgr_head);
+
+		mgr->claims[next].tail = tail;
+		mgr->claims[next].head = head;
+		mgr->mgr_head++;
+	}
+
+	mgr->num_claimed += (head - tail);
+}
+
+/* Fetch the oldest recorded claim into *tail / *head.
+ * Returns false (outputs untouched) when no claim is recorded.
+ */
+static inline bool __attribute__((always_inline))
+claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
+{
+	uint32_t oldest;
+
+	if (mgr->mgr_head == mgr->mgr_tail)
+		return false;
+
+	oldest = claim_mgr_index(mgr->mgr_tail);
+	*head = mgr->claims[oldest].head;
+	*tail = mgr->claims[oldest].tail;
+	return true;
+}
+
+/* Drop the oldest recorded claim and subtract its length from the claimed
+ * total. Only call after claim_mgr_read() has returned that record.
+ */
+static inline void __attribute__((always_inline))
+claim_mgr_remove(struct claim_manager *mgr)
+{
+	const uint32_t oldest = claim_mgr_index(mgr->mgr_tail);
+	const uint32_t length = mgr->claims[oldest].head -
+			mgr->claims[oldest].tail;
+
+	mgr->num_claimed -= length;
+	mgr->mgr_tail++;
+}
+
+/* Shrink the oldest recorded claim from its tail end by num_entries.
+ * Only call after claim_mgr_read() has returned that record.
+ */
+static inline void __attribute__((always_inline))
+claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
+{
+	const uint32_t oldest = claim_mgr_index(mgr->mgr_tail);
+
+	mgr->claims[oldest].tail += num_entries;
+	mgr->num_claimed -= num_entries;
+}
+
+/* Disclaim up to num_entries entries for the calling lcore on a threadsafe
+ * stage.
+ *
+ * Claims recorded by this lcore are released oldest first. A claim can only
+ * be released when the stage's shared tail equals the claim's tail, i.e. when
+ * all earlier claims (possibly held by other lcores) have been released.
+ *
+ * s:           stage to disclaim on
+ * num_entries: number of entries to disclaim
+ * block:       when false, give up as soon as another lcore is in the way
+ *
+ * Whatever could not be disclaimed is stored in num_to_disclaim of the lcore's
+ * claim_manager.
+ */
+static inline void __attribute__((always_inline))
+opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
+ uint32_t num_entries, bool block)
+{
+ struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
+ uint32_t head;
+ uint32_t tail;
+
+ while (num_entries) {
+ bool ret = claim_mgr_read(disclaims, &tail, &head);
+
+ if (ret == false)
+ break; /* nothing is claimed */
+ /* There should be no race condition here. If shared.tail
+ * matches, no other core can update it until this one does.
+ */
+ if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
+ tail) {
+ if (num_entries >= (head - tail)) {
+ /* Whole claim is released */
+ claim_mgr_remove(disclaims);
+ __atomic_store_n(&s->shared.tail, head,
+ __ATOMIC_RELEASE);
+ num_entries -= (head - tail);
+ } else {
+ /* Only the first num_entries of the claim */
+ claim_mgr_move_tail(disclaims, num_entries);
+ __atomic_store_n(&s->shared.tail,
+ num_entries + tail,
+ __ATOMIC_RELEASE);
+ num_entries = 0;
+ }
+ } else if (block == false)
+ break; /* blocked by other thread */
+ /* Keep going until num_entries are disclaimed. */
+ rte_pause();
+ }
+
+ /* Remember the remainder so a later call can retry it */
+ disclaims->num_to_disclaim = num_entries;
+}
+
+/* Move head atomically, returning number of entries available to process and
+ * the original value of head. For non-input stages, the claim is recorded
+ * so that the tail can be updated later by opdl_stage_disclaim().
+ *
+ * s:           stage whose shared head is moved
+ * num_entries: in: entries requested; out: entries claimed (0 on failure)
+ * old_head:    out: value of the shared head before it was moved
+ * block:       passed through to num_to_process()
+ * claim_func:  true when the claim must be recorded for a later disclaim
+ */
+static inline void __attribute__((always_inline))
+move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
+ uint32_t *old_head, bool block, bool claim_func)
+{
+ uint32_t orig_num_entries = *num_entries;
+ uint32_t ret;
+ struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
+
+ /* Attempt to disclaim any outstanding claims */
+ opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
+ false);
+
+ *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
+ while (true) {
+ bool success;
+ /* If called by opdl_ring_input(), claim does not need to be
+ * recorded, as there will be no disclaim.
+ */
+ if (claim_func) {
+ /* Check that the claim can be recorded */
+ ret = claim_mgr_available(disclaims);
+ if (ret == false) {
+ /* exit out if claim can't be recorded */
+ *num_entries = 0;
+ return;
+ }
+ }
+
+ *num_entries = num_to_process(s, orig_num_entries, block);
+ if (*num_entries == 0)
+ return;
+
+ /* On failure the builtin writes the current head into *old_head,
+ * so the next iteration retries from the up-to-date value.
+ */
+ success = __atomic_compare_exchange_n(&s->shared.head, old_head,
+ *old_head + *num_entries,
+ true, /* may fail spuriously */
+ __ATOMIC_RELEASE, /* memory order on success */
+ __ATOMIC_ACQUIRE); /* memory order on fail */
+ if (likely(success))
+ break;
+ rte_pause();
+ }
+
+ if (claim_func)
+ /* Store the claim record */
+ claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
+}
+
+/* Input function that supports multiple threads.
+ *
+ * Reserves a range of slots by moving the shared head, copies the entries in,
+ * then publishes them by moving the shared tail. Tails are published in the
+ * same order in which the heads were reserved.
+ * Returns the number of entries copied in (0 if none could be reserved).
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
+ uint32_t num_entries, bool block)
+{
+ struct opdl_stage *s = input_stage(t);
+ uint32_t old_head;
+
+ /* claim_func is false: input claims are never disclaimed */
+ move_head_atomically(s, &num_entries, &old_head, block, false);
+ if (num_entries == 0)
+ return 0;
+
+ copy_entries_in(t, old_head, entries, num_entries);
+
+ /* If another thread started inputting before this one, but hasn't
+ * finished, we need to wait for it to complete to update the tail.
+ */
+ while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
+ old_head))
+ rte_pause();
+
+ __atomic_store_n(&s->shared.tail, old_head + num_entries,
+ __ATOMIC_RELEASE);
+
+ return num_entries;
+}
+
+/* Offset, relative to start_seq, of the first entry that belongs to instance
+ * this_lcore when entries are dealt round-robin over nb_p_lcores instances.
+ * Returns 0 when there is at most one instance.
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
+		uint8_t this_lcore)
+{
+	if (nb_p_lcores <= 1)
+		return 0;
+
+	return (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
+			nb_p_lcores;
+}
+
+/* Claim slots to process, optimised for single-thread operation.
+ *
+ * s:           stage doing the claim
+ * entries:     destination buffer; selected slots are copied here back to back
+ * num_entries: number of entries requested
+ * seq:         unused in this variant
+ * block:       passed through to num_to_process()
+ * atomic:      false: take every nb_instance-th slot starting at this
+ *              instance's offset; true: take the slots whose event flow_id
+ *              maps to this instance
+ *
+ * Returns the number of entries copied to the caller. The stage head is moved
+ * past every slot examined, not only the ones copied.
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
+ uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
+{
+ uint32_t i = 0, j = 0, offset;
+ void *get_slots;
+ struct rte_event *ev;
+ RTE_SET_USED(seq);
+ struct opdl_ring *t = s->t;
+ uint8_t *entries_offset = (uint8_t *)entries;
+
+ if (!atomic) {
+
+ offset = opdl_first_entry_id(s->seq, s->nb_instance,
+ s->instance_id);
+
+ /* Scan enough slots to find num_entries for this instance */
+ num_entries = s->nb_instance * num_entries;
+
+ num_entries = num_to_process(s, num_entries, block);
+
+ for (; offset < num_entries; offset += s->nb_instance) {
+ get_slots = get_slot(t, s->head + offset);
+ memcpy(entries_offset, get_slots, t->slot_size);
+ entries_offset += t->slot_size;
+ i++;
+ }
+ } else {
+ num_entries = num_to_process(s, num_entries, block);
+
+ /* Slots are interpreted as struct rte_event in this mode */
+ for (j = 0; j < num_entries; j++) {
+ ev = (struct rte_event *)get_slot(t, s->head+j);
+ if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+ memcpy(entries_offset, ev, t->slot_size);
+ entries_offset += t->slot_size;
+ i++;
+ }
+ }
+ }
+ s->head += num_entries;
+ /* NOTE(review): num_claimed and num_event are uint16_t, so counts
+ * above 65535 would be truncated here - confirm callers stay below.
+ */
+ s->num_claimed = num_entries;
+ s->num_event = i;
+
+ /* automatically disclaim entries if number of rte_events is zero */
+ if (unlikely(i == 0))
+ opdl_stage_disclaim(s, 0, false);
+
+ return i;
+}
+
+/* Thread-safe version of function to claim slots for processing.
+ *
+ * s:           stage doing the claim
+ * entries:     destination buffer; selected slots are copied here back to back
+ * num_entries: number of entries requested for this instance
+ * seq:         in: sequence number used to work out this instance's first
+ *              entry; out: head value before the claim. Must not be NULL.
+ * block:       passed through to move_head_atomically()
+ *
+ * Returns the number of entries copied, or 0 if seq is NULL or nothing could
+ * be claimed.
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	uint32_t old_head;
+	struct opdl_ring *t = s->t;
+	uint32_t i = 0, offset;
+	uint8_t *entries_offset = (uint8_t *)entries;
+
+	/* seq is read below to derive the offset, so it must be checked
+	 * before anything else rather than only before the final store.
+	 */
+	if (unlikely(seq == NULL)) {
+		log(ERR, "seq param is NULL");
+		return 0;
+	}
+
+	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
+	num_entries = offset + (s->nb_instance * num_entries);
+
+	move_head_atomically(s, &num_entries, &old_head, block, true);
+
+	/* NOTE(review): slots are read relative to s->head, which the
+	 * threadsafe claim path does not update; the claimed range starts at
+	 * old_head. Confirm whether old_head was intended here.
+	 */
+	for (; offset < num_entries; offset += s->nb_instance) {
+		memcpy(entries_offset, get_slot(t, s->head + offset),
+				t->slot_size);
+		entries_offset += t->slot_size;
+		i++;
+	}
+
+	*seq = old_head;
+
+	return i;
+}
+
+/* Claim entries and copy them out, for a stage used by a single thread.
+ * When seq is not NULL it receives the sequence number of the first entry.
+ * Returns the number of entries copied (0 if none were available).
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	const uint32_t count = num_to_process(s, num_entries, block);
+	uint32_t first;
+
+	if (count == 0)
+		return 0;
+
+	first = s->head;
+	copy_entries_out(s->t, first, entries, count);
+	if (seq != NULL)
+		*seq = first;
+	s->head = first + count;
+
+	return count;
+}
+
+/* Claim entries and copy them out, for a stage shared by several threads.
+ * The claim is recorded so that it can be disclaimed later. When seq is not
+ * NULL it receives the sequence number of the first entry.
+ * Returns the number of entries copied (0 if none could be claimed).
+ */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	uint32_t first;
+
+	move_head_atomically(s, &num_entries, &first, block, true);
+	if (num_entries != 0) {
+		copy_entries_out(s->t, first, entries, num_entries);
+		if (seq != NULL)
+			*seq = first;
+	}
+
+	return num_entries;
+}
+
+/* Move the shared tail of a single-thread stage forward by num_entries.
+ * A request larger than the number of outstanding entries (head - tail) is
+ * logged and clamped to that number.
+ */
+static inline void __attribute__((always_inline))
+opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
+		uint32_t num_entries)
+{
+	const uint32_t tail = s->shared.tail;
+	const uint32_t outstanding = s->head - tail;
+
+	if (unlikely(num_entries > outstanding)) {
+		log(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
+				num_entries, outstanding);
+		num_entries = outstanding;
+	}
+
+	__atomic_store_n(&s->shared.tail, num_entries + tail,
+			__ATOMIC_RELEASE);
+}
+
+/* Enqueue entries on the ring's input stage, choosing the single-thread or
+ * threadsafe implementation from the input stage's configuration.
+ * Returns the number of entries enqueued.
+ */
+uint32_t
+opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
+		bool block)
+{
+	const bool threadsafe = input_stage(t)->threadsafe;
+
+	if (threadsafe)
+		return opdl_ring_input_multithread(t, entries, num_entries,
+				block);
+
+	return opdl_ring_input_singlethread(t, entries, num_entries, block);
+}
+
+/* Copy a burst of entries from the caller's buffer into the ring at stage s's
+ * head, then advance the stage head and publish it as the shared tail.
+ * Uses the single-thread head of the stage.
+ * Returns the number of entries copied (0 if none could be processed).
+ */
+uint32_t
+opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
+		const void *entries, uint32_t num_entries, bool block)
+{
+	const uint32_t start = s->head;
+	const uint32_t count = num_to_process(s, num_entries, block);
+
+	if (count != 0) {
+		copy_entries_in(t, start, entries, count);
+
+		s->head = start + count;
+		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+	}
+
+	return count;
+}
+
+/* Copy a burst of entries out of the ring, starting at stage s's head, into
+ * the caller's buffer, then advance the stage head and publish it as the
+ * shared tail. Uses the single-thread head of the stage.
+ * Returns the number of entries copied (0 if none could be processed).
+ */
+uint32_t
+opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
+		void *entries, uint32_t num_entries, bool block)
+{
+	const uint32_t start = s->head;
+	const uint32_t count = num_to_process(s, num_entries, block);
+
+	if (count != 0) {
+		copy_entries_out(t, start, entries, count);
+
+		s->head = start + count;
+		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+	}
+
+	return count;
+}
+
+/* Report how many entries, up to num_entries, stage s could process now.
+ * Never blocks: the dependencies are polled at most once.
+ */
+uint32_t
+opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
+{
+	uint32_t avail = available(s);
+
+	/* Cached values already satisfy the request */
+	if (avail >= num_entries)
+		return num_entries;
+
+	update_available_seq(s);
+
+	avail = available(s);
+	if (avail == 0) {
+		rte_pause();
+		return 0;
+	}
+
+	return (avail > num_entries) ? num_entries : avail;
+}
+
+/* Claim entries for processing on stage s, dispatching on whether the stage
+ * is threadsafe. The atomic flag is only used by the single-thread variant.
+ * Returns the number of entries handed to the caller.
+ */
+uint32_t
+opdl_stage_claim(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
+{
+	if (s->threadsafe)
+		return opdl_stage_claim_multithread(s, entries, num_entries,
+				seq, block);
+
+	return opdl_stage_claim_singlethread(s, entries, num_entries,
+			seq, block, atomic);
+}
+
+/* Claim entries on stage s and copy them into the caller's buffer,
+ * dispatching on whether the stage is threadsafe.
+ * Returns the number of entries copied.
+ */
+uint32_t
+opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	if (s->threadsafe)
+		return opdl_stage_claim_copy_multithread(s, entries,
+				num_entries, seq, block);
+
+	return opdl_stage_claim_copy_singlethread(s, entries,
+			num_entries, seq, block);
+}
+
+/* Disclaim entries previously claimed on stage s.
+ *
+ * s:           stage to disclaim on
+ * num_entries: number of entries to disclaim (threadsafe stages only, see
+ *              below)
+ * block:       threadsafe stages only: wait for other lcores if needed
+ */
+void
+opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
+ bool block)
+{
+
+ if (s->threadsafe == false) {
+ /* NOTE(review): num_entries is ignored on this path; everything
+ * recorded in s->num_claimed is disclaimed. Confirm intended.
+ */
+ opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
+ } else {
+ struct claim_manager *disclaims =
+ &s->pending_disclaims[rte_lcore_id()];
+
+ /* NOTE(review): the guard compares against the ring size while
+ * the message reports the lcore's claimed count - confirm which
+ * limit is meant.
+ */
+ if (unlikely(num_entries > s->num_slots)) {
+ log(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
+ num_entries, disclaims->num_claimed);
+ num_entries = disclaims->num_claimed;
+ }
+
+ /* Include entries left over from earlier attempts, but never
+ * ask for more than this lcore has claimed.
+ */
+ num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
+ disclaims->num_claimed);
+ opdl_stage_disclaim_multithread_n(s, num_entries, block);
+ }
+}
+
+/* Disclaim everything claimed on stage s.
+ *
+ * s:           stage to disclaim on
+ * num_entries: must equal the number of events returned by the last claim
+ *              (s->num_event)
+ * block:       threadsafe stages only: wait for other lcores if needed
+ *
+ * Returns num_entries on success. On a count mismatch nothing is disclaimed,
+ * rte_errno is set and 0 is returned.
+ */
+int
+opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
+{
+ if (num_entries != s->num_event) {
+ /* NOTE(review): rte_errno is given a negative value here - confirm
+ * this matches what callers and rte_strerror() expect.
+ */
+ rte_errno = -EINVAL;
+ return 0;
+ }
+ if (s->threadsafe == false) {
+ /* Publish the whole claimed range and start a new claim */
+ __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+ s->seq += s->num_claimed;
+ s->num_claimed = 0;
+ } else {
+ struct claim_manager *disclaims =
+ &s->pending_disclaims[rte_lcore_id()];
+ opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
+ block);
+ }
+ return num_entries;
+}
+
+/* Number of entries that can currently be input to the ring, i.e. the number
+ * available to its input stage.
+ */
+uint32_t
+opdl_ring_available(struct opdl_ring *t)
+{
+	struct opdl_stage *in = t->stages;
+
+	return opdl_stage_available(in);
+}
+
+/* Refresh the stage's view of its dependencies, then report how many entries
+ * are available to it.
+ */
+uint32_t
+opdl_stage_available(struct opdl_stage *s)
+{
+	uint32_t count;
+
+	update_available_seq(s);
+	count = available(s);
+
+	return count;
+}
+
+/* Block until every slot of the ring is available to the input stage again */
+void
+opdl_ring_flush(struct opdl_ring *t)
+{
+	struct opdl_stage *in = input_stage(t);
+	const uint32_t all_slots = in->num_slots;
+
+	wait_for_available(in, all_slots);
+}
+
+/******************** Non performance sensitive functions ********************/
+
+/* Initial setup of a new stage's context.
+ *
+ * t:          ring the stage belongs to
+ * s:          stage context to initialise (t->stages[t->num_stages])
+ * threadsafe: whether the stage may be used from several threads
+ * is_input:   true for the input stage, which starts with all slots available
+ *
+ * Returns 0 on success or -ENOMEM if the dependency arrays cannot be
+ * allocated; in that case no allocation is left attached to the stage.
+ */
+static int
+init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
+		bool is_input)
+{
+	/* Named so that it does not shadow the available() helper */
+	uint32_t initial_avail = (is_input) ? t->num_slots : 0;
+
+	s->t = t;
+	s->num_slots = t->num_slots;
+	s->index = t->num_stages;
+	s->threadsafe = threadsafe;
+	s->shared.stage = s;
+
+	/* Alloc memory for deps */
+	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
+			t->max_num_stages * sizeof(enum dep_type),
+			0, t->socket);
+	if (s->dep_tracking == NULL)
+		return -ENOMEM;
+
+	s->deps = rte_zmalloc_socket(LIB_NAME,
+			t->max_num_stages * sizeof(struct shared_state *),
+			0, t->socket);
+	if (s->deps == NULL) {
+		rte_free(s->dep_tracking);
+		/* The stage slot stays in the ring's array, so do not leave a
+		 * dangling pointer behind for a later free to trip over.
+		 */
+		s->dep_tracking = NULL;
+		return -ENOMEM;
+	}
+
+	/* Marks the stage itself so add_dep() can detect loops */
+	s->dep_tracking[s->index] = DEP_SELF;
+
+	if (threadsafe == true)
+		s->shared.available_seq = initial_avail;
+	else
+		s->available_seq = initial_avail;
+
+	return 0;
+}
+
+/* Add direct or indirect dependencies between stages.
+ *
+ * dependent:  stage that gains the dependency
+ * dependency: stage that is depended on
+ * type:       DEP_DIRECT when called for a caller-supplied dependency,
+ *             DEP_INDIRECT when recursing through a dependency's own deps
+ *
+ * Recurses through the dependencies of the dependency until the input stage
+ * is reached. Returns 0 on success or -EINVAL when a loop is detected.
+ */
+static int
+add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
+ enum dep_type type)
+{
+ struct opdl_ring *t = dependent->t;
+ uint32_t i;
+
+ /* Add new direct dependency */
+ if ((type == DEP_DIRECT) &&
+ (dependent->dep_tracking[dependency->index] ==
+ DEP_NONE)) {
+ log_debug("%s:%u direct dependency on %u",
+ t->name, dependent->index, dependency->index);
+ dependent->dep_tracking[dependency->index] = DEP_DIRECT;
+ }
+
+ /* Add new indirect dependency or change direct to indirect */
+ if ((type == DEP_INDIRECT) &&
+ ((dependent->dep_tracking[dependency->index] ==
+ DEP_NONE) ||
+ (dependent->dep_tracking[dependency->index] ==
+ DEP_DIRECT))) {
+ log_debug("%s:%u indirect dependency on %u",
+ t->name, dependent->index, dependency->index);
+ dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
+ }
+
+ /* Shouldn't happen... */
+ /* The input stage is exempt from the loop check */
+ if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
+ (dependent != input_stage(t))) {
+ log(ERR, "Loop in dependency graph %s:%u",
+ t->name, dependent->index);
+ return -EINVAL;
+ }
+
+ /* Keep going to dependencies of the dependency, until input stage */
+ if (dependency != input_stage(t))
+ for (i = 0; i < dependency->num_deps; i++) {
+ int ret = add_dep(dependent, dependency->deps[i]->stage,
+ DEP_INDIRECT);
+
+ if (ret < 0)
+ return ret;
+ }
+
+ /* Make list of sequence numbers for direct dependencies only */
+ /* The list is rebuilt from scratch from dep_tracking on every direct
+ * add, so entries downgraded to indirect above drop out of it.
+ */
+ if (type == DEP_DIRECT)
+ for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
+ if (dependent->dep_tracking[i] == DEP_DIRECT) {
+ /* NOTE(review): num_deps was reset to 0 by the loop
+ * initialiser and is only incremented below, so it
+ * is still 0 when i == 0 and this check looks like
+ * it can never fire - confirm the intended test.
+ */
+ if ((i == 0) && (dependent->num_deps > 1))
+ rte_panic("%s:%u depends on > input",
+ t->name,
+ dependent->index);
+ dependent->deps[dependent->num_deps++] =
+ &t->stages[i].shared;
+ }
+
+ return 0;
+}
+
+/* Create a new opdl_ring.
+ *
+ * name:           instance name; also used to build the memzone name
+ * num_slots:      number of slots, must be a power of 2
+ * slot_size:      size of each slot in bytes
+ * max_num_stages: maximum number of stages that may be added later
+ * socket:         NUMA socket to allocate from
+ *
+ * The ring header and its slot storage live in one memzone; the stage array
+ * is a separate allocation. Returns the new ring, or NULL on a bad parameter
+ * or when memory cannot be reserved.
+ */
+struct opdl_ring *
+opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
+		uint32_t max_num_stages, int socket)
+{
+	struct opdl_ring *t;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	int mz_flags = 0;
+	struct opdl_stage *st = NULL;
+	const struct rte_memzone *mz = NULL;
+	size_t alloc_size;
+
+	/* Compile time checking */
+	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
+			0);
+	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
+			RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
+			RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON(!POWER_OF_2(OPDL_DISCLAIMS_PER_LCORE));
+
+	/* Parameter checking */
+	if (name == NULL) {
+		log(ERR, "name param is NULL");
+		return NULL;
+	}
+	if (!rte_is_power_of_2(num_slots)) {
+		log(ERR, "num_slots (%u) for %s is not power of 2",
+				num_slots, name);
+		return NULL;
+	}
+	/* Reject sizes whose total cannot be represented in size_t (only
+	 * possible where size_t is 32 bits wide).
+	 */
+	if ((slot_size != 0) && (num_slots >
+			(SIZE_MAX - sizeof(*t) - RTE_CACHE_LINE_SIZE) /
+			slot_size)) {
+		log(ERR, "num_slots (%u) * slot_size (%u) for %s is too large",
+				num_slots, slot_size, name);
+		return NULL;
+	}
+
+	/* Widen before multiplying: a 32-bit product would wrap for rings of
+	 * 4 GiB or more and the memzone would be reserved too small.
+	 */
+	alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
+			((size_t)num_slots * slot_size));
+
+	/* Alloc memory for stages */
+	st = rte_zmalloc_socket(LIB_NAME,
+			max_num_stages * sizeof(struct opdl_stage),
+			RTE_CACHE_LINE_SIZE, socket);
+	if (st == NULL)
+		goto exit_fail;
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
+
+	/* Alloc memory for memzone */
+	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
+	if (mz == NULL)
+		goto exit_fail;
+
+	t = mz->addr;
+
+	/* Initialise opdl_ring queue */
+	memset(t, 0, sizeof(*t));
+	snprintf(t->name, sizeof(t->name), "%s", name);
+	t->socket = socket;
+	t->num_slots = num_slots;
+	t->mask = num_slots - 1;
+	t->slot_size = slot_size;
+	t->max_num_stages = max_num_stages;
+	t->stages = st;
+
+	log_debug("Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
+			t->name, t, num_slots, socket, slot_size);
+
+	return t;
+
+exit_fail:
+	log(ERR, "Cannot reserve memory");
+	rte_free(st);
+	rte_memzone_free(mz);
+
+	return NULL;
+}
+
+/* Public accessor: pointer to the slot that sequence number index maps to */
+void *
+opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
+{
+	void *slot = get_slot(t, index);
+
+	return slot;
+}
+
+/* Public accessor: NUMA socket recorded when the ring was created */
+int
+opdl_ring_get_socket(const struct opdl_ring *t)
+{
+	const int socket_id = t->socket;
+
+	return socket_id;
+}
+
+/* Public accessor: number of slots in the ring */
+uint32_t
+opdl_ring_get_num_slots(const struct opdl_ring *t)
+{
+	const uint32_t slots = t->num_slots;
+
+	return slots;
+}
+
+/* Public accessor: the ring's name. The string is owned by the ring. */
+const char *
+opdl_ring_get_name(const struct opdl_ring *t)
+{
+	const char *ring_name = t->name;
+
+	return ring_name;
+}
+
+/*
+ * Validate a dependency list against opdl_ring t.
+ *
+ * Every entry of deps[] must be non-NULL and must belong to t, and the list
+ * may not be longer than the number of stages currently in t. Each failure is
+ * logged. Returns 0 when the list is acceptable, -EINVAL otherwise.
+ */
+static int
+check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	unsigned int idx = 0;
+
+	while (idx < num_deps) {
+		const struct opdl_stage *dep = deps[idx];
+
+		if (dep == NULL) {
+			log(ERR, "deps[%u] is NULL", idx);
+			return -EINVAL;
+		}
+		if (dep->t != t) {
+			log(ERR, "deps[%u] is in opdl_ring %s, not %s",
+					idx, dep->t->name, t->name);
+			return -EINVAL;
+		}
+		idx++;
+	}
+
+	/* A stage cannot depend on more stages than the ring contains */
+	if (t->num_stages < num_deps) {
+		log(ERR, "num_deps (%u) > number stages (%u)",
+				num_deps, t->num_stages);
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+/*
+ * Take the next unused entry of t->stages, initialise it with init_stage()
+ * and account for it in t->num_stages.
+ *
+ * Returns the new stage, or NULL when t is NULL, when the ring already holds
+ * max_num_stages stages, or when init_stage() fails. A stage whose shared
+ * state is not cache line aligned only triggers a warning.
+ */
+struct opdl_stage *
+opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
+{
+	struct opdl_stage *stage;
+
+	/* Parameter checking */
+	if (t == NULL) {
+		log(ERR, "opdl_ring is NULL");
+		return NULL;
+	}
+	if (t->max_num_stages == t->num_stages) {
+		log(ERR, "%s has max number of stages (%u)",
+				t->name, t->max_num_stages);
+		return NULL;
+	}
+
+	/* The stage count is only bumped once initialisation succeeded */
+	stage = t->stages + t->num_stages;
+
+	if (((uintptr_t)&stage->shared & RTE_CACHE_LINE_MASK) != 0) {
+		log(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
+				&stage->shared, t->name);
+	}
+
+	if (init_stage(t, stage, threadsafe, is_input) < 0) {
+		log(ERR, "Cannot reserve memory");
+		return NULL;
+	}
+
+	t->num_stages++;
+	return stage;
+}
+
+/*
+ * Add direct dependencies to stage s and record its instance information.
+ *
+ * deps[] is validated against ring t by check_deps(); each entry is then
+ * registered with add_dep(s, deps[i], DEP_DIRECT). nb_instance and
+ * instance_id are stored in the stage only after every dependency has been
+ * added.
+ *
+ * On success the value of the last add_dep() call is returned (0 when
+ * num_deps is 0). On failure a negative value is returned: -EINVAL for a
+ * NULL t or s, -1 for a NULL deps array with num_deps > 0, or the error
+ * reported by check_deps()/add_dep(). The return type is uint32_t, so a
+ * negative code reaches the caller as a large unsigned value; convert the
+ * result to int before testing it.
+ */
+uint32_t
+opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
+		uint32_t nb_instance, uint32_t instance_id,
+		struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	uint32_t i;
+	int ret = 0;
+
+	/* t is dereferenced for logging and by check_deps(), s is written below */
+	if (t == NULL) {
+		log(ERR, "opdl_ring is NULL");
+		return -EINVAL;
+	}
+	if (s == NULL) {
+		log(ERR, "%s stage is NULL", t->name);
+		return -EINVAL;
+	}
+	if ((num_deps > 0) && (!deps)) {
+		log(ERR, "%s stage has NULL dependencies", t->name);
+		return -1;
+	}
+	ret = check_deps(t, deps, num_deps);
+	if (ret < 0)
+		return ret;
+
+	for (i = 0; i < num_deps; i++) {
+		ret = add_dep(s, deps[i], DEP_DIRECT);
+		if (ret < 0)
+			return ret;
+	}
+
+	s->nb_instance = nb_instance;
+	s->instance_id = instance_id;
+
+	return ret;
+}
+
+/*
+ * Public entry point that forwards to input_stage() for the given opdl_ring
+ * and returns its result unchanged.
+ */
+struct opdl_stage *
+opdl_ring_get_input_stage(const struct opdl_ring *t)
+{
+ return input_stage(t);
+}
+
+/*
+ * Replace the dependency list of stage s.
+ *
+ * deps[] is validated by check_deps() against the stage's own ring (s->t).
+ * On success the first num_deps entries of s->deps are overwritten with the
+ * addresses of the dependencies' shared state and s->num_deps is updated.
+ *
+ * Returns 0 on success, -EINVAL if s or deps is NULL or num_deps is 0, or
+ * the negative value reported by check_deps().
+ */
+int
+opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	unsigned int i;
+	int ret;
+
+	/* s->t is read below, so s must be valid as well */
+	if (s == NULL) {
+		log(ERR, "stage is NULL");
+		return -EINVAL;
+	}
+	if ((num_deps == 0) || (!deps)) {
+		log(ERR, "cannot set NULL dependencies");
+		return -EINVAL;
+	}
+
+	ret = check_deps(s->t, deps, num_deps);
+	if (ret < 0)
+		return ret;
+
+	/* Update deps */
+	for (i = 0; i < num_deps; i++)
+		s->deps[i] = &deps[i]->shared;
+	s->num_deps = num_deps;
+
+	return 0;
+}
+
+/*
+ * Return the opdl_ring pointer stored in the stage (s->t).
+ * s is dereferenced without a NULL check.
+ */
+struct opdl_ring *
+opdl_stage_get_opdl_ring(const struct opdl_stage *s)
+{
+ return s->t;
+}
+
+/*
+ * Print a human readable summary of an opdl_ring and of each of its stages
+ * to f, then flush f.
+ *
+ * The first line describes the ring itself. One line per stage follows. For
+ * a threadsafe stage, head and available_seq are read from the stage's
+ * shared state, otherwise from the stage's own fields; tail is always read
+ * from the shared state. deps lists the index of every stage this stage
+ * depends on.
+ *
+ * A NULL f is logged and ignored; a NULL t is reported on f.
+ */
+void
+opdl_ring_dump(const struct opdl_ring *t, FILE *f)
+{
+	uint32_t i;
+
+	/* fprintf() on a NULL stream is undefined behaviour */
+	if (f == NULL) {
+		log(ERR, "Cannot dump opdl_ring to a NULL file");
+		return;
+	}
+	if (t == NULL) {
+		fprintf(f, "NULL opdl_ring!\n");
+		return;
+	}
+	fprintf(f, "opdl_ring \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
+			t->name, t->num_slots, t->mask, t->slot_size,
+			t->num_stages, t->socket);
+	for (i = 0; i < t->num_stages; i++) {
+		uint32_t j;
+		const struct opdl_stage *s = &t->stages[i];
+
+		/*
+		 * NOTE(review): a stage without dependencies prints "deps=0",
+		 * which reads the same as a single dependency on stage 0.
+		 */
+		fprintf(f, " %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
+				t->name, i, (s->threadsafe) ? "true" : "false",
+				(s->threadsafe) ? s->shared.head : s->head,
+				(s->threadsafe) ? s->shared.available_seq :
+				s->available_seq,
+				s->shared.tail, (s->num_deps > 0) ?
+				s->deps[0]->stage->index : 0);
+		for (j = 1; j < s->num_deps; j++)
+			fprintf(f, ",%u", s->deps[j]->stage->index);
+		fprintf(f, "\n");
+	}
+	fflush(f);
+}
+
+/*
+ * Release the resources of an opdl_ring: the deps and dep_tracking arrays of
+ * every stage, the stages array itself, and finally the memzone registered
+ * under LIB_NAME followed by the ring name. A NULL ring is only logged; a
+ * failure to free the memzone is logged as an error.
+ */
+void
+opdl_ring_free(struct opdl_ring *t)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+	uint32_t idx;
+
+	if (t == NULL) {
+		log_debug("Freeing NULL OPDL Ring!");
+		return;
+	}
+
+	log_debug("Freeing %s opdl_ring at %p", t->name, t);
+
+	/* Per-stage allocations go first, then the array that holds the stages */
+	for (idx = 0; idx != t->num_stages; idx++) {
+		struct opdl_stage *stage = &t->stages[idx];
+
+		rte_free(stage->deps);
+		rte_free(stage->dep_tracking);
+	}
+	rte_free(t->stages);
+
+	/* Rebuild the memzone name from the ring name and release the zone */
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
+	mz = rte_memzone_lookup(mz_name);
+	if (rte_memzone_free(mz) == 0)
+		return;
+
+	log(ERR, "Cannot free memzone for %s", t->name);
+}
+
+/*
+ * Search for an opdl_ring by its name.
+ *
+ * The memzone name is built as LIB_NAME followed by name (truncated to
+ * RTE_MEMZONE_NAMESIZE by snprintf()) and looked up with
+ * rte_memzone_lookup(). Returns the address of the matching memzone, or NULL
+ * when name is NULL or no such memzone exists.
+ */
+struct opdl_ring *
+opdl_ring_lookup(const char *name)
+{
+	const struct rte_memzone *mz;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+
+	/* Passing NULL to a %s conversion is undefined behaviour */
+	if (name == NULL) {
+		log(ERR, "name param is NULL");
+		return NULL;
+	}
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
+
+	mz = rte_memzone_lookup(mz_name);
+	if (mz == NULL)
+		return NULL;
+
+	return mz->addr;
+}
+
+/*
+ * Overwrite the threadsafe flag of a stage (s->threadsafe) with the given
+ * value. s is dereferenced without a NULL check.
+ */
+void
+opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
+{
+ s->threadsafe = threadsafe;
+}
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
new file mode 100644
index 0000000..52a5b2f
--- /dev/null
+++ b/drivers/event/opdl/opdl_ring.h
@@ -0,0 +1,578 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#ifndef _OPDL_H_
+#define _OPDL_H_
+
+/**
+ * @file
+ * The "opdl_ring" is a data structure that contains a fixed number of slots,
+ * with each slot having the same, but configurable, size. Entries are input
+ * into the opdl_ring by copying into available slots. Once in the opdl_ring,
+ * an entry is processed by a number of stages, with the ordering of stage
+ * processing controlled by making stages dependent on one or more other stages.
+ * An entry is not available for a stage to process until it has been processed
+ * by that stage's dependencies. Entries are always made available for
+ * processing in the same order that they were input in to the opdl_ring.
+ * Inputting is considered as a stage that depends on all other stages,
+ * and is also a dependency of all stages.
+ *
+ * Inputting and processing in a stage can support multi-threading. Note that
+ * multi-thread processing can also be done by making stages co-operate e.g. two
+ * stages where one processes the even packets and the other processes odd
+ * packets.
+ *
+ * A opdl_ring can be used as the basis for pipeline based applications. Instead
+ * of each stage in a pipeline dequeueing from a ring, processing and enqueueing
+ * to another ring, it can process entries in-place on the ring. If stages do
+ * not depend on each other, they can run in parallel.
+ *
+ * The opdl_ring works with entries of configurable size, these could be
+ * pointers to mbufs, pointers to mbufs with application specific meta-data,
+ * tasks etc.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include <rte_eventdev.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef OPDL_DISCLAIMS_PER_LCORE
+/** Multi-threaded processing allows one thread to process multiple batches in a
+ * stage, while another thread is processing a single large batch. This number
+ * controls how many non-contiguous batches one stage can process before being
+ * blocked by the other stage.
+ */
+#define OPDL_DISCLAIMS_PER_LCORE 8
+#endif
+
+/** Opaque handle to a opdl_ring instance */
+struct opdl_ring;
+
+/** Opaque handle to a single stage in a opdl_ring */
+struct opdl_stage;
+
+/**
+ * Create a new instance of a opdl_ring.
+ *
+ * @param name
+ * String containing the name to give the new opdl_ring instance.
+ * @param num_slots
+ * How many slots the opdl_ring contains. Must be a power of 2!
+ * @param slot_size
+ * How many bytes in each slot.
+ * @param max_num_stages
+ * Maximum number of stages.
+ * @param socket
+ * The NUMA socket (or SOCKET_ID_ANY) to allocate the memory used for this
+ * opdl_ring instance.
+ * @param threadsafe
+ * Whether to support multiple threads inputting to the opdl_ring or not.
+ * Enabling this may have a negative impact on performance if only one thread
+ * will be inputting.
+ *
+ * @return
+ * A pointer to a new opdl_ring instance, or NULL on error.
+ */
+struct opdl_ring *
+opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
+ uint32_t max_num_stages, int socket);
+
+/**
+ * Get pointer to individual slot in a opdl_ring.
+ *
+ * @param t
+ * The opdl_ring.
+ * @param index
+ * Index of slot. If greater than the number of slots it will be masked to be
+ * within correct range.
+ *
+ * @return
+ * A pointer to that slot.
+ */
+void *
+opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index);
+
+/**
+ * Get NUMA socket used by a opdl_ring.
+ *
+ * @param t
+ * The opdl_ring.
+ *
+ * @return
+ * NUMA socket.
+ */
+int
+opdl_ring_get_socket(const struct opdl_ring *t);
+
+/**
+ * Get number of slots in a opdl_ring.
+ *
+ * @param t
+ * The opdl_ring.
+ *
+ * @return
+ * Number of slots.
+ */
+uint32_t
+opdl_ring_get_num_slots(const struct opdl_ring *t);
+
+/**
+ * Get name of a opdl_ring.
+ *
+ * @param t
+ * The opdl_ring.
+ *
+ * @return
+ * Name string.
+ */
+const char *
+opdl_ring_get_name(const struct opdl_ring *t);
+
+/**
+ * Adds a new processing stage to a specified opdl_ring instance. Adding a stage
+ * while there are entries in the opdl_ring being processed will cause undefined
+ * behaviour.
+ *
+ * @param t
+ * The opdl_ring to add the stage to.
+ * @param deps
+ * An array of pointers to other stages that this stage depends on. The other
+ * stages must be part of the same opdl_ring! Note that input is an implied
+ * dependency. This can be NULL if num_deps is 0.
+ * @param num_deps
+ * The size of the deps array.
+ * @param threadsafe
+ * Whether to support multiple threads processing this stage or not.
+ * Enabling this may have a negative impact on performance if only one thread
+ * will be processing this stage.
+ * @param is_input
+ * Indication to initialise the stage with all slots available or none.
+ *
+ * @return
+ * A pointer to the new stage, or NULL on error.
+ */
+struct opdl_stage *
+opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input);
+
+/**
+ * Returns the input stage of a opdl_ring to be used by other API functions.
+ *
+ * @param t
+ * The opdl_ring.
+ *
+ * @return
+ * A pointer to the input stage.
+ */
+struct opdl_stage *
+opdl_ring_get_input_stage(const struct opdl_ring *t);
+
+/**
+ * Sets the dependencies for a stage (clears all the previous deps!). Changing
+ * dependencies while there are entries in the opdl_ring being processed will
+ * cause undefined behaviour.
+ *
+ * @param s
+ * The stage to set the dependencies for.
+ * @param deps
+ * An array of pointers to other stages that this stage will depend on. The
+ * other stages must be part of the same opdl_ring!
+ * @param num_deps
+ * The size of the deps array. This must be > 0.
+ *
+ * @return
+ * 0 on success, a negative value on error.
+ */
+int
+opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
+ uint32_t num_deps);
+
+/**
+ * Returns the opdl_ring that a stage belongs to.
+ *
+ * @param s
+ * The stage
+ *
+ * @return
+ * A pointer to the opdl_ring that the stage belongs to.
+ */
+struct opdl_ring *
+opdl_stage_get_opdl_ring(const struct opdl_stage *s);
+
+/**
+ * Inputs a new batch of entries into the opdl_ring. This function is only
+ * threadsafe (with the same opdl_ring parameter) if the threadsafe parameter of
+ * opdl_ring_create() was true. For performance reasons, this function does not
+ * check input parameters.
+ *
+ * @param t
+ * The opdl_ring to input entries in to.
+ * @param entries
+ * An array of entries that will be copied in to the opdl_ring.
+ * @param num_entries
+ * The size of the entries array.
+ * @param block
+ * If this is true, the function blocks until enough slots are available to
+ * input all the requested entries. If false, then the function inputs as
+ * many entries as currently possible.
+ *
+ * @return
+ * The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
+ bool block);
+
+/**
+ * Inputs a new batch of entries into an opdl_ring stage. This function is only
+ * threadsafe (with the same opdl_ring parameter) if the threadsafe parameter
+ * of opdl_ring_create() was true. For performance reasons, this function does
+ * not check input parameters.
+ *
+ * @param t
+ * The opdl_ring to input entries in to.
+ * @param s
+ * The stage to copy entries to.
+ * @param entries
+ * An array of entries that will be copied in to the opdl_ring.
+ * @param num_entries
+ * The size of the entries array.
+ * @param block
+ * If this is true, the function blocks until enough slots are available to
+ * input all the requested entries. If false, then the function inputs as
+ * many entries as currently possible.
+ *
+ * @return
+ * The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
+ const void *entries, uint32_t num_entries, bool block);
+
+/**
+ * Copy a batch of entries from the opdl_ring. This function is only
+ * threadsafe (with the same opdl_ring parameter) if the threadsafe parameter
+ * of opdl_ring_create() was true. For performance reasons, this function does
+ * not check input parameters.
+ *
+ * @param t
+ * The opdl_ring to copy entries from.
+ * @param s
+ * The stage to copy entries from.
+ * @param entries
+ * An array of entries that will be copied from the opdl_ring.
+ * @param num_entries
+ * The size of the entries array.
+ * @param block
+ * If this is true, the function blocks until enough slots are available to
+ * input all the requested entries. If false, then the function inputs as
+ * many entries as currently possible.
+ *
+ * @return
+ * The number of entries successfully copied.
+ */
+uint32_t
+opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
+ void *entries, uint32_t num_entries, bool block);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. This function is threadsafe using same opdl_stage parameter if
+ * the stage was created with threadsafe set to true, otherwise it is only
+ * threadsafe with a different opdl_stage per thread. For performance
+ * reasons, this function does not check input parameters.
+ *
+ * @param s
+ * The opdl_ring stage to read entries in.
+ * @param entries
+ * An array of pointers to entries that will be filled in by this function.
+ * @param num_entries
+ * The number of entries to attempt to claim for processing (and the size of
+ * the entries array).
+ * @param seq
+ * If not NULL, this is set to the value of the internal stage sequence number
+ * associated with the first entry returned.
+ * @param block
+ * If this is true, the function blocks until num_entries slots are available
+ * to process. If false, then the function claims as many entries as
+ * currently possible.
+ *
+ * @param atomic
+ * If this is true, the function returns events according to their event flow
+ * id.
+ * @return
+ * The number of pointers to entries filled in to the entries array.
+ */
+uint32_t
+opdl_stage_claim(struct opdl_stage *s, void *entries,
+ uint32_t num_entries, uint32_t *seq, bool block, bool atomic);
+
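+/**
+ * Adds direct dependencies to a stage and records the instance information
+ * of that stage.
+ *
+ * @param t
+ * The opdl_ring that the stage and all of its dependencies belong to.
+ * @param s
+ * The stage to add the dependencies to.
+ * @param nb_instance
+ * Number of instances value to store in the stage.
+ * @param instance_id
+ * Instance id value to store in the stage.
+ * @param deps
+ * An array of pointers to the stages that this stage depends on. The other
+ * stages must be part of the same opdl_ring! This can be NULL if num_deps
+ * is 0.
+ * @param num_deps
+ * The size of the deps array.
+ *
+ * @return
+ * A non-negative value on success. On error a negative value is returned;
+ * note that it is converted to the unsigned return type, so cast the result
+ * to int before checking it.
+ */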
+uint32_t
+opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
+ uint32_t nb_instance, uint32_t instance_id,
+ struct opdl_stage *deps[], uint32_t num_deps);
+
+/**
+ * A function to check how many entries are ready to be claimed.
+ *
+ * @param entries
+ * An array of pointers to entries.
+ * @param num_entries
+ * Number of entries in an array.
+ * @param arg
+ * An opaque pointer to data passed to the claim function.
+ * @param block
+ * When set to true, the function should wait until num_entries are ready to
+ * be processed. Otherwise it should return immediately.
+ *
+ * @return
+ * Number of entries ready to be claimed.
+ */
+typedef uint32_t (opdl_ring_check_entries_t)(void *entries[],
+ uint32_t num_entries, void *arg, bool block);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. Each entry is checked by the passed check() function and depending
+ * on block value, it waits until num_entries are ready or returns immediately.
+ * This function is only threadsafe with a different opdl_stage per thread.
+ *
+ * @param s
+ * The opdl_ring stage to read entries in.
+ * @param entries
+ * An array of pointers to entries that will be filled in by this function.
+ * @param num_entries
+ * The number of entries to attempt to claim for processing (and the size of
+ * the entries array).
+ * @param seq
+ * If not NULL, this is set to the value of the internal stage sequence number
+ * associated with the first entry returned.
+ * @param block
+ * If this is true, the function blocks until num_entries ready slots are
+ * available to process. If false, then the function claims as many ready
+ * entries as currently possible.
+ * @param check
+ * Pointer to a function called to check entries.
+ * @param arg
+ * Opaque data passed to check() function.
+ *
+ * @return
+ * The number of pointers to ready entries filled in to the entries array.
+ */
+uint32_t
+opdl_stage_claim_check(struct opdl_stage *s, void **entries,
+ uint32_t num_entries, uint32_t *seq, bool block,
+ opdl_ring_check_entries_t *check, void *arg);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. This function is threadsafe using same opdl_stage parameter if
+ * the stage was created with threadsafe set to true, otherwise it is only
+ * threadsafe with a different opdl_stage per thread.
+ *
+ * The difference between this function and opdl_stage_claim() is that this
+ * function copies the entries from the opdl_ring. Note that any changes made to
+ * the copied entries will not be reflected back in to the entries in the
+ * opdl_ring, so this function probably only makes sense if the entries are
+ * pointers to other data. For performance reasons, this function does not check
+ * input parameters.
+ *
+ * @param s
+ * The opdl_ring stage to read entries in.
+ * @param entries
+ * An array of entries that will be filled in by this function.
+ * @param num_entries
+ * The number of entries to attempt to claim for processing (and the size of
+ * the entries array).
+ * @param seq
+ * If not NULL, this is set to the value of the internal stage sequence number
+ * associated with the first entry returned.
+ * @param block
+ * If this is true, the function blocks until num_entries slots are available
+ * to process. If false, then the function claims as many entries as
+ * currently possible.
+ *
+ * @return
+ * The number of entries copied in to the entries array.
+ */
+uint32_t
+opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
+ uint32_t num_entries, uint32_t *seq, bool block);
+
+/**
+ * This function must be called when a stage has finished its processing of
+ * entries, to make them available to any dependent stages. All entries that are
+ * claimed by the calling thread in the stage will be disclaimed. It is possible
+ * to claim multiple batches before disclaiming. For performance reasons, this
+ * function does not check input parameters.
+ *
+ * @param s
+ * The opdl_ring stage in which to disclaim all claimed entries.
+ *
+ * @param block
+ * Entries are always made available to a stage in the same order that they
+ * were input in the stage. If a stage is multithread safe, this may mean that
+ * full disclaiming of a batch of entries can not be considered complete until
+ * all earlier threads in the stage have disclaimed. If this parameter is true
+ * then the function blocks until all entries are fully disclaimed, otherwise
+ * it disclaims as many as currently possible, with non fully disclaimed
+ * batches stored until the next call to a claim or disclaim function for this
+ * stage on this thread.
+ *
+ * If a thread is not going to process any more entries in this stage, it
+ * *must* first call this function with this parameter set to true to ensure
+ * it does not block the entire opdl_ring.
+ *
+ * In a single threaded stage, this parameter has no effect.
+ */
+int
+opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries,
+ bool block);
+
+/**
+ * This function can be called when a stage has finished its processing of
+ * entries, to make them available to any dependent stages. The difference
+ * between this function and opdl_stage_disclaim() is that here only a
+ * portion of entries are disclaimed, not all of them. For performance reasons,
+ * this function does not check input parameters.
+ *
+ * @param s
+ * The opdl_ring stage in which to disclaim entries.
+ *
+ * @param num_entries
+ * The number of entries to disclaim.
+ *
+ * @param block
+ * Entries are always made available to a stage in the same order that they
+ * were input in the stage. If a stage is multithread safe, this may mean that
+ * full disclaiming of a batch of entries can not be considered complete until
+ * all earlier threads in the stage have disclaimed. If this parameter is true
+ * then the function blocks until the specified number of entries has been
+ * disclaimed (or there are no more entries to disclaim). Otherwise it
+ * disclaims as many claims as currently possible and an attempt to disclaim
+ * them is made the next time a claim or disclaim function for this stage on
+ * this thread is called.
+ *
+ * In a single threaded stage, this parameter has no effect.
+ */
+void
+opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
+ bool block);
+
+/**
+ * Check how many entries can be input.
+ *
+ * @param t
+ * The opdl_ring instance to check.
+ *
+ * @return
+ * The number of new entries currently allowed to be input.
+ */
+uint32_t
+opdl_ring_available(struct opdl_ring *t);
+
+/**
+ * Check how many entries can be processed in a stage.
+ *
+ * @param s
+ * The stage to check.
+ *
+ * @return
+ * The number of entries currently available to be processed in this stage.
+ */
+uint32_t
+opdl_stage_available(struct opdl_stage *s);
+
+/**
+ * Check how many entries are available to be processed.
+ *
+ * NOTE : DOES NOT CHANGE ANY STATE WITHIN THE STAGE
+ *
+ * @param s
+ * The stage to check.
+ *
+ * @param num_entries
+ * The number of entries to check for availability.
+ *
+ * @return
+ * The number of entries currently available to be processed in this stage.
+ */
+uint32_t
+opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
+
+/**
+ * Create empty stage instance and return the pointer.
+ *
+ * @param t
+ * The pointer of opdl_ring.
+ *
+ * @param threadsafe
+ * enable multiple thread or not.
+ * @return
+ * The pointer of one empty stage instance.
+ */
+struct opdl_stage *
+opdl_stage_create(struct opdl_ring *t, bool threadsafe);
+
+/**
+ * Prints information on opdl_ring instance and all its stages
+ *
+ * @param t
+ * The opdl_ring instance to print info on.
+ * @param f
+ * Where to print the info.
+ */
+void
+opdl_ring_dump(const struct opdl_ring *t, FILE *f);
+
+/**
+ * Blocks until all entries in a opdl_ring have been processed by all stages.
+ *
+ * @param t
+ * The opdl_ring instance to flush.
+ */
+void
+opdl_ring_flush(struct opdl_ring *t);
+
+/**
+ * Deallocates all resources used by a opdl_ring instance
+ *
+ * @param t
+ * The opdl_ring instance to free.
+ */
+void
+opdl_ring_free(struct opdl_ring *t);
+
+/**
+ * Search for a opdl_ring by its name
+ *
+ * @param name
+ * The name of the opdl_ring.
+ * @return
+ * The pointer to the opdl_ring matching the name, or NULL if not found.
+ *
+ */
+struct opdl_ring *
+opdl_ring_lookup(const char *name);
+
+/**
+ * Set the threadsafe flag of an opdl_stage.
+ *
+ * @param s
+ * The opdl_stage.
+ * @param threadsafe
+ * Threadsafe value.
+ */
+void
+opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _OPDL_H_ */
diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
new file mode 100644
index 0000000..5352e7e
--- /dev/null
+++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
@@ -0,0 +1,3 @@
+DPDK_17.05 {
+ local: *;
+};
--
2.7.5
--------------------------------------------------------------
Intel Research and Development Ireland Limited
Registered in Ireland
Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
Registered Number: 308263
This e-mail and any attachments may contain confidential material for the sole
use of the intended recipient(s). Any review or distribution by others is
strictly prohibited. If you are not the intended recipient, please contact the
sender and delete all copies.
next prev parent reply other threads:[~2017-11-24 11:23 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2017-11-24 11:23 [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD liang.j.ma
2017-11-24 11:23 ` liang.j.ma [this message]
2017-11-24 11:23 ` [dpdk-dev] [PATCH 2/7] event/opdl: add the opdl pmd header and init helper function liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 3/7] event/opdl: add the opdl pmd main body and xstats " liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 4/7] event/opdl: update the build system to enable compilation of pmd liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 5/7] test/eventdev: opdl eventdev pmd unit test func and makefiles liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example liang.j.ma
2017-11-24 11:23 ` [dpdk-dev] [PATCH 7/7] doc: add eventdev opdl pmd docuement and example usage document liang.j.ma
2017-11-24 20:55 ` [dpdk-dev] [RFC PATCH 0/7] RFC:EventDev OPDL PMD Jerin Jacob
2017-11-29 12:19 ` Ma, Liang
2017-11-29 12:56 ` Jerin Jacob
2017-11-29 17:15 ` Ma, Liang
2017-11-30 17:41 ` Jerin Jacob
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1511522632-139652-2-git-send-email-liang.j.ma@intel.com \
--to=liang.j.ma@intel.com \
--cc=bruce.richardson@intel.com \
--cc=deepak.k.jain@intel.com \
--cc=dev@dpdk.org \
--cc=harry.van.haaren@intel.com \
--cc=jerin.jacob@caviumnetworks.com \
--cc=john.geary@intel.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).