From: "Ma, Liang J"
To: "jerin.jacob@caviumnetworks.com"
CC: "dev@dpdk.org", "Van Haaren, Harry", "Richardson, Bruce", "Jain, Deepak K", "Geary, John", "Mccarthy, Peter", "seanbh@gmail.com"
Date: Thu, 21 Dec 2017 22:01:02 +0000
Subject: Re: [dpdk-dev] [PATCH v3 1/8] event/opdl: add the opdl ring infrastructure library
References: <1513877270-194773-1-git-send-email-liang.j.ma@intel.com>, <1513877270-194773-2-git-send-email-liang.j.ma@intel.com>
In-Reply-To: <1513877270-194773-2-git-send-email-liang.j.ma@intel.com>

>
> OPDL ring is the core infrastructure of the OPDL PMD. 
The OPDL ring library
> provides the core data structures and a set of core helper functions. The
> ring implements a single-ring, multi-port/stage pipelined packet
> distribution mechanism. This mechanism has the following characteristics:
>
> • No multiple-queue cost; latency is significantly reduced.
> • Fixed dependencies between queues/ports, which suit complex, fixed
>   pipelines of stateless packet processing (static pipelines).
> • Decentralized distribution (no scheduling core).
> • Packets remain in order (no reorder core(s)).
> * Update the build system to enable compilation.
>
> Signed-off-by: Liang Ma
> Signed-off-by: Peter Mccarthy
> ---
>  config/common_base                                |    6 +
>  drivers/event/Makefile                            |    1 +
>  drivers/event/opdl/Makefile                       |   62 +
>  drivers/event/opdl/opdl_log.h                     |   59 +
>  drivers/event/opdl/opdl_ring.c                    | 1250 +++++++++++++++++++
>  drivers/event/opdl/opdl_ring.h                    |  628 ++++++++++
>  drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
>  mk/rte.app.mk                                     |    1 +
>  mk/toolchain/gcc/rte.toolchain-compat.mk          |    6 +
>  mk/toolchain/icc/rte.toolchain-compat.mk          |    6 +
>  10 files changed, 2022 insertions(+)
>  create mode 100644 drivers/event/opdl/Makefile
>  create mode 100644 drivers/event/opdl/opdl_log.h
>  create mode 100644 drivers/event/opdl/opdl_ring.c
>  create mode 100644 drivers/event/opdl/opdl_ring.h
>  create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
>
> diff --git a/config/common_base b/config/common_base
> index e74febe..67adaba 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -594,6 +594,12 @@ CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF=y
>  CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF_DEBUG=n
>
>  #
> +# Compile PMD for OPDL event device
> +#
> +CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=y
> +CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV_DEBUG=n
> +
> +#
>  # Compile librte_ring
>  #
>  CONFIG_RTE_LIBRTE_RING=y
> diff --git a/drivers/event/Makefile b/drivers/event/Makefile
> index 1f9c0ba..d626666 100644
> --- 
a/drivers/event/Makefile
> +++ b/drivers/event/Makefile
> @@ -35,5 +35,6 @@ DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton
>  DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw
>  DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx
>  DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_EVENTDEV) += dpaa2
> +DIRS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl
>
>  include $(RTE_SDK)/mk/rte.subdir.mk
> diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile
> new file mode 100644
> index 0000000..8277e25
> --- /dev/null
> +++ b/drivers/event/opdl/Makefile
> @@ -0,0 +1,62 @@
> +# BSD LICENSE
> +#
> +# Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
> +#
> +# Redistribution and use in source and binary forms, with or without
> +# modification, are permitted provided that the following conditions
> +# are met:
> +#
> +#   * Redistributions of source code must retain the above copyright
> +#     notice, this list of conditions and the following disclaimer.
> +#   * Redistributions in binary form must reproduce the above copyright
> +#     notice, this list of conditions and the following disclaimer in
> +#     the documentation and/or other materials provided with the
> +#     distribution.
> +#   * Neither the name of Intel Corporation nor the names of its
> +#     contributors may be used to endorse or promote products derived
> +#     from this software without specific prior written permission.
> +#
> +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +# A PARTICULAR PURPOSE ARE DISCLAIMED. 
> +# IN NO EVENT SHALL THE COPYRIGHT
> +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_pmd_opdl_event.a
> +
> +# build flags
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS)
> +# for older GCC versions, allow us to initialize an event using
> +# designated initializers.
> +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> +ifeq ($(shell test $(GCC_VERSION) -le 50 && echo 1), 1)
> +CFLAGS += -Wno-missing-field-initializers
> +endif
> +endif
> +
> +LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs
> +LDLIBS += -lrte_bus_vdev -lrte_mbuf -lrte_mempool
> +
> +# library version
> +LIBABIVER := 1
> +
> +# versioning export map
> +EXPORT_MAP := rte_pmd_evdev_opdl_version.map
> +
> +# library source files
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c
> +
> +# export include files
> +SYMLINK-y-include +=
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/drivers/event/opdl/opdl_log.h b/drivers/event/opdl/opdl_log.h
> new file mode 100644
> index 0000000..833697b
> --- /dev/null
> +++ b/drivers/event/opdl/opdl_log.h
> @@ -0,0 +1,59 @@
> +/*-
> + * BSD LICENSE
> + *
> + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
> + * All rights reserved.
> + * [standard 3-clause BSD license text, identical to the Makefile above]
> + */
> +
> +#ifndef _OPDL_LOGS_H_
> +#define _OPDL_LOGS_H_
> +
> +#include <rte_log.h>
> +
> +int opdl_logtype_driver;

This should be `extern int opdl_logtype_driver;` — as written, every
translation unit that includes this header defines the symbol, which can
cause multiple-definition errors at link time; the variable should be
defined once in a .c file. That will be fixed in the next iteration.

> +
> +#define PMD_DRV_LOG_RAW(level, fmt, args...) \
> +	rte_log(RTE_LOG_ ## level, opdl_logtype_driver, "%s(): " fmt, \
> +		__func__, ## args)
> +
> +#define PMD_DRV_LOG(level, fmt, args...) 
\
> +	PMD_DRV_LOG_RAW(level, fmt "\n", ## args)
> +
> +#ifdef RTE_LIBRTE_PMD_EVDEV_OPDL_DEBUG
> +
> +#define OPDL_LOG_DBG(fmt, args...) \
> +	RTE_LOG(DEBUG, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
> +			OPDL_PMD_NAME, \
> +			__func__, __LINE__, ## args)
> +#else
> +#define OPDL_LOG_DBG(fmt, args...)
> +#endif
> +
> +
> +#endif /* _OPDL_LOGS_H_ */

[snip — opdl_ring.c and the remaining hunks, quoted verbatim from the original patch mail, trimmed]
__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE); > + s->seq +=3D s->num_claimed; > + s->shadow_head =3D s->head; > + s->num_claimed =3D 0; > + } else { > + struct claim_manager *disclaims =3D > + &s->pending_disclaims[rte_lcore_id()]; > + opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed, > + block); > + } > + return num_entries; > +} > + > +uint32_t > +opdl_ring_available(struct opdl_ring *t) > +{ > + return opdl_stage_available(&t->stages[0]); > +} > + > +uint32_t > +opdl_stage_available(struct opdl_stage *s) > +{ > + update_available_seq(s); > + return available(s); > +} > + > +void > +opdl_ring_flush(struct opdl_ring *t) > +{ > + struct opdl_stage *s =3D input_stage(t); > + > + wait_for_available(s, s->num_slots); > +} > + > +/******************** Non performance sensitive functions **************= ******/ > + > +/* Initial setup of a new stage's context */ > +static int > +init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe, > + bool is_input) > +{ > + uint32_t available =3D (is_input) ? 
t->num_slots : 0; > + > + s->t =3D t; > + s->num_slots =3D t->num_slots; > + s->index =3D t->num_stages; > + s->threadsafe =3D threadsafe; > + s->shared.stage =3D s; > + > + /* Alloc memory for deps */ > + s->dep_tracking =3D rte_zmalloc_socket(LIB_NAME, > + t->max_num_stages * sizeof(enum dep_type), > + 0, t->socket); > + if (s->dep_tracking =3D=3D NULL) > + return -ENOMEM; > + > + s->deps =3D rte_zmalloc_socket(LIB_NAME, > + t->max_num_stages * sizeof(struct shared_state *), > + 0, t->socket); > + if (s->deps =3D=3D NULL) { > + rte_free(s->dep_tracking); > + return -ENOMEM; > + } > + > + s->dep_tracking[s->index] =3D DEP_SELF; > + > + if (threadsafe =3D=3D true) > + s->shared.available_seq =3D available; > + else > + s->available_seq =3D available; > + > + return 0; > +} > + > +/* Add direct or indirect dependencies between stages */ > +static int > +add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependenc= y, > + enum dep_type type) > +{ > + struct opdl_ring *t =3D dependent->t; > + uint32_t i; > + > + /* Add new direct dependency */ > + if ((type =3D=3D DEP_DIRECT) && > + (dependent->dep_tracking[dependency->index] =3D=3D > + DEP_NONE)) { > + PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u", > + t->name, dependent->index, dependency->index); > + dependent->dep_tracking[dependency->index] =3D DEP_DIRECT; > + } > + > + /* Add new indirect dependency or change direct to indirect */ > + if ((type =3D=3D DEP_INDIRECT) && > + ((dependent->dep_tracking[dependency->index] =3D=3D > + DEP_NONE) || > + (dependent->dep_tracking[dependency->index] =3D=3D > + DEP_DIRECT))) { > + PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u", > + t->name, dependent->index, dependency->index); > + dependent->dep_tracking[dependency->index] =3D DEP_INDIRECT; > + } > + > + /* Shouldn't happen... 
*/ > + if ((dependent->dep_tracking[dependency->index] =3D=3D DEP_SELF) && > + (dependent !=3D input_stage(t))) { > + PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u", > + t->name, dependent->index); > + return -EINVAL; > + } > + > + /* Keep going to dependencies of the dependency, until input stage */ > + if (dependency !=3D input_stage(t)) > + for (i =3D 0; i < dependency->num_deps; i++) { > + int ret =3D add_dep(dependent, dependency->deps[i]->stage, > + DEP_INDIRECT); > + > + if (ret < 0) > + return ret; > + } > + > + /* Make list of sequence numbers for direct dependencies only */ > + if (type =3D=3D DEP_DIRECT) > + for (i =3D 0, dependent->num_deps =3D 0; i < t->num_stages; i++) > + if (dependent->dep_tracking[i] =3D=3D DEP_DIRECT) { > + if ((i =3D=3D 0) && (dependent->num_deps > 1)) > + rte_panic("%s:%u depends on > input", > + t->name, > + dependent->index); > + dependent->deps[dependent->num_deps++] =3D > + &t->stages[i].shared; > + } > + > + return 0; > +} > + > +struct opdl_ring * > +opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_siz= e, > + uint32_t max_num_stages, int socket) > +{ > + struct opdl_ring *t; > + char mz_name[RTE_MEMZONE_NAMESIZE]; > + int mz_flags =3D 0; > + struct opdl_stage *st =3D NULL; > + const struct rte_memzone *mz =3D NULL; > + size_t alloc_size =3D RTE_CACHE_LINE_ROUNDUP(sizeof(*t) + > + (num_slots * slot_size)); > + > + /* Compile time checking */ > + RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK)= !=3D > + 0); > + RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) & > + RTE_CACHE_LINE_MASK) !=3D 0); > + RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) & > + RTE_CACHE_LINE_MASK) !=3D 0); > + RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE)); > + > + /* Parameter checking */ > + if (name =3D=3D NULL) { > + PMD_DRV_LOG(ERR, "name param is NULL"); > + return NULL; > + } > + if (!rte_is_power_of_2(num_slots)) { > + PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 
2", > + num_slots, name); > + return NULL; > + } > + > + /* Alloc memory for stages */ > + st =3D rte_zmalloc_socket(LIB_NAME, > + max_num_stages * sizeof(struct opdl_stage), > + RTE_CACHE_LINE_SIZE, socket); > + if (st =3D=3D NULL) > + goto exit_fail; > + > + snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name); > + > + /* Alloc memory for memzone */ > + mz =3D rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags); > + if (mz =3D=3D NULL) > + goto exit_fail; > + > + t =3D mz->addr; > + > + /* Initialise opdl_ring queue */ > + memset(t, 0, sizeof(*t)); > + snprintf(t->name, sizeof(t->name), "%s", name); > + t->socket =3D socket; > + t->num_slots =3D num_slots; > + t->mask =3D num_slots - 1; > + t->slot_size =3D slot_size; > + t->max_num_stages =3D max_num_stages; > + t->stages =3D st; > + > + PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=3D%u,socket=3D%i,slo= t_size=3D%u)", > + t->name, t, num_slots, socket, slot_size); > + > + return t; > + > +exit_fail: > + PMD_DRV_LOG(ERR, "Cannot reserve memory"); > + rte_free(st); > + rte_memzone_free(mz); > + > + return NULL; > +} > + > +void * > +opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) > +{ > + return get_slot(t, index); > +} > + > +bool > +opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *e= v, > + uint32_t index, bool atomic) > +{ > + uint32_t i =3D 0, j =3D 0, offset; > + struct opdl_ring *t =3D s->t; > + struct rte_event *ev_orig =3D NULL; > + bool ev_updated =3D false; > + uint64_t ev_temp =3D 0; > + > + if (index > s->num_event) { > + PMD_DRV_LOG(ERR, "index is overflow"); > + return ev_updated; > + } > + > + ev_temp =3D ev->event&OPDL_EVENT_MASK; > + > + if (!atomic) { > + offset =3D opdl_first_entry_id(s->seq, s->nb_instance, > + s->instance_id); > + offset +=3D index*s->nb_instance; > + ev_orig =3D get_slot(t, s->shadow_head+offset); > + if ((ev_orig->event&OPDL_EVENT_MASK) !=3D ev_temp) { > + ev_orig->event =3D ev->event; > + ev_updated =3D true; > + } > + if 
(ev_orig->u64 !=3D ev->u64) { > + ev_orig->u64 =3D ev->u64; > + ev_updated =3D true; > + } > + > + } else { > + for (i =3D 0; i < s->num_claimed; i++) { > + ev_orig =3D (struct rte_event *)get_slot(t, s->shadow_head+i= ); > + if ((ev_orig->flow_id%s->nb_instance) =3D=3D s->instance_id)= { > + > + if (j =3D=3D index) { > + if ((ev_orig->event&OPDL_EVENT_MASK) !=3D ev_temp) { > + ev_orig->event =3D ev->event; > + ev_updated =3D true; > + } > + if (ev_orig->u64 !=3D ev->u64) { > + ev_orig->u64 =3D ev->u64; > + ev_updated =3D true; > + } > + > + break; > + } > + j++; > + } > + } > + > + } > + > + return ev_updated; > +} > + > +int > +opdl_ring_get_socket(const struct opdl_ring *t) > +{ > + return t->socket; > +} > + > +uint32_t > +opdl_ring_get_num_slots(const struct opdl_ring *t) > +{ > + return t->num_slots; > +} > + > +const char * > +opdl_ring_get_name(const struct opdl_ring *t) > +{ > + return t->name; > +} > + > +/* Check dependency list is valid for a given opdl_ring */ > +static int > +check_deps(struct opdl_ring *t, struct opdl_stage *deps[], > + uint32_t num_deps) > +{ > + unsigned int i; > + > + for (i =3D 0; i < num_deps; ++i) { > + if (!deps[i]) { > + PMD_DRV_LOG(ERR, "deps[%u] is NULL", i); > + return -EINVAL; > + } > + if (t !=3D deps[i]->t) { > + PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s", > + i, deps[i]->t->name, t->name); > + return -EINVAL; > + } > + } > + if (num_deps > t->num_stages) { > + PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)", > + num_deps, t->num_stages); > + return -EINVAL; > + } > + return 0; > +} > + > +struct opdl_stage * > +opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input) > +{ > + struct opdl_stage *s; > + > + /* Parameter checking */ > + if (!t) { > + PMD_DRV_LOG(ERR, "opdl_ring is NULL"); > + return NULL; > + } > + if (t->num_stages =3D=3D t->max_num_stages) { > + PMD_DRV_LOG(ERR, "%s has max number of stages (%u)", > + t->name, t->max_num_stages); > + return NULL; > + } > + > + s =3D 
&t->stages[t->num_stages]; > + > + if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) !=3D 0) > + PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache al= igned", > + &s->shared, t->name); > + > + if (init_stage(t, s, threadsafe, is_input) < 0) { > + PMD_DRV_LOG(ERR, "Cannot reserve memory"); > + return NULL; > + } > + t->num_stages++; > + > + return s; > +} > + > +uint32_t > +opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s, > + uint32_t nb_instance, uint32_t instance_id, > + struct opdl_stage *deps[], > + uint32_t num_deps) > +{ > + uint32_t i; > + int ret =3D 0; > + > + if ((num_deps > 0) && (!deps)) { > + PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name); > + return -1; > + } > + ret =3D check_deps(t, deps, num_deps); > + if (ret < 0) > + return ret; > + > + for (i =3D 0; i < num_deps; i++) { > + ret =3D add_dep(s, deps[i], DEP_DIRECT); > + if (ret < 0) > + return ret; > + } > + > + s->nb_instance =3D nb_instance; > + s->instance_id =3D instance_id; > + > + return ret; > +} > + > +struct opdl_stage * > +opdl_ring_get_input_stage(const struct opdl_ring *t) > +{ > + return input_stage(t); > +} > + > +int > +opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[], > + uint32_t num_deps) > +{ > + unsigned int i; > + int ret; > + > + if ((num_deps =3D=3D 0) || (!deps)) { > + PMD_DRV_LOG(ERR, "cannot set NULL dependencies"); > + return -EINVAL; > + } > + > + ret =3D check_deps(s->t, deps, num_deps); > + if (ret < 0) > + return ret; > + > + /* Update deps */ > + for (i =3D 0; i < num_deps; i++) > + s->deps[i] =3D &deps[i]->shared; > + s->num_deps =3D num_deps; > + > + return 0; > +} > + > +struct opdl_ring * > +opdl_stage_get_opdl_ring(const struct opdl_stage *s) > +{ > + return s->t; > +} > + > +void > +opdl_ring_dump(const struct opdl_ring *t, FILE *f) > +{ > + uint32_t i; > + > + if (t =3D=3D NULL) { > + fprintf(f, "NULL OPDL!\n"); > + return; > + } > + fprintf(f, "OPDL \"%s\": num_slots=3D%u; mask=3D%#x; slot_size=3D%u;= 
num_stages=3D%u; socket=3D%i\n", > + t->name, t->num_slots, t->mask, t->slot_size, > + t->num_stages, t->socket); > + for (i =3D 0; i < t->num_stages; i++) { > + uint32_t j; > + const struct opdl_stage *s =3D &t->stages[i]; > + > + fprintf(f, " %s[%u]: threadsafe=3D%s; head=3D%u; available_seq= =3D%u; tail=3D%u; deps=3D%u", > + t->name, i, (s->threadsafe) ? "true" : "false", > + (s->threadsafe) ? s->shared.head : s->head, > + (s->threadsafe) ? s->shared.available_seq : > + s->available_seq, > + s->shared.tail, (s->num_deps > 0) ? > + s->deps[0]->stage->index : 0); > + for (j =3D 1; j < s->num_deps; j++) > + fprintf(f, ",%u", s->deps[j]->stage->index); > + fprintf(f, "\n"); > + } > + fflush(f); > +} > + > +void > +opdl_ring_free(struct opdl_ring *t) > +{ > + uint32_t i; > + const struct rte_memzone *mz; > + char mz_name[RTE_MEMZONE_NAMESIZE]; > + > + if (t =3D=3D NULL) { > + PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!"); > + return; > + } > + > + PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t); > + > + for (i =3D 0; i < t->num_stages; ++i) { > + rte_free(t->stages[i].deps); > + rte_free(t->stages[i].dep_tracking); > + } > + > + rte_free(t->stages); > + > + snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name); > + mz =3D rte_memzone_lookup(mz_name); > + if (rte_memzone_free(mz) !=3D 0) > + PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name); > +} > + > +/* search a opdl_ring from its name */ > +struct opdl_ring * > +opdl_ring_lookup(const char *name) > +{ > + const struct rte_memzone *mz; > + char mz_name[RTE_MEMZONE_NAMESIZE]; > + > + snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name); > + > + mz =3D rte_memzone_lookup(mz_name); > + if (mz =3D=3D NULL) > + return NULL; > + > + return mz->addr; > +} > + > +void > +opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe) > +{ > + s->threadsafe =3D threadsafe; > +} > diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_rin= g.h > new file mode 100644 > 
> index 0000000..05baaef
> --- /dev/null
> +++ b/drivers/event/opdl/opdl_ring.h
> @@ -0,0 +1,628 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */ > + > +#ifndef _OPDL_H_ > +#define _OPDL_H_ > + > +/** > + * @file > + * The "opdl_ring" is a data structure that contains a fixed number of s= lots, > + * with each slot having the same, but configurable, size. Entries are i= nput > + * into the opdl_ring by copying into available slots. Once in the opdl_= ring, > + * an entry is processed by a number of stages, with the ordering of sta= ge > + * processing controlled by making stages dependent on one or more other= stages. > + * An entry is not available for a stage to process until it has been pr= ocessed > + * by that stages dependencies. Entries are always made available for > + * processing in the same order that they were input in to the opdl_ring. > + * Inputting is considered as a stage that depends on all other stages, > + * and is also a dependency of all stages. > + * > + * Inputting and processing in a stage can support multi-threading. Note= that > + * multi-thread processing can also be done by making stages co-operate = e.g. two > + * stages where one processes the even packets and the other processes o= dd > + * packets. > + * > + * A opdl_ring can be used as the basis for pipeline based applications.= Instead > + * of each stage in a pipeline dequeueing from a ring, processing and en= queueing > + * to another ring, it can process entries in-place on the ring. If stag= es do > + * not depend on each other, they can run in parallel. > + * > + * The opdl_ring works with entries of configurable size, these could be > + * pointers to mbufs, pointers to mbufs with application specific meta-d= ata, > + * tasks etc. > + */ > + > +#include > +#include > +#include > + > +#include > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#ifndef OPDL_DISCLAIMS_PER_LCORE > +/** Multi-threaded processing allows one thread to process multiple batc= hes in a > + * stage, while another thread is processing a single large batch. 
This = number > + * controls how many non-contiguous batches one stage can process before= being > + * blocked by the other stage. > + */ > +#define OPDL_DISCLAIMS_PER_LCORE 8 > +#endif > + > +/** Opaque handle to a opdl_ring instance */ > +struct opdl_ring; > + > +/** Opaque handle to a single stage in a opdl_ring */ > +struct opdl_stage; > + > +/** > + * Create a new instance of a opdl_ring. > + * > + * @param name > + * String containing the name to give the new opdl_ring instance. > + * @param num_slots > + * How many slots the opdl_ring contains. Must be a power a 2! > + * @param slot_size > + * How many bytes in each slot. > + * @param max_num_stages > + * Maximum number of stages. > + * @param socket > + * The NUMA socket (or SOCKET_ID_ANY) to allocate the memory used for = this > + * opdl_ring instance. > + * @param threadsafe > + * Whether to support multiple threads inputting to the opdl_ring or n= ot. > + * Enabling this may have a negative impact on performance if only one= thread > + * will be inputting. > + * > + * @return > + * A pointer to a new opdl_ring instance, or NULL on error. > + */ > +struct opdl_ring * > +opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_siz= e, > + uint32_t max_num_stages, int socket); > + > +/** > + * Get pointer to individual slot in a opdl_ring. > + * > + * @param t > + * The opdl_ring. > + * @param index > + * Index of slot. If greater than the number of slots it will be maske= d to be > + * within correct range. > + * > + * @return > + * A pointer to that slot. > + */ > +void * > +opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index); > + > +/** > + * Get NUMA socket used by a opdl_ring. > + * > + * @param t > + * The opdl_ring. > + * > + * @return > + * NUMA socket. > + */ > +int > +opdl_ring_get_socket(const struct opdl_ring *t); > + > +/** > + * Get number of slots in a opdl_ring. > + * > + * @param t > + * The opdl_ring. > + * > + * @return > + * Number of slots. 
> + */ > +uint32_t > +opdl_ring_get_num_slots(const struct opdl_ring *t); > + > +/** > + * Get name of a opdl_ring. > + * > + * @param t > + * The opdl_ring. > + * > + * @return > + * Name string. > + */ > +const char * > +opdl_ring_get_name(const struct opdl_ring *t); > + > +/** > + * Adds a new processing stage to a specified opdl_ring instance. Adding= a stage > + * while there are entries in the opdl_ring being processed will cause u= ndefined > + * behaviour. > + * > + * @param t > + * The opdl_ring to add the stage to. > + * @param deps > + * An array of pointers to other stages that this stage depends on. Th= e other > + * stages must be part of the same opdl_ring! Note that input is an im= plied > + * dependency. This can be NULL if num_deps is 0. > + * @param num_deps > + * The size of the deps array. > + * @param threadsafe > + * Whether to support multiple threads processing this stage or not. > + * Enabling this may have a negative impact on performance if only one= thread > + * will be processing this stage. > + * @param is_input > + * Indication to nitialise the stage with all slots available or none > + * > + * @return > + * A pointer to the new stage, or NULL on error. > + */ > +struct opdl_stage * > +opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input); > + > +/** > + * Returns the input stage of a opdl_ring to be used by other API functi= ons. > + * > + * @param t > + * The opdl_ring. > + * > + * @return > + * A pointer to the input stage. > + */ > +struct opdl_stage * > +opdl_ring_get_input_stage(const struct opdl_ring *t); > + > +/** > + * Sets the dependencies for a stage (clears all the previous deps!). Ch= anging > + * dependencies while there are entries in the opdl_ring being processed= will > + * cause undefined behaviour. > + * > + * @param s > + * The stage to set the dependencies for. > + * @param deps > + * An array of pointers to other stages that this stage will depends o= n. 
The > + * other stages must be part of the same opdl_ring! > + * @param num_deps > + * The size of the deps array. This must be > 0. > + * > + * @return > + * 0 on success, a negative value on error. > + */ > +int > +opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[], > + uint32_t num_deps); > + > +/** > + * Returns the opdl_ring that a stage belongs to. > + * > + * @param s > + * The stage > + * > + * @return > + * A pointer to the opdl_ring that the stage belongs to. > + */ > +struct opdl_ring * > +opdl_stage_get_opdl_ring(const struct opdl_stage *s); > + > +/** > + * Inputs a new batch of entries into the opdl_ring. This function is on= ly > + * threadsafe (with the same opdl_ring parameter) if the threadsafe para= meter of > + * opdl_ring_create() was true. For performance reasons, this function d= oes not > + * check input parameters. > + * > + * @param t > + * The opdl_ring to input entries in to. > + * @param entries > + * An array of entries that will be copied in to the opdl_ring. > + * @param num_entries > + * The size of the entries array. > + * @param block > + * If this is true, the function blocks until enough slots are availab= le to > + * input all the requested entries. If false, then the function inputs= as > + * many entries as currently possible. > + * > + * @return > + * The number of entries successfully input. > + */ > +uint32_t > +opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_e= ntries, > + bool block); > + > +/** > + * Inputs a new batch of entries into a opdl stage. This function is only > + * threadsafe (with the same opdl parameter) if the threadsafe parameter= of > + * opdl_create() was true. For performance reasons, this function does n= ot > + * check input parameters. > + * > + * @param t > + * The opdl ring to input entries in to. > + * @param s > + * The stage to copy entries to. > + * @param entries > + * An array of entries that will be copied in to the opdl ring. 
> + * @param num_entries > + * The size of the entries array. > + * @param block > + * If this is true, the function blocks until enough slots are availab= le to > + * input all the requested entries. If false, then the function inputs= as > + * many entries as currently possible. > + * > + * @return > + * The number of entries successfully input. > + */ > +uint32_t > +opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s, > + const void *entries, uint32_t num_entries, bool block); > + > +/** > + * Copy a batch of entries from the opdl ring. This function is only > + * threadsafe (with the same opdl parameter) if the threadsafe parameter= of > + * opdl_create() was true. For performance reasons, this function does n= ot > + * check input parameters. > + * > + * @param t > + * The opdl ring to copy entries from. > + * @param s > + * The stage to copy entries from. > + * @param entries > + * An array of entries that will be copied from the opdl ring. > + * @param num_entries > + * The size of the entries array. > + * @param block > + * If this is true, the function blocks until enough slots are availab= le to > + * input all the requested entries. If false, then the function inputs= as > + * many entries as currently possible. > + * > + * @return > + * The number of entries successfully input. > + */ > +uint32_t > +opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s, > + void *entries, uint32_t num_entries, bool block); > + > +/** > + * Before processing a batch of entries, a stage must first claim them t= o get > + * access. This function is threadsafe using same opdl_stage parameter if > + * the stage was created with threadsafe set to true, otherwise it is on= ly > + * threadsafe with a different opdl_stage per thread. For performance > + * reasons, this function does not check input parameters. > + * > + * @param s > + * The opdl_ring stage to read entries in. 
> + * @param entries > + * An array of pointers to entries that will be filled in by this func= tion. > + * @param num_entries > + * The number of entries to attempt to claim for processing (and the s= ize of > + * the entries array). > + * @param seq > + * If not NULL, this is set to the value of the internal stage sequenc= e number > + * associated with the first entry returned. > + * @param block > + * If this is true, the function blocks until num_entries slots are av= ailable > + * to process. If false, then the function claims as many entries as > + * currently possible. > + * > + * @param atomic > + * if this is true, the function will return event according to event = flow id > + * @return > + * The number of pointers to entries filled in to the entries array. > + */ > +uint32_t > +opdl_stage_claim(struct opdl_stage *s, void *entries, > + uint32_t num_entries, uint32_t *seq, bool block, bool atomic); > + > +uint32_t > +opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s, > + uint32_t nb_instance, uint32_t instance_id, > + struct opdl_stage *deps[], uint32_t num_deps); > + > +/** > + * A function to check how many entries are ready to be claimed. > + * > + * @param entries > + * An array of pointers to entries. > + * @param num_entries > + * Number of entries in an array. > + * @param arg > + * An opaque pointer to data passed to the claim function. > + * @param block > + * When set to true, the function should wait until num_entries are re= ady to > + * be processed. Otherwise it should return immediately. > + * > + * @return > + * Number of entries ready to be claimed. > + */ > +typedef uint32_t (opdl_ring_check_entries_t)(void *entries[], > + uint32_t num_entries, void *arg, bool block); > + > +/** > + * Before processing a batch of entries, a stage must first claim them t= o get > + * access. Each entry is checked by the passed check() function and depe= nding > + * on block value, it waits until num_entries are ready or returns immed= iately. 
> + * This function is only threadsafe with a different opdl_stage per thre= ad. > + * > + * @param s > + * The opdl_ring stage to read entries in. > + * @param entries > + * An array of pointers to entries that will be filled in by this func= tion. > + * @param num_entries > + * The number of entries to attempt to claim for processing (and the s= ize of > + * the entries array). > + * @param seq > + * If not NULL, this is set to the value of the internal stage sequenc= e number > + * associated with the first entry returned. > + * @param block > + * If this is true, the function blocks until num_entries ready slots = are > + * available to process. If false, then the function claims as many re= ady > + * entries as currently possible. > + * @param check > + * Pointer to a function called to check entries. > + * @param arg > + * Opaque data passed to check() function. > + * > + * @return > + * The number of pointers to ready entries filled in to the entries ar= ray. > + */ > +uint32_t > +opdl_stage_claim_check(struct opdl_stage *s, void **entries, > + uint32_t num_entries, uint32_t *seq, bool block, > + opdl_ring_check_entries_t *check, void *arg); > + > +/** > + * Before processing a batch of entries, a stage must first claim them t= o get > + * access. This function is threadsafe using same opdl_stage parameter if > + * the stage was created with threadsafe set to true, otherwise it is on= ly > + * threadsafe with a different opdl_stage per thread. > + * > + * The difference between this function and opdl_stage_claim() is that t= his > + * function copies the entries from the opdl_ring. Note that any changes= made to > + * the copied entries will not be reflected back in to the entries in the > + * opdl_ring, so this function probably only makes sense if the entries = are > + * pointers to other data. For performance reasons, this function does n= ot check > + * input parameters. > + * > + * @param s > + * The opdl_ring stage to read entries in. 
> + * @param entries > + * An array of entries that will be filled in by this function. > + * @param num_entries > + * The number of entries to attempt to claim for processing (and the s= ize of > + * the entries array). > + * @param seq > + * If not NULL, this is set to the value of the internal stage sequenc= e number > + * associated with the first entry returned. > + * @param block > + * If this is true, the function blocks until num_entries slots are av= ailable > + * to process. If false, then the function claims as many entries as > + * currently possible. > + * > + * @return > + * The number of entries copied in to the entries array. > + */ > +uint32_t > +opdl_stage_claim_copy(struct opdl_stage *s, void *entries, > + uint32_t num_entries, uint32_t *seq, bool block); > + > +/** > + * This function must be called when a stage has finished its processing= of > + * entries, to make them available to any dependent stages. All entries = that are > + * claimed by the calling thread in the stage will be disclaimed. It is = possible > + * to claim multiple batches before disclaiming. For performance reasons= , this > + * function does not check input parameters. > + * > + * @param s > + * The opdl_ring stage in which to disclaim all claimed entries. > + * > + * @param block > + * Entries are always made available to a stage in the same order that= they > + * were input in the stage. If a stage is multithread safe, this may m= ean that > + * full disclaiming of a batch of entries can not be considered comple= te until > + * all earlier threads in the stage have disclaimed. If this parameter= is true > + * then the function blocks until all entries are fully disclaimed, ot= herwise > + * it disclaims as many as currently possible, with non fully disclaim= ed > + * batches stored until the next call to a claim or disclaim function = for this > + * stage on this thread. 
> + *
> + *   If a thread is not going to process any more entries in this stage, it
> + *   *must* first call this function with this parameter set to true to ensure
> + *   it does not block the entire opdl_ring.
> + *
> + *   In a single-threaded stage, this parameter has no effect.
> + */
> +int
> +opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries,
> +		bool block);
> +
> +/**
> + * This function can be called when a stage has finished its processing of
> + * entries, to make them available to any dependent stages. The difference
> + * between this function and opdl_stage_disclaim() is that here only a
> + * portion of the entries are disclaimed, not all of them. For performance
> + * reasons, this function does not check input parameters.
> + *
> + * @param s
> + *   The opdl_ring stage in which to disclaim entries.
> + *
> + * @param num_entries
> + *   The number of entries to disclaim.
> + *
> + * @param block
> + *   Entries are always made available to a stage in the same order that they
> + *   were input in the stage. If a stage is multithread safe, this may mean
> + *   that full disclaiming of a batch of entries cannot be considered complete
> + *   until all earlier threads in the stage have disclaimed. If this parameter
> + *   is true then the function blocks until the specified number of entries
> + *   has been disclaimed (or there are no more entries to disclaim). Otherwise
> + *   it disclaims as many claims as currently possible and an attempt to
> + *   disclaim them is made the next time a claim or disclaim function for this
> + *   stage on this thread is called.
> + *
> + *   In a single-threaded stage, this parameter has no effect.
> + */
> +void
> +opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
> +		bool block);
> +
> +/**
> + * Check how many entries can be input.
> + *
> + * @param t
> + *   The opdl_ring instance to check.
> + *
> + * @return
> + *   The number of new entries currently allowed to be input.
> + */
> +uint32_t
> +opdl_ring_available(struct opdl_ring *t);
> +
> +/**
> + * Check how many entries can be processed in a stage.
> + *
> + * @param s
> + *   The stage to check.
> + *
> + * @return
> + *   The number of entries currently available to be processed in this stage.
> + */
> +uint32_t
> +opdl_stage_available(struct opdl_stage *s);
> +
> +/**
> + * Check how many entries are available to be processed.
> + *
> + * NOTE: DOES NOT CHANGE ANY STATE WITHIN THE STAGE
> + *
> + * @param s
> + *   The stage to check.
> + *
> + * @param num_entries
> + *   The number of entries to check for availability.
> + *
> + * @return
> + *   The number of entries currently available to be processed in this stage.
> + */
> +uint32_t
> +opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
> +
> +/**
> + * Create an empty stage instance and return a pointer to it.
> + *
> + * @param t
> + *   Pointer to the opdl_ring instance.
> + *
> + * @param threadsafe
> + *   Whether multithreaded access to the stage is enabled.
> + * @return
> + *   Pointer to the new, empty stage instance.
> + */
> +struct opdl_stage *
> +opdl_stage_create(struct opdl_ring *t, bool threadsafe);
> +
> +/**
> + * Prints information on an opdl_ring instance and all its stages.
> + *
> + * @param t
> + *   The opdl_ring instance to print info on.
> + * @param f
> + *   Where to print the info.
> + */
> +void
> +opdl_ring_dump(const struct opdl_ring *t, FILE *f);
> +
> +/**
> + * Blocks until all entries in an opdl_ring have been processed by all stages.
> + *
> + * @param t
> + *   The opdl_ring instance to flush.
> + */
> +void
> +opdl_ring_flush(struct opdl_ring *t);
> +
> +/**
> + * Deallocates all resources used by an opdl_ring instance.
> + *
> + * @param t
> + *   The opdl_ring instance to free.
> + */
> +void
> +opdl_ring_free(struct opdl_ring *t);
> +
> +/**
> + * Search for an opdl_ring by its name.
> + *
> + * @param name
> + *   The name of the opdl_ring.
> + * @return
> + *   The pointer to the opdl_ring matching the name, or NULL if not found.
> + *
> + */
> +struct opdl_ring *
> +opdl_ring_lookup(const char *name);
> +
> +/**
> + * Set the threadsafe attribute of an opdl_stage.
> + *
> + * @param s
> + *   The opdl_stage.
> + * @param threadsafe
> + *   The threadsafe value to set.
> + */
> +void
> +opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
> +
> +
> +/**
> + * Compare the event descriptor with the original version in the ring.
> + * If any key field of the event descriptor has been changed by the
> + * application, then update the slot in the ring, otherwise do nothing.
> + * The key fields are flow_id, priority, mbuf and impl_opaque.
> + *
> + * @param s
> + *   The opdl_stage.
> + * @param ev
> + *   Pointer to the event descriptor.
> + * @param index
> + *   Index of the event descriptor.
> + * @param atomic
> + *   Queue type associated with the stage.
> + * @return
> + *   True if any event key field changed compared with the previous record.
> + */
> +
> +bool
> +opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
> +		uint32_t index, bool atomic);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _OPDL_H_ */
> diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> new file mode 100644
> index 0000000..58b9427
> --- /dev/null
> +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> @@ -0,0 +1,3 @@
> +DPDK_18.02 {
> +	local: *;
> +};
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk
> index 6a6a745..a55a21d 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -200,6 +200,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf
> _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_EVENTDEV) += -lrte_pmd_dpaa2_event
> _LDLIBS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += -lrte_mempool_octeontx
> _LDLIBS-$(CONFIG_RTE_LIBRTE_OCTEONTX_PMD) += -lrte_pmd_octeontx
> +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += -lrte_pmd_opdl_event
> endif # CONFIG_RTE_LIBRTE_EVENTDEV
>
> ifeq ($(CONFIG_RTE_LIBRTE_DPAA2_PMD),y)
> diff
--git a/mk/toolchain/gcc/rte.toolchain-compat.mk b/mk/toolchain/gcc/rte.toolchain-compat.mk
> index 01ac7e2..87edba9 100644
> --- a/mk/toolchain/gcc/rte.toolchain-compat.mk
> +++ b/mk/toolchain/gcc/rte.toolchain-compat.mk
> @@ -95,4 +95,10 @@ else
> 	ifeq ($(shell test $(GCC_VERSION) -lt 47 && echo 1), 1)
> 		CONFIG_RTE_LIBRTE_THUNDERX_NICVF_PMD=d
> 	endif
> +
> +	# Disable OPDL PMD for gcc < 4.7
> +	ifeq ($(shell test $(GCC_VERSION) -lt 47 && echo 1), 1)
> +		CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=d
> +	endif
> +
> endif
> diff --git a/mk/toolchain/icc/rte.toolchain-compat.mk b/mk/toolchain/icc/rte.toolchain-compat.mk
> index 3c25d18..238ef3b 100644
> --- a/mk/toolchain/icc/rte.toolchain-compat.mk
> +++ b/mk/toolchain/icc/rte.toolchain-compat.mk
> @@ -77,4 +77,10 @@ else
> 	ifeq ($(shell test $(ICC_MAJOR_VERSION) -le 16 && echo 1), 1)
> 		CONFIG_RTE_LIBRTE_THUNDERX_NICVF_PMD=d
> 	endif
> +
> +	# Disable event/opdl PMD for icc <= 16.0
> +	ifeq ($(shell test $(ICC_MAJOR_VERSION) -le 16 && echo 1), 1)
> +		CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=d
> +	endif
> +
> endif
> --
> 2.7.5
>
> --------------------------------------------------------------
> Intel Research and Development Ireland Limited
> Registered in Ireland
> Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
> Registered Number: 308263
>
>
> This e-mail and any attachments may contain confidential material for the sole
> use of the intended recipient(s). Any review or distribution by others is
> strictly prohibited. If you are not the intended recipient, please contact the
> sender and delete all copies.