From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga11.intel.com (mga11.intel.com [192.55.52.93]) by dpdk.org (Postfix) with ESMTP id 76F6BFA41 for ; Tue, 7 Feb 2017 15:14:40 +0100 (CET) Received: from orsmga005.jf.intel.com ([10.7.209.41]) by fmsmga102.fm.intel.com with ESMTP; 07 Feb 2017 06:14:39 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.33,346,1477983600"; d="scan'208";a="61814258" Received: from sivswdev01.ir.intel.com (HELO localhost.localdomain) ([10.237.217.45]) by orsmga005.jf.intel.com with ESMTP; 07 Feb 2017 06:14:37 -0800 From: Bruce Richardson To: olivier.matz@6wind.com Cc: thomas.monjalon@6wind.com, keith.wiles@intel.com, konstantin.ananyev@intel.com, stephen@networkplumber.org, dev@dpdk.org, Bruce Richardson Date: Tue, 7 Feb 2017 14:12:57 +0000 Message-Id: <1486476777-24768-20-git-send-email-bruce.richardson@intel.com> X-Mailer: git-send-email 1.7.0.7 In-Reply-To: <20170125121456.GA24344@bricha3-MOBL3.ger.corp.intel.com> References: <20170125121456.GA24344@bricha3-MOBL3.ger.corp.intel.com> Subject: [dpdk-dev] [PATCH RFCv3 19/19] ring: add event ring implementation X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 07 Feb 2017 14:14:41 -0000 DEMO ONLY: add an event ring implementation to demonstrate how rings for new types can be efficiently added by reusing existing rte ring functions. Signed-off-by: Bruce Richardson --- app/test/Makefile | 1 + app/test/test_event_ring.c | 85 +++++++ lib/librte_ring/Makefile | 2 + lib/librte_ring/rte_event_ring.c | 220 +++++++++++++++++ lib/librte_ring/rte_event_ring.h | 507 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 815 insertions(+) create mode 100644 app/test/test_event_ring.c create mode 100644 lib/librte_ring/rte_event_ring.c create mode 100644 lib/librte_ring/rte_event_ring.h diff --git a/app/test/Makefile b/app/test/Makefile index 9de301f..c0ab2ba 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -96,6 +96,7 @@ SRCS-y += test_memory.c SRCS-y += test_memzone.c SRCS-y += test_ring.c +SRCS-y += test_event_ring.c SRCS-y += test_ring_perf.c SRCS-y += test_pmd_perf.c diff --git a/app/test/test_event_ring.c b/app/test/test_event_ring.c new file mode 100644 index 0000000..5ac4bac --- /dev/null +++ b/app/test/test_event_ring.c @@ -0,0 +1,85 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2017 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include "test.h" + +#define BURST_SIZE 24 /* not a power of two so we can test wrap around better */ +#define RING_SIZE 128 +#define ENQ_ITERATIONS 48 + +#define ERR_OUT() do { \ + printf("Error %s:%d\n", __FILE__, __LINE__); \ + return -1; \ +} while (0) + +static int +test_event_ring(void) +{ + struct rte_event in_events[BURST_SIZE]; + struct rte_event out_events[BURST_SIZE]; + unsigned int i; + + struct rte_event_ring *ering = rte_event_ring_create("TEST_RING", + RING_SIZE, rte_socket_id(), + RING_F_SP_ENQ|RING_F_SC_DEQ); + if (ering == NULL) + ERR_OUT(); + + for (i = 0; i < BURST_SIZE; i++) + in_events[i].event_metadata = rte_rand(); + + for (i = 0; i < ENQ_ITERATIONS; i++) + if (rte_event_ring_enqueue_bulk(ering, in_events, + RTE_DIM(in_events), NULL) != 0) { + unsigned j; + + if (rte_event_ring_dequeue_burst(ering, out_events, + RTE_DIM(out_events), NULL) + != RTE_DIM(out_events)) + ERR_OUT(); + for (j = 0; j < RTE_DIM(out_events); j++) + if (out_events[j].event_metadata != + in_events[j].event_metadata) + ERR_OUT(); + /* retry, now that we've made space */ + if (rte_event_ring_enqueue_bulk(ering, in_events, + RTE_DIM(in_events), NULL) != 0) + ERR_OUT(); + } + + return 0; +} + +REGISTER_TEST_COMMAND(event_ring_autotest, test_event_ring); diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 4b1112e..309960f 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -42,9 +42,11 @@ LIBABIVER := 1 # all source are stored in SRCS-y SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c +SRCS-$(CONFIG_RTE_LIBRTE_RING) += rte_event_ring.c # install includes SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include += rte_event_ring.h DEPDIRS-$(CONFIG_RTE_LIBRTE_RING) += lib/librte_eal diff --git a/lib/librte_ring/rte_event_ring.c b/lib/librte_ring/rte_event_ring.c new file mode 100644 index 0000000..f6875b0 --- /dev/null +++ b/lib/librte_ring/rte_event_ring.c @@ -0,0 +1,220 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2017 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include +#include +#include +#include +#include +#include + +#include "rte_event_ring.h" + +#define RTE_TAILQ_EVENT_RING_NAME "RTE_EVENT_RING" + +TAILQ_HEAD(rte_event_ring_list, rte_tailq_entry); + +static struct rte_tailq_elem rte_event_ring_tailq = { + .name = RTE_TAILQ_EVENT_RING_NAME, +}; +EAL_REGISTER_TAILQ(rte_event_ring_tailq) + +int +rte_event_ring_init(struct rte_event_ring *r, const char *name, + unsigned int count, unsigned int flags) +{ + return rte_ring_init(&r->hdr, name, count, flags); +} + + +struct rte_event_ring * +rte_event_ring_create(const char *name, unsigned count, int socket_id, + unsigned int flags) +{ + char mz_name[RTE_MEMZONE_NAMESIZE]; + struct rte_event_ring *r; + struct rte_tailq_entry *te; + const struct rte_memzone *mz; + ssize_t ring_size; + int mz_flags = 0; + struct rte_event_ring_list *ring_list = NULL; + int ret; + + ring_list = RTE_TAILQ_CAST(rte_event_ring_tailq.head, + rte_event_ring_list); + + ring_size = rte_ring_get_memsize(count, sizeof(struct rte_event)); + if (ring_size < 0) { + rte_errno = ring_size; + return NULL; + } + + ret = snprintf(mz_name, sizeof(mz_name), "%s%s", + RTE_RING_MZ_PREFIX, name); + if (ret < 0 || ret >= (int)sizeof(mz_name)) { + rte_errno = ENAMETOOLONG; + return NULL; + } + + te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0); + if (te == NULL) { + RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n"); + rte_errno = ENOMEM; + return NULL; + } + + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + + /* reserve a memory zone for this ring. If we can't get rte_config or + * we are secondary process, the memzone_reserve function will set + * rte_errno for us appropriately - hence no check in this this function */ + mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags); + if (mz != NULL) { + r = mz->addr; + /* no need to check return value here, we already checked the + * arguments above */ + rte_event_ring_init(r, name, count, flags); + + te->data = (void *) r; + r->hdr.memzone = mz; + + TAILQ_INSERT_TAIL(ring_list, te, next); + } else { + r = NULL; + RTE_LOG(ERR, RING, "Cannot reserve memory\n"); + rte_free(te); + } + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + + return r; +} + +void +rte_event_ring_free(struct rte_event_ring *r) +{ + struct rte_event_ring_list *ring_list = NULL; + struct rte_tailq_entry *te; + + if (r == NULL) + return; + + /* + * Ring was not created with rte_ring_create, + * therefore, there is no memzone to free. + */ + if (r->hdr.memzone == NULL) { + RTE_LOG(ERR, RING, + "Cannot free ring (not created with rte_ring_create()"); + return; + } + + if (rte_memzone_free(r->hdr.memzone) != 0) { + RTE_LOG(ERR, RING, "Cannot free memory\n"); + return; + } + + ring_list = RTE_TAILQ_CAST(rte_event_ring_tailq.head, + rte_event_ring_list); + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + + /* find out tailq entry */ + TAILQ_FOREACH(te, ring_list, next) { + if (te->data == (void *) r) + break; + } + + if (te == NULL) { + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + return; + } + + TAILQ_REMOVE(ring_list, te, next); + + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + + rte_free(te); +} + +void +rte_event_ring_dump(FILE *f, const struct rte_event_ring *r) +{ + return rte_ring_dump(f, &r->hdr); +} + +void +rte_event_ring_list_dump(FILE *f) +{ + const struct rte_tailq_entry *te; + struct rte_event_ring_list *ring_list; + + ring_list = RTE_TAILQ_CAST(rte_event_ring_tailq.head, + rte_event_ring_list); + + rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK); + + TAILQ_FOREACH(te, ring_list, next) { + rte_event_ring_dump(f, te->data); + } + + rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK); +} + +struct rte_event_ring * +rte_event_ring_lookup(const char *name) +{ + struct rte_tailq_entry *te; + struct rte_event_ring *r = NULL; + struct rte_event_ring_list *ring_list; + + ring_list = RTE_TAILQ_CAST(rte_event_ring_tailq.head, + rte_event_ring_list); + + rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK); + + TAILQ_FOREACH(te, ring_list, next) { + r = te->data; + if (strncmp(name, r->hdr.name, RTE_RING_NAMESIZE) == 0) + break; + } + + rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK); + + if (te == NULL) { + rte_errno = ENOENT; + return NULL; + } + + return r; +} diff --git a/lib/librte_ring/rte_event_ring.h b/lib/librte_ring/rte_event_ring.h new file mode 100644 index 0000000..f34b321 --- /dev/null +++ b/lib/librte_ring/rte_event_ring.h @@ -0,0 +1,507 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2017 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_EVENT_RING_H_ +#define _RTE_EVENT_RING_H_ + +/** + * @file + * RTE Event Ring + * + * The Ring Manager is a fixed-size event queue, implemented as a table of + * events. Head and tail pointers are modified atomically, allowing + * concurrent access to it. It has the following features: + * + * - FIFO (First In First Out) + * - Maximum size is fixed; the pointers are stored in a table. + * - Lockless implementation. + * - Multi- or single-consumer dequeue. + * - Multi- or single-producer enqueue. + * - Bulk dequeue. + * - Bulk enqueue. + * + * Note: the ring implementation is not preemptable. A lcore must not + * be interrupted by another task that uses the same ring. + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "rte_ring.h" + +struct rte_event { + uint64_t event_metadata; + struct rte_mbuf *mbuf; +}; + +struct rte_event_ring { + struct rte_ring hdr; +}; + +/** + * Initialize a ring structure. + * + * Initialize a ring structure in memory pointed by "r". The size of the + * memory area must be large enough to store the ring structure and the + * object table. It is advised to use rte_event_ring_get_memsize() to get the + * appropriate size. + * + * The ring size is set to *count*, which must be a power of two. Water + * marking is disabled by default. The real usable ring size is + * *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is not added in RTE_TAILQ_RING global list. Indeed, the + * memory given by the caller may not be shareable among dpdk + * processes. + * + * @param r + * The pointer to the ring structure followed by the objects table. + * @param name + * The name of the ring. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_event_ring_enqueue()`` or ``rte_event_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_event_ring_dequeue()`` or ``rte_event_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * 0 on success, or a negative value on error. + */ +int rte_event_ring_init(struct rte_event_ring *r, const char *name, + unsigned int count, unsigned int flags); + +/** + * Create a new ring named *name* in memory. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_event_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. + * + * @param name + * The name of the ring. + * @param count + * The size of the ring (must be a power of 2). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_event_ring_enqueue()`` or ``rte_event_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_event_ring_dequeue()`` or ``rte_event_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - E_RTE_SECONDARY - function was called from a secondary process instance + * - EINVAL - count provided is not a power of 2 + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +struct rte_event_ring *rte_event_ring_create(const char *name, unsigned count, + int socket_id, unsigned flags); +/** + * De-allocate all memory used by the ring. + * + * @param r + * Ring to free + */ +void rte_event_ring_free(struct rte_event_ring *r); + +/** + * Dump the status of the ring to a file. + * + * @param f + * A pointer to a file for output + * @param r + * A pointer to the ring structure. + */ +void rte_event_ring_dump(FILE *f, const struct rte_event_ring *r); + +/** + * Test if a ring is full. + * + * @param r + * A pointer to the ring structure. + * @return + * - 1: The ring is full. + * - 0: The ring is not full. + */ +static inline int +rte_event_ring_full(const struct rte_event_ring *r) +{ + return rte_ring_full(&r->hdr); +} + +/** + * Test if a ring is empty. + * + * @param r + * A pointer to the ring structure. + * @return + * - 1: The ring is empty. + * - 0: The ring is not empty. + */ +static inline int +rte_event_ring_empty(const struct rte_event_ring *r) +{ + return rte_ring_empty(&r->hdr); +} + +/** + * Return the number of entries in a ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The number of entries in the ring. + */ +static inline unsigned +rte_event_ring_count(const struct rte_event_ring *r) +{ + return rte_ring_count(&r->hdr); +} + +/** + * Return the number of free entries in a ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The number of free entries in the ring. + */ +static inline unsigned +rte_event_ring_free_count(const struct rte_event_ring *r) +{ + return rte_ring_free_count(&r->hdr); +} + +/** + * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output + */ +void rte_event_ring_list_dump(FILE *f); + +/** + * Search a ring from its name + * + * @param name + * The name of the ring. + * @return + * The pointer to the ring matching the name, or NULL if not found, + * with rte_errno set appropriately. Possible rte_errno values include: + * - ENOENT - required entry not available to return. + */ +struct rte_event_ring *rte_event_ring_lookup(const char *name); + + +static inline __attribute__((always_inline)) unsigned int +__rte_event_ring_do_enqueue(struct rte_event_ring *r, + const struct rte_event *obj_table, unsigned int n, + enum rte_ring_queue_behavior behavior, int is_sp, + unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(&r->hdr, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS(&r->hdr, obj_table, n, prod_head, struct rte_event); + rte_smp_wmb(); + + update_tail(&r->hdr.prod, prod_head, prod_next); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +static inline __attribute__((always_inline)) unsigned int +__rte_event_ring_do_dequeue(struct rte_event_ring *r, + struct rte_event *obj_table, unsigned int n, + enum rte_ring_queue_behavior behavior, + int is_sc, unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(&r->hdr, is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS(&r->hdr, obj_table, n, cons_head, struct rte_event); + rte_smp_rmb(); + + update_tail(&r->hdr.cons, cons_head, cons_next); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * Returns the amount of free space in the ring after the enqueue call + * @return + * - n: Actual number of objects enqueued. + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_mp_enqueue_burst(struct rte_event_ring *r, + const struct rte_event * const obj_table, unsigned int n, + unsigned int *free_space) +{ + return __rte_event_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * Returns the amount of free space in the ring after the enqueue call + * @return + * - n: Actual number of objects enqueued. + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_sp_enqueue_burst(struct rte_event_ring *r, + const struct rte_event * const obj_table, unsigned int n, + unsigned int *free_space) +{ + return __rte_event_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_enqueue_burst(struct rte_event_ring *r, + const struct rte_event * const obj_table, unsigned int n, + unsigned int *free_space) +{ + return __rte_event_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, r->hdr.prod.sp_enqueue, + free_space); +} + +/** + * Enqueue all objects on a ring, or enqueue none. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: All objects enqueued. + * - 0: No objects enqueued + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_enqueue_bulk(struct rte_event_ring *r, + const struct rte_event * const obj_table, unsigned int n, + unsigned int *free_space) +{ + return __rte_event_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, r->hdr.prod.sp_enqueue, + free_space); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_mc_dequeue_burst(struct rte_event_ring *r, + struct rte_event * const obj_table, unsigned int n, + unsigned int *available) +{ + return __rte_event_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_sc_dequeue_burst(struct rte_event_ring *r, + struct rte_event * const obj_table, unsigned int n, + unsigned int *available) +{ + return __rte_event_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_SC, available); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - Number of objects dequeued + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_dequeue_burst(struct rte_event_ring *r, + struct rte_event * const obj_table, unsigned int n, + unsigned int *available) +{ + return __rte_event_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, r->hdr.cons.sc_dequeue, + available); +} + +/** + * Dequeue all requested objects from a ring, or dequeue none . + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: all objects dequeued + * - 0: nothing dequeued + */ +static inline unsigned __attribute__((always_inline)) +rte_event_ring_dequeue_bulk(struct rte_event_ring *r, + struct rte_event * const obj_table, unsigned int n, + unsigned int *available) +{ + return __rte_event_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, r->hdr.cons.sc_dequeue, + available); +} + + + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_EVENT_RING_H_ */ -- 2.9.3