From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id C289E43F6C; Thu, 2 May 2024 22:19:41 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 907BE402D4; Thu, 2 May 2024 22:19:39 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by mails.dpdk.org (Postfix) with ESMTP id 50529402D4 for ; Thu, 2 May 2024 22:19:37 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 48250339; Thu, 2 May 2024 13:20:02 -0700 (PDT) Received: from 2u-thunderx2.usa.Arm.com (unknown [10.118.12.78]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 2DB6C3F73F; Thu, 2 May 2024 13:19:36 -0700 (PDT) From: Aditya Ambadipudi To: dev@dpdk.org, jackmin@nvidia.com, stephen@networkplumber.org, matan@nvidia.com, viacheslavo@nvidia.com, roretzla@linux.microsoft.com, konstantin.ananyev@huawei.com, mb@smartsharesystems.com, hofors@lysator.liu.se, probb@iol.unh.edu, alialnu@nvidia.com Cc: wathsala.vithanage@arm.com, dhruv.tripathi@arm.com, honnappa.nagarahalli@arm.com, nd@arm.com, venamb01@arm.com, Aditya Ambadipudi Subject: [PATCH v3 1/2] deque: add multi-thread unsafe double ended queue Date: Thu, 2 May 2024 15:19:23 -0500 Message-Id: <20240502201924.127273-2-aditya.ambadipudi@arm.com> X-Mailer: git-send-email 2.25.1 In-Reply-To: <20240502201924.127273-1-aditya.ambadipudi@arm.com> References: <20240424134233.1336370-2-aditya.ambadipudi@arm.com> <20240502201924.127273-1-aditya.ambadipudi@arm.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org From: Honnappa Nagarahalli Add a multi-thread unsafe double ended queue 
data structure. This library provides a simple and efficient alternative to multi-thread safe ring when multi-thread safety is not required. Signed-off-by: Aditya Ambadipudi Signed-off-by: Honnappa Nagarahalli --- v3: * Removed stdio.h from two files where it is not needed. v2: * Addressed the spell check warning issue with the word "Deque" * Tried to rename all objects that are named deque to avoid collision with std::deque * Added the deque library to msvc section in meson.build * Renamed api functions to explicitly state if the function inserts at head or tail. .mailmap | 1 + devtools/build-dict.sh | 1 + lib/deque/meson.build | 11 + lib/deque/rte_deque.c | 193 +++++++++++++ lib/deque/rte_deque.h | 533 ++++++++++++++++++++++++++++++++++++ lib/deque/rte_deque_core.h | 81 ++++++ lib/deque/rte_deque_pvt.h | 538 +++++++++++++++++++++++++++++++++++++ lib/deque/rte_deque_zc.h | 430 +++++++++++++++++++++++++++++ lib/deque/version.map | 14 + lib/meson.build | 2 + 10 files changed, 1804 insertions(+) create mode 100644 lib/deque/meson.build create mode 100644 lib/deque/rte_deque.c create mode 100644 lib/deque/rte_deque.h create mode 100644 lib/deque/rte_deque_core.h create mode 100644 lib/deque/rte_deque_pvt.h create mode 100644 lib/deque/rte_deque_zc.h create mode 100644 lib/deque/version.map diff --git a/.mailmap b/.mailmap index 3843868716..8e705ab6ab 100644 --- a/.mailmap +++ b/.mailmap @@ -17,6 +17,7 @@ Adam Bynes Adam Dybkowski Adam Ludkiewicz Adham Masarwah +Aditya Ambadipudi Adrian Moreno Adrian Podlawski Adrien Mazarguil diff --git a/devtools/build-dict.sh b/devtools/build-dict.sh index a8cac49029..595d8f9277 100755 --- a/devtools/build-dict.sh +++ b/devtools/build-dict.sh @@ -17,6 +17,7 @@ sed '/^..->/d' | sed '/^uint->/d' | sed "/^doesn'->/d" | sed '/^wasn->/d' | +sed '/^deque.*->/d' | # print to stdout cat diff --git a/lib/deque/meson.build b/lib/deque/meson.build new file mode 100644 index 0000000000..1ff45fc39f --- /dev/null +++ b/lib/deque/meson.build @@
-0,0 +1,11 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024 Arm Limited + +sources = files('rte_deque.c') +headers = files('rte_deque.h') +# most sub-headers are not for direct inclusion +indirect_headers += files ( + 'rte_deque_core.h', + 'rte_deque_pvt.h', + 'rte_deque_zc.h' +) diff --git a/lib/deque/rte_deque.c b/lib/deque/rte_deque.c new file mode 100644 index 0000000000..b83a6c43c4 --- /dev/null +++ b/lib/deque/rte_deque.c @@ -0,0 +1,193 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2024 Arm Limited + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rte_deque.h" + +/* mask of all valid flag values to deque_create() */ +#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ) +ssize_t +rte_deque_get_memsize_elem(unsigned int esize, unsigned int count) +{ + ssize_t sz; + + /* Check if element size is a multiple of 4B */ + if (esize % 4 != 0) { + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s(): element size is not a multiple of 4\n", + __func__); + + return -EINVAL; + } + + /* count must be a power of 2 */ + if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) { + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s(): Requested number of elements is invalid, " + "must be power of 2, and not exceed %u\n", + __func__, RTE_DEQUE_SZ_MASK); + + return -EINVAL; + } + + sz = sizeof(struct rte_deque) + (ssize_t)count * esize; + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + return sz; +} + +void +rte_deque_reset(struct rte_deque *d) +{ + d->head = 0; + d->tail = 0; +} + +int +rte_deque_init(struct rte_deque *d, const char *name, unsigned int count, + unsigned int flags) +{ + int ret; + + /* compilation-time checks */ + RTE_BUILD_BUG_ON((sizeof(struct rte_deque) & + RTE_CACHE_LINE_MASK) != 0); + + /* future proof flags, only allow supported values */ + if (flags & ~__RTE_DEQUE_F_MASK) { + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s():
Unsupported flags requested %#x\n", + __func__, flags); + return -EINVAL; + } + + /* init the deque structure */ + memset(d, 0, sizeof(*d)); + ret = strlcpy(d->name, name, sizeof(d->name)); + if (ret < 0 || ret >= (int)sizeof(d->name)) + return -ENAMETOOLONG; + d->flags = flags; + + if (flags & RTE_DEQUE_F_EXACT_SZ) { + d->size = rte_align32pow2(count + 1); + d->mask = d->size - 1; + d->capacity = count; + } else { + if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) { + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s(): Requested size is invalid, must be power" + " of 2, and not exceed the size limit %u\n", + __func__, RTE_DEQUE_SZ_MASK); + return -EINVAL; + } + d->size = count; + d->mask = count - 1; + d->capacity = d->mask; + } + + return 0; +} + +/* create the deque for a given element size */ +struct rte_deque * +rte_deque_create(const char *name, unsigned int esize, unsigned int count, + int socket_id, unsigned int flags) +{ + char mz_name[RTE_MEMZONE_NAMESIZE]; + struct rte_deque *d; + const struct rte_memzone *mz; + ssize_t deque_size; + int mz_flags = 0; + const unsigned int requested_count = count; + int ret; + + /* for an exact size deque, round up from count to a power of two */ + if (flags & RTE_DEQUE_F_EXACT_SZ) + count = rte_align32pow2(count + 1); + + deque_size = rte_deque_get_memsize_elem(esize, count); + if (deque_size < 0) { + rte_errno = -deque_size; + return NULL; + } + + ret = snprintf(mz_name, sizeof(mz_name), "%s%s", + RTE_DEQUE_MZ_PREFIX, name); + if (ret < 0 || ret >= (int)sizeof(mz_name)) { + rte_errno = ENAMETOOLONG; + return NULL; + } + + /* reserve a memory zone for this deque. 
If we can't get rte_config or + * we are secondary process, the memzone_reserve function will set + * rte_errno for us appropriately - hence no check in this function + */ + mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id, + mz_flags, alignof(struct rte_deque)); + if (mz != NULL) { + d = mz->addr; + /* no need to check return value here, we already checked the + * arguments above + */ + rte_deque_init(d, name, requested_count, flags); + d->memzone = mz; + } else { + d = NULL; + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s(): Cannot reserve memory\n", __func__); + } + return d; +} + +/* free the deque */ +void +rte_deque_free(struct rte_deque *d) +{ + if (d == NULL) + return; + + /* + * Deque was not created with rte_deque_create, + * therefore, there is no memzone to free. + */ + if (d->memzone == NULL) { + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s(): Cannot free deque, not created " + "with rte_deque_create()\n", __func__); + return; + } + + if (rte_memzone_free(d->memzone) != 0) + rte_log(RTE_LOG_ERR, rte_deque_log_type, + "%s(): Cannot free memory\n", __func__); +} + +/* dump the status of the deque on the console */ +void +rte_deque_dump(FILE *f, const struct rte_deque *d) +{ + fprintf(f, "deque <%s>@%p\n", d->name, d); + fprintf(f, " flags=%x\n", d->flags); + fprintf(f, " size=%"PRIu32"\n", d->size); + fprintf(f, " capacity=%"PRIu32"\n", d->capacity); + fprintf(f, " head=%"PRIu32"\n", d->head); + fprintf(f, " tail=%"PRIu32"\n", d->tail); + fprintf(f, " used=%u\n", rte_deque_count(d)); + fprintf(f, " avail=%u\n", rte_deque_free_count(d)); +} + +RTE_LOG_REGISTER_DEFAULT(rte_deque_log_type, ERR); diff --git a/lib/deque/rte_deque.h b/lib/deque/rte_deque.h new file mode 100644 index 0000000000..6633eab377 --- /dev/null +++ b/lib/deque/rte_deque.h @@ -0,0 +1,533 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2024 Arm Limited + */ + +#ifndef _RTE_DEQUE_H_ +#define _RTE_DEQUE_H_ + +/** + * @file + * RTE double ended queue (Deque) 
+ * + * This fixed-size queue does not provide concurrent access by + * multiple threads. If required, the application should use locks + * to protect the deque from concurrent access. + * + * - Double ended queue + * - Maximum size is fixed + * - Store objects of any size + * - Single/bulk/burst dequeue at tail or head + * - Single/bulk/burst enqueue at head or tail + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +/** + * Calculate the memory size needed for a deque + * + * This function returns the number of bytes needed for a deque, given + * the number of objects and the object size. This value is the sum of + * the size of the structure rte_deque and the size of the memory needed + * by the objects. The value is aligned to a cache line size. + * + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * @param count + * The number of objects in the deque (must be a power of 2). + * @return + * - The memory size needed for the deque on success. + * - -EINVAL if esize is not a multiple of 4, or if count is not a + * power of 2 or exceeds the size limit. + */ +__rte_experimental +ssize_t rte_deque_get_memsize_elem(unsigned int esize, unsigned int count); + +/** + * Initialize a deque structure. + * + * Initialize a deque structure in memory pointed by "d". The size of the + * memory area must be large enough to store the deque structure and the + * object table. It is advised to use rte_deque_get_memsize_elem() to get + * the appropriate size. + * + * The deque size is set to *count*, which must be a power of two. + * The real usable deque size is *count-1* instead of *count* to + * differentiate a full deque from an empty deque. + * + * @param d + * The pointer to the deque structure followed by the objects table. + * @param name + * The name of the deque. + * @param count + * The number of objects in the deque (must be a power of 2, + * unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param flags + * - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold + * exactly the requested number of objects, and the requested size + * will be rounded up to the next power of two, but the usable space + * will be exactly that requested. Worst case, if a power-of-2 size is + * requested, half the deque space will be wasted. + * Without this flag set, the deque size requested must be a power of 2, + * and the usable space will be that size - 1. + * @return + * 0 on success, or a negative value on error. + */ +__rte_experimental +int rte_deque_init(struct rte_deque *d, const char *name, unsigned int count, + unsigned int flags); + +/** + * Create a new deque named *name* in memory. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_deque_init() to initialize an empty deque. + * + * The new deque size is set to *count*, which must be a power of two. + * The real usable deque size is *count-1* instead of *count* to + * differentiate a full deque from an empty deque. + * + * @param name + * The name of the deque. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * @param count + * The size of the deque (must be a power of 2, + * unless RTE_DEQUE_F_EXACT_SZ is set in flags). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold exactly the + * requested number of entries, and the requested size will be rounded up + * to the next power of two, but the usable space will be exactly that + * requested. Worst case, if a power-of-2 size is requested, half the + * deque space will be wasted. + * Without this flag set, the deque size requested must be a power of 2, + * and the usable space will be that size - 1. 
+ * @return + * On success, the pointer to the new allocated deque. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - EINVAL - count provided is not a power of 2 + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +__rte_experimental +struct rte_deque *rte_deque_create(const char *name, unsigned int esize, + unsigned int count, int socket_id, + unsigned int flags); + +/** + * De-allocate all memory used by the deque. + * + * @param d + * Deque to free. + * If NULL then, the function does nothing. + */ +__rte_experimental +void rte_deque_free(struct rte_deque *d); + +/** + * Dump the status of the deque to a file. + * + * @param f + * A pointer to a file for output + * @param d + * A pointer to the deque structure. + */ +__rte_experimental +void rte_deque_dump(FILE *f, const struct rte_deque *d); + +/** + * Return the number of entries in a deque. + * + * @param d + * A pointer to the deque structure. + * @return + * The number of entries in the deque. + */ +static inline unsigned int +rte_deque_count(const struct rte_deque *d) +{ + return (d->head - d->tail) & d->mask; +} + +/** + * Return the number of free entries in a deque. + * + * @param d + * A pointer to the deque structure. + * @return + * The number of free entries in the deque. + */ +static inline unsigned int +rte_deque_free_count(const struct rte_deque *d) +{ + return d->capacity - rte_deque_count(d); +} + +/** + * Enqueue fixed number of objects on a deque at the head. + * + * This function copies the objects at the head of the deque and + * moves the head index. + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of deque object, in bytes. 
It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the deque from the obj_table. + * @param free_space + * Returns the amount of space in the deque after the enqueue operation + * has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_enqueue_bulk_elem(struct rte_deque *d, + const void *obj_table, + unsigned int esize, + unsigned int n, + unsigned int *free_space) +{ + *free_space = rte_deque_free_count(d); + if (unlikely(n > *free_space)) + return 0; + *free_space -= n; + return __rte_deque_enqueue_at_head(d, obj_table, esize, n); +} + +/** + * Enqueue up to a maximum number of objects on a deque at the head. + * + * This function copies the objects at the head of the deque and + * moves the head index. + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the deque from the obj_table. + * @param free_space + * Returns the amount of space in the deque after the enqueue operation + * has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_enqueue_burst_elem(struct rte_deque *d, const void *obj_table, + unsigned int esize, unsigned int n, + unsigned int *free_space) +{ + unsigned int avail_space = rte_deque_free_count(d); + unsigned int to_be_enqueued = (n <= avail_space ? 
n : avail_space); + *free_space = avail_space - to_be_enqueued; + return __rte_deque_enqueue_at_head(d, obj_table, esize, to_be_enqueued); +} + +/** + * Enqueue fixed number of objects on a deque at the tail. + * + * This function copies the objects at the tail of the deque and + * moves the tail index (backwards). + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the deque from the obj_table. + * @param free_space + * Returns the amount of space in the deque after the enqueue operation + * has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_enqueue_bulk_elem(struct rte_deque *d, + const void *obj_table, unsigned int esize, + unsigned int n, unsigned int *free_space) +{ + *free_space = rte_deque_free_count(d); + if (unlikely(n > *free_space)) + return 0; + *free_space -= n; + return __rte_deque_enqueue_at_tail(d, obj_table, esize, n); +} + +/** + * Enqueue up to a maximum number of objects on a deque at the tail. + * + * This function copies the objects at the tail of the deque and + * moves the tail index (backwards). + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the deque from the obj_table. + * @param free_space + * Returns the amount of space in the deque after the enqueue operation + * has finished. + * @return + * - n: Actual number of objects enqueued.
+ */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_enqueue_burst_elem(struct rte_deque *d, + const void *obj_table, unsigned int esize, + unsigned int n, unsigned int *free_space) +{ + unsigned int avail_space = rte_deque_free_count(d); + unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space); + *free_space = avail_space - to_be_enqueued; + return __rte_deque_enqueue_at_tail(d, obj_table, esize, to_be_enqueued); +} + +/** + * Dequeue a fixed number of objects from a deque at tail. + * + * This function copies the objects from the tail of the deque and + * moves the tail index. + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the deque to the obj_table. + * @param available + * Returns the number of remaining deque entries after the dequeue + * has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_dequeue_bulk_elem(struct rte_deque *d, void *obj_table, + unsigned int esize, unsigned int n, + unsigned int *available) +{ + *available = rte_deque_count(d); + if (unlikely(n > *available)) + return 0; + *available -= n; + return __rte_deque_dequeue_at_tail(d, obj_table, esize, n); +} + +/** + * Dequeue up to a maximum number of objects from a deque at tail. + * + * This function copies the objects from the tail of the deque and + * moves the tail index. + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. 
+ * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the deque to the obj_table. + * @param available + * Returns the number of remaining deque entries after the dequeue + * has finished. + * @return + * - Number of objects dequeued + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_dequeue_burst_elem(struct rte_deque *d, void *obj_table, + unsigned int esize, unsigned int n, + unsigned int *available) +{ + unsigned int count = rte_deque_count(d); + unsigned int to_be_dequeued = (n <= count ? n : count); + *available = count - to_be_dequeued; + return __rte_deque_dequeue_at_tail(d, obj_table, esize, to_be_dequeued); +} + +/** + * Dequeue a fixed number of objects from a deque from the head. + * + * This function copies the objects from the head of the deque and + * moves the head index (backwards). + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the deque to the obj_table. + * @param available + * Returns the number of remaining deque entries after the dequeue + * has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_dequeue_bulk_elem(struct rte_deque *d, void *obj_table, + unsigned int esize, unsigned int n, + unsigned int *available) +{ + *available = rte_deque_count(d); + if (unlikely(n > *available)) + return 0; + *available -= n; + return __rte_deque_dequeue_at_head(d, obj_table, esize, n); +} + +/** + * Dequeue up to a maximum number of objects from a deque from the head. 
+ * + * This function copies the objects from the head of the deque and + * moves the head index (backwards). + * + * @param d + * A pointer to the deque structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of deque object, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the deque. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the deque to the obj_table. + * @param available + * Returns the number of remaining deque entries after the dequeue + * has finished. + * @return + * - Number of objects dequeued + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_dequeue_burst_elem(struct rte_deque *d, void *obj_table, + unsigned int esize, unsigned int n, + unsigned int *available) +{ + unsigned int count = rte_deque_count(d); + unsigned int to_be_dequeued = (n <= count ? n : count); + *available = count - to_be_dequeued; + return __rte_deque_dequeue_at_head(d, obj_table, esize, to_be_dequeued); +} + +/** + * Flush a deque. + * + * This function flushes all the objects in a deque. + * + * @warning + * Make sure the deque is not in use while calling this function. + * + * @param d + * A pointer to the deque structure. + */ +__rte_experimental +void rte_deque_reset(struct rte_deque *d); + +/** + * Test if a deque is full. + * + * @param d + * A pointer to the deque structure. + * @return + * - 1: The deque is full. + * - 0: The deque is not full. + */ +static inline int +rte_deque_full(const struct rte_deque *d) +{ + return rte_deque_free_count(d) == 0; +} + +/** + * Test if a deque is empty. + * + * @param d + * A pointer to the deque structure. + * @return + * - 1: The deque is empty. + * - 0: The deque is not empty. + */ +static inline int +rte_deque_empty(const struct rte_deque *d) +{ + return d->tail == d->head; +} + +/** + * Return the size of the deque.
+ * + * @param d + * A pointer to the deque structure. + * @return + * The size of the data store used by the deque. + * NOTE: this is not the same as the usable space in the deque. To query that + * use ``rte_deque_get_capacity()``. + */ +static inline unsigned int +rte_deque_get_size(const struct rte_deque *d) +{ + return d->size; +} + +/** + * Return the number of objects which can be stored in the deque. + * + * @param d + * A pointer to the deque structure. + * @return + * The usable size of the deque. + */ +static inline unsigned int +rte_deque_get_capacity(const struct rte_deque *d) +{ + return d->capacity; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_DEQUE_H_ */ diff --git a/lib/deque/rte_deque_core.h b/lib/deque/rte_deque_core.h new file mode 100644 index 0000000000..0bb8695c8a --- /dev/null +++ b/lib/deque/rte_deque_core.h @@ -0,0 +1,81 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2024 Arm Limited + */ + +#ifndef _RTE_DEQUE_CORE_H_ +#define _RTE_DEQUE_CORE_H_ + +/** + * @file + * This file contains definition of RTE deque structure, init flags and + * some related macros. This file should not be included directly, + * include rte_deque.h instead. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int rte_deque_log_type; + +#define RTE_DEQUE_MZ_PREFIX "DEQUE_" +/** The maximum length of a deque name. */ +#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ + sizeof(RTE_DEQUE_MZ_PREFIX) + 1) + +/** + * Double ended queue (deque) structure. + * + * The producer and the consumer have a head and a tail index. These indices + * are not between 0 and size(deque)-1. These indices are between 0 and + * 2^32 -1. Their value is masked while accessing the objects in deque. + * These indices are unsigned 32bits. Hence the result of the subtraction is + * always a modulo of 2^32 and it is between 0 and capacity. 
+ */ +struct rte_deque { + alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE]; + /**< Name of the deque */ + int flags; + /**< Flags supplied at creation. */ + const struct rte_memzone *memzone; + /**< Memzone, if any, containing the rte_deque */ + + alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */ + + uint32_t size; /**< Size of deque. */ + uint32_t mask; /**< Mask (size-1) of deque. */ + uint32_t capacity; /**< Usable size of deque */ + /** Deque head and tail pointers. */ + volatile uint32_t head; + volatile uint32_t tail; +}; + +/** + * Deque is to hold exactly requested number of entries. + * Without this flag set, the deque size requested must be a power of 2, and the + * usable space will be that size - 1. With the flag, the requested size will + * be rounded up to the next power of two, but the usable space will be exactly + * that requested. Worst case, if a power-of-2 size is requested, half the + * deque space will be wasted. + */ +#define RTE_DEQUE_F_EXACT_SZ 0x0004 +#define RTE_DEQUE_SZ_MASK (0x7fffffffU) /**< Deque size mask */ + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_DEQUE_CORE_H_ */ diff --git a/lib/deque/rte_deque_pvt.h b/lib/deque/rte_deque_pvt.h new file mode 100644 index 0000000000..931bbd4d19 --- /dev/null +++ b/lib/deque/rte_deque_pvt.h @@ -0,0 +1,538 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2024 Arm Limited + */ + +#ifndef _RTE_DEQUE_PVT_H_ +#define _RTE_DEQUE_PVT_H_ + +#define __RTE_DEQUE_COUNT(d) ((d->head - d->tail) & d->mask) +#define __RTE_DEQUE_FREE_SPACE(d) (d->capacity - __RTE_DEQUE_COUNT(d)) + +static __rte_always_inline void +__rte_deque_enqueue_elems_head_32(struct rte_deque *d, + const unsigned int size, + uint32_t idx, + const void *obj_table, + unsigned int n) +{ + unsigned int i; + uint32_t *deque = (uint32_t *)&d[1]; + const uint32_t *obj = (const uint32_t *)obj_table; + if (likely(idx + n <= size)) { + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { + deque[idx] = obj[i]; +
deque[idx + 1] = obj[i + 1]; + deque[idx + 2] = obj[i + 2]; + deque[idx + 3] = obj[i + 3]; + deque[idx + 4] = obj[i + 4]; + deque[idx + 5] = obj[i + 5]; + deque[idx + 6] = obj[i + 6]; + deque[idx + 7] = obj[i + 7]; + } + switch (n & 0x7) { + case 7: + deque[idx++] = obj[i++]; /* fallthrough */ + case 6: + deque[idx++] = obj[i++]; /* fallthrough */ + case 5: + deque[idx++] = obj[i++]; /* fallthrough */ + case 4: + deque[idx++] = obj[i++]; /* fallthrough */ + case 3: + deque[idx++] = obj[i++]; /* fallthrough */ + case 2: + deque[idx++] = obj[i++]; /* fallthrough */ + case 1: + deque[idx++] = obj[i++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + deque[idx] = obj[i]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + deque[idx] = obj[i]; + } +} + +static __rte_always_inline void +__rte_deque_enqueue_elems_head_64(struct rte_deque *d, + const void *obj_table, + unsigned int n) +{ + unsigned int i; + const uint32_t size = d->size; + uint32_t idx = (d->head & d->mask); + uint64_t *deque = (uint64_t *)&d[1]; + const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table; + if (likely(idx + n <= size)) { + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { + deque[idx] = obj[i]; + deque[idx + 1] = obj[i + 1]; + deque[idx + 2] = obj[i + 2]; + deque[idx + 3] = obj[i + 3]; + } + switch (n & 0x3) { + case 3: + deque[idx++] = obj[i++]; /* fallthrough */ + case 2: + deque[idx++] = obj[i++]; /* fallthrough */ + case 1: + deque[idx++] = obj[i++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + deque[idx] = obj[i]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + deque[idx] = obj[i]; + } +} + +static __rte_always_inline void +__rte_deque_enqueue_elems_head_128(struct rte_deque *d, + const void *obj_table, + unsigned int n) +{ + unsigned int i; + const uint32_t size = d->size; + uint32_t idx = (d->head & d->mask); + rte_int128_t *deque = (rte_int128_t *)&d[1]; + const rte_int128_t *obj = 
(const rte_int128_t *)obj_table; + if (likely(idx + n <= size)) { + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) + memcpy((void *)(deque + idx), + (const void *)(obj + i), 32); + switch (n & 0x1) { + case 1: + memcpy((void *)(deque + idx), + (const void *)(obj + i), 16); + } + } else { + for (i = 0; idx < size; i++, idx++) + memcpy((void *)(deque + idx), + (const void *)(obj + i), 16); + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + memcpy((void *)(deque + idx), + (const void *)(obj + i), 16); + } +} + +static __rte_always_inline unsigned int +__rte_deque_enqueue_at_head(struct rte_deque *d, + const void *obj_table, + unsigned int esize, + unsigned int n) +{ + /* 8B and 16B copies implemented individually because on some platforms + * there are 64 bit and 128 bit registers available for direct copying. + */ + if (esize == 8) + __rte_deque_enqueue_elems_head_64(d, obj_table, n); + else if (esize == 16) + __rte_deque_enqueue_elems_head_128(d, obj_table, n); + else { + uint32_t idx, scale, nd_idx, nd_num, nd_size; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nd_num = n * scale; + idx = d->head & d->mask; + nd_idx = idx * scale; + nd_size = d->size * scale; + __rte_deque_enqueue_elems_head_32(d, nd_size, nd_idx, + obj_table, nd_num); + } + d->head = (d->head + n) & d->mask; + return n; +} + +static __rte_always_inline void +__rte_deque_enqueue_elems_tail_32(struct rte_deque *d, + const unsigned int mask, + uint32_t idx, + const void *obj_table, + unsigned int n, + const unsigned int scale, + const unsigned int elem_size) +{ + unsigned int i; + uint32_t *deque = (uint32_t *)&d[1]; + const uint32_t *obj = (const uint32_t *)obj_table; + + if (likely(idx >= n)) { + for (i = 0; i < n; idx -= scale, i += scale) + memcpy(&deque[idx], &obj[i], elem_size); + } else { + for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale) + memcpy(&deque[idx], &obj[i], elem_size); + + /* Start at the ending */ + idx = mask; + for (; i < n; idx -= 
scale, i += scale) + memcpy(&deque[idx], &obj[i], elem_size); + } +} + +static __rte_always_inline void +__rte_deque_enqueue_elems_tail_64(struct rte_deque *d, + const void *obj_table, + unsigned int n) +{ + unsigned int i; + uint32_t idx = (d->tail & d->mask); + uint64_t *deque = (uint64_t *)&d[1]; + const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table; + if (likely((int32_t)(idx - n) >= 0)) { + for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) { + deque[idx] = obj[i]; + deque[idx - 1] = obj[i + 1]; + deque[idx - 2] = obj[i + 2]; + deque[idx - 3] = obj[i + 3]; + } + switch (n & 0x3) { + case 3: + deque[idx--] = obj[i++]; /* fallthrough */ + case 2: + deque[idx--] = obj[i++]; /* fallthrough */ + case 1: + deque[idx--] = obj[i++]; /* fallthrough */ + } + } else { + for (i = 0; (int32_t)idx >= 0; i++, idx--) + deque[idx] = obj[i]; + /* Start at the ending */ + for (idx = d->mask; i < n; i++, idx--) + deque[idx] = obj[i]; + } +} + +static __rte_always_inline void +__rte_deque_enqueue_elems_tail_128(struct rte_deque *d, + const void *obj_table, + unsigned int n) +{ + unsigned int i; + uint32_t idx = (d->tail & d->mask); + rte_int128_t *deque = (rte_int128_t *)&d[1]; + const rte_int128_t *obj = (const rte_int128_t *)obj_table; + if (likely((int32_t)(idx - n) >= 0)) { + for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) { + deque[idx] = obj[i]; + deque[idx - 1] = obj[i + 1]; + } + switch (n & 0x1) { + case 1: + memcpy((void *)(deque + idx), + (const void *)(obj + i), 16); + } + } else { + for (i = 0; (int32_t)idx >= 0; i++, idx--) + memcpy((void *)(deque + idx), + (const void *)(obj + i), 16); + /* Start at the ending */ + for (idx = d->mask; i < n; i++, idx--) + memcpy((void *)(deque + idx), + (const void *)(obj + i), 16); + } +} + +static __rte_always_inline unsigned int +__rte_deque_enqueue_at_tail(struct rte_deque *d, + const void *obj_table, + unsigned int esize, + unsigned int n) +{ + /* The tail point must point at an empty cell when enqueuing */ + 
d->tail--; + + /* 8B and 16B copies implemented individually because on some platforms + * there are 64 bit and 128 bit registers available for direct copying. + */ + if (esize == 8) + __rte_deque_enqueue_elems_tail_64(d, obj_table, n); + else if (esize == 16) + __rte_deque_enqueue_elems_tail_128(d, obj_table, n); + else { + uint32_t idx, scale, nd_idx, nd_num, nd_mask; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nd_num = n * scale; + idx = d->tail & d->mask; + nd_idx = idx * scale; + nd_mask = d->mask * scale; + __rte_deque_enqueue_elems_tail_32(d, nd_mask, nd_idx, obj_table, + nd_num, scale, esize); + } + + /* The +1 is because the tail needs to point at a + * non-empty memory location after the enqueuing operation. + */ + d->tail = (d->tail - n + 1) & d->mask; + return n; +} + +static __rte_always_inline void +__rte_deque_dequeue_elems_32(struct rte_deque *d, + const unsigned int size, + uint32_t idx, + void *obj_table, + unsigned int n) +{ + unsigned int i; + const uint32_t *deque = (const uint32_t *)&d[1]; + uint32_t *obj = (uint32_t *)obj_table; + if (likely(idx + n <= size)) { + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { + obj[i] = deque[idx]; + obj[i + 1] = deque[idx + 1]; + obj[i + 2] = deque[idx + 2]; + obj[i + 3] = deque[idx + 3]; + obj[i + 4] = deque[idx + 4]; + obj[i + 5] = deque[idx + 5]; + obj[i + 6] = deque[idx + 6]; + obj[i + 7] = deque[idx + 7]; + } + switch (n & 0x7) { + case 7: + obj[i++] = deque[idx++]; /* fallthrough */ + case 6: + obj[i++] = deque[idx++]; /* fallthrough */ + case 5: + obj[i++] = deque[idx++]; /* fallthrough */ + case 4: + obj[i++] = deque[idx++]; /* fallthrough */ + case 3: + obj[i++] = deque[idx++]; /* fallthrough */ + case 2: + obj[i++] = deque[idx++]; /* fallthrough */ + case 1: + obj[i++] = deque[idx++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + obj[i] = deque[idx]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + obj[i] = deque[idx]; + } +} 
+ +static __rte_always_inline void +__rte_deque_dequeue_elems_64(struct rte_deque *d, void *obj_table, + unsigned int n) +{ + unsigned int i; + const uint32_t size = d->size; + uint32_t idx = (d->tail & d->mask); + const uint64_t *deque = (const uint64_t *)&d[1]; + unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table; + if (likely(idx + n <= size)) { + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { + obj[i] = deque[idx]; + obj[i + 1] = deque[idx + 1]; + obj[i + 2] = deque[idx + 2]; + obj[i + 3] = deque[idx + 3]; + } + switch (n & 0x3) { + case 3: + obj[i++] = deque[idx++]; /* fallthrough */ + case 2: + obj[i++] = deque[idx++]; /* fallthrough */ + case 1: + obj[i++] = deque[idx++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + obj[i] = deque[idx]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + obj[i] = deque[idx]; + } +} + +static __rte_always_inline void +__rte_deque_dequeue_elems_128(struct rte_deque *d, + void *obj_table, + unsigned int n) +{ + unsigned int i; + const uint32_t size = d->size; + uint32_t idx = (d->tail & d->mask); + const rte_int128_t *deque = (const rte_int128_t *)&d[1]; + rte_int128_t *obj = (rte_int128_t *)obj_table; + if (likely(idx + n <= size)) { + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) + memcpy((void *)(obj + i), + (const void *)(deque + idx), 32); + switch (n & 0x1) { + case 1: + memcpy((void *)(obj + i), + (const void *)(deque + idx), 16); + } + } else { + for (i = 0; idx < size; i++, idx++) + memcpy((void *)(obj + i), + (const void *)(deque + idx), 16); + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + memcpy((void *)(obj + i), + (const void *)(deque + idx), 16); + } +} + +static __rte_always_inline unsigned int +__rte_deque_dequeue_at_tail(struct rte_deque *d, + void *obj_table, + unsigned int esize, + unsigned int n) +{ + /* 8B and 16B copies implemented individually because on some platforms + * there are 64 bit and 128 bit registers available for direct 
copying. + */ + if (esize == 8) + __rte_deque_dequeue_elems_64(d, obj_table, n); + else if (esize == 16) + __rte_deque_dequeue_elems_128(d, obj_table, n); + else { + uint32_t idx, scale, nd_idx, nd_num, nd_size; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nd_num = n * scale; + idx = d->tail & d->mask; + nd_idx = idx * scale; + nd_size = d->size * scale; + __rte_deque_dequeue_elems_32(d, nd_size, nd_idx, + obj_table, nd_num); + } + d->tail = (d->tail + n) & d->mask; + return n; +} + +static __rte_always_inline void +__rte_deque_dequeue_elems_head_32(struct rte_deque *d, + const unsigned int mask, + uint32_t idx, + void *obj_table, + unsigned int n, + const unsigned int scale, + const unsigned int elem_size) +{ + unsigned int i; + const uint32_t *deque = (uint32_t *)&d[1]; + uint32_t *obj = (uint32_t *)obj_table; + + if (likely(idx >= n)) { + for (i = 0; i < n; idx -= scale, i += scale) + memcpy(&obj[i], &deque[idx], elem_size); + } else { + for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale) + memcpy(&obj[i], &deque[idx], elem_size); + /* Start at the ending */ + idx = mask; + for (; i < n; idx -= scale, i += scale) + memcpy(&obj[i], &deque[idx], elem_size); + } +} + +static __rte_always_inline void +__rte_deque_dequeue_elems_head_64(struct rte_deque *d, + void *obj_table, + unsigned int n) +{ + unsigned int i; + uint32_t idx = (d->head & d->mask); + const uint64_t *deque = (uint64_t *)&d[1]; + unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table; + if (likely((int32_t)(idx - n) >= 0)) { + for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) { + obj[i] = deque[idx]; + obj[i + 1] = deque[idx - 1]; + obj[i + 2] = deque[idx - 2]; + obj[i + 3] = deque[idx - 3]; + } + switch (n & 0x3) { + case 3: + obj[i++] = deque[idx--]; /* fallthrough */ + case 2: + obj[i++] = deque[idx--]; /* fallthrough */ + case 1: + obj[i++] = deque[idx--]; /* fallthrough */ + } + } else { + for (i = 0; (int32_t)idx >= 0; i++, idx--) + obj[i] = deque[idx]; + /* Start 
at the ending */ + for (idx = d->mask; i < n; i++, idx--) + obj[i] = deque[idx]; + } +} + +static __rte_always_inline void +__rte_deque_dequeue_elems_head_128(struct rte_deque *d, + void *obj_table, + unsigned int n) +{ + unsigned int i; + uint32_t idx = (d->head & d->mask); + const rte_int128_t *deque = (rte_int128_t *)&d[1]; + rte_int128_t *obj = (rte_int128_t *)obj_table; + if (likely((int32_t)(idx - n) >= 0)) { + for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) { + obj[i] = deque[idx]; + obj[i + 1] = deque[idx - 1]; + } + switch (n & 0x1) { + case 1: + memcpy((void *)(obj + i), + (const void *)(deque + idx), 16); + } + } else { + for (i = 0; (int32_t)idx >= 0; i++, idx--) + memcpy((void *)(obj + i), + (const void *)(deque + idx), 16); + /* Start at the ending */ + for (idx = d->mask; i < n; i++, idx--) + memcpy((void *)(obj + i), + (const void *)(deque + idx), 16); + } +} + +static __rte_always_inline unsigned int +__rte_deque_dequeue_at_head(struct rte_deque *d, + void *obj_table, + unsigned int esize, + unsigned int n) +{ + /* The head points at an empty cell; step back to the last element */ + d->head--; + + /* 8B and 16B copies implemented individually because on some platforms + * there are 64 bit and 128 bit registers available for direct copying. + */ + if (esize == 8) + __rte_deque_dequeue_elems_head_64(d, obj_table, n); + else if (esize == 16) + __rte_deque_dequeue_elems_head_128(d, obj_table, n); + else { + uint32_t idx, scale, nd_idx, nd_num, nd_mask; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nd_num = n * scale; + idx = d->head & d->mask; + nd_idx = idx * scale; + nd_mask = d->mask * scale; + __rte_deque_dequeue_elems_head_32(d, nd_mask, nd_idx, obj_table, + nd_num, scale, esize); + } + + /* The +1 is because the head needs to point at an + * empty memory location after the dequeuing operation. 
+ */ + d->head = (d->head - n + 1) & d->mask; + return n; +} +#endif /* _RTE_DEQUE_PVT_H_ */ diff --git a/lib/deque/rte_deque_zc.h b/lib/deque/rte_deque_zc.h new file mode 100644 index 0000000000..6d7167e158 --- /dev/null +++ b/lib/deque/rte_deque_zc.h @@ -0,0 +1,430 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2024 Arm Limited + */ +#ifndef _RTE_DEQUE_ZC_H_ +#define _RTE_DEQUE_ZC_H_ + +/** + * @file + * This file should not be included directly; include rte_deque.h instead. + * + * Deque Zero Copy APIs + * These APIs make it possible to split the public enqueue/dequeue API + * into 3 parts: + * - enqueue/dequeue start + * - copy data to/from the deque + * - enqueue/dequeue finish + * These APIs provide the ability to avoid copying the data to a temporary area. + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Deque zero-copy information structure. + * + * This structure contains the pointers and length of the space + * reserved on the deque storage. + */ +struct __rte_cache_aligned rte_deque_zc_data { + /* Pointer to the first space in the deque */ + void *ptr1; + /* Pointer to the second space in the deque if there is wrap-around. + * It contains a valid value only if wrap-around happens. + */ + void *ptr2; + /* Number of elements in the first pointer. If this is equal to + * the number of elements requested, then ptr2 is NULL. + * Otherwise, subtracting n1 from the number of elements requested + * will give the number of elements available at ptr2. 
+ */ + unsigned int n1; +}; + +static __rte_always_inline void +__rte_deque_get_elem_addr(struct rte_deque *d, uint32_t pos, + uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2, + bool low_to_high) +{ + uint32_t idx, scale, nr_idx; + uint32_t *deque_ptr = (uint32_t *)&d[1]; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + idx = pos & d->mask; + nr_idx = idx * scale; + + *dst1 = deque_ptr + nr_idx; + *n1 = num; + + if (low_to_high) { + if (idx + num > d->size) { + *n1 = d->size - idx; + *dst2 = deque_ptr; + } else + *dst2 = NULL; + } else { + if ((int32_t)(idx - num) < 0) { + *n1 = idx + 1; + *dst2 = (void *)&deque_ptr[(-1 & d->mask) * scale]; + } else + *dst2 = NULL; + } +} + +/** + * Start to enqueue several objects on the deque. + * Note that no actual objects are put in the deque by this function, + * it just reserves space for the user on the deque. + * User has to copy objects into the deque using the returned pointers. + * User should call rte_deque_enqueue_zc_elem_finish to complete the + * enqueue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to add in the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param free_space + * Returns the amount of space in the deque after the reservation operation + * has finished. 
+ * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space) +{ + + *free_space = __RTE_DEQUE_FREE_SPACE(d); + if (unlikely(*free_space < n)) + return 0; + __rte_deque_get_elem_addr(d, d->head, esize, n, &zcd->ptr1, + &zcd->n1, &zcd->ptr2, true); + + *free_space -= n; + return n; +} + +/** + * Complete enqueuing several pointers to objects on the deque. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param d + * A pointer to the deque structure. + * @param n + * The number of pointers to objects to add to the deque. + */ +__rte_experimental +static __rte_always_inline void +rte_deque_head_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n) +{ + d->head = (d->head + n) & d->mask; +} + +/** + * Start to enqueue several objects on the deque. + * Note that no actual objects are put in the queue by this function, + * it just reserves space for the user on the deque. + * User has to copy objects into the queue using the returned pointers. + * User should call rte_deque_enqueue_zc_elem_finish to complete the + * enqueue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to add in the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param free_space + * Returns the amount of space in the deque after the reservation operation + * has finished. 
+ * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space) +{ + *free_space = __RTE_DEQUE_FREE_SPACE(d); + n = n > *free_space ? *free_space : n; + return rte_deque_head_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space); +} + +/** + * Start to enqueue several objects on the deque. + * Note that no actual objects are put in the deque by this function, + * it just reserves space for the user on the deque. + * User has to copy objects into the deque using the returned pointers. + * User should call rte_deque_enqueue_zc_elem_finish to complete the + * enqueue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to add in the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param free_space + * Returns the amount of space in the deque after the reservation operation + * has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space) +{ + *free_space = __RTE_DEQUE_FREE_SPACE(d); + if (unlikely(*free_space < n)) + return 0; + __rte_deque_get_elem_addr(d, d->tail - 1, esize, n, &zcd->ptr1, + &zcd->n1, &zcd->ptr2, false); + + *free_space -= n; + return n; +} + +/** + * Complete enqueuing several pointers to objects on the deque. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. 
+ * + * @param d + * A pointer to the deque structure. + * @param n + * The number of pointers to objects to add to the deque. + */ +__rte_experimental +static __rte_always_inline void +rte_deque_tail_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n) +{ + d->tail = (d->tail - n) & d->mask; +} + +/** + * Start to enqueue several objects on the deque. + * Note that no actual objects are put in the deque by this function, + * it just reserves space for the user on the deque. + * User has to copy objects into the deque using the returned pointers. + * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the + * enqueue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to add in the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param free_space + * Returns the amount of space in the deque after the reservation operation + * has finished. + * @return + * The number of objects that can be enqueued, between 0 and n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space) +{ + *free_space = __RTE_DEQUE_FREE_SPACE(d); + n = n > *free_space ? *free_space : n; + return rte_deque_tail_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space); +} + +/** + * Start to dequeue several objects from the deque. + * Note that no actual objects are copied from the deque by this function. + * User has to copy objects from the deque using the returned pointers. + * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the + * dequeue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. 
It must be a multiple of 4. + * @param n + * The number of objects to remove from the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param available + * Returns the number of remaining deque entries after the dequeue has + * finished. + * @return + * The number of objects that can be dequeued, either 0 or n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available) +{ + *available = __RTE_DEQUE_COUNT(d); + if (unlikely(*available < n)) + return 0; + __rte_deque_get_elem_addr(d, d->tail, esize, n, &zcd->ptr1, + &zcd->n1, &zcd->ptr2, true); + + *available -= n; + return n; +} + +/** + * Complete dequeuing several objects from the deque. + * Note that the number of objects to dequeue should not exceed the previous + * dequeue_start return value. + * + * @param d + * A pointer to the deque structure. + * @param n + * The number of objects to remove from the deque. + */ +__rte_experimental +static __rte_always_inline void +rte_deque_tail_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n) +{ + d->tail = (d->tail + n) & d->mask; +} + +/** + * Start to dequeue several objects from the deque. + * Note that no actual objects are copied from the deque by this function. + * User has to copy objects from the deque using the returned pointers. + * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the + * dequeue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to remove from the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. 
+ * @param available + * Returns the number of remaining deque entries after the dequeue has + * finished. + * @return + * The number of objects that can be dequeued, either 0 or n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_tail_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available) +{ + *available = __RTE_DEQUE_COUNT(d); + n = n > *available ? *available : n; + return rte_deque_tail_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available); +} + +/** + * Start to dequeue several objects from the deque. + * Note that no actual objects are copied from the queue by this function. + * User has to copy objects from the queue using the returned pointers. + * User should call rte_deque_dequeue_zc_elem_finish to complete the + * dequeue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to remove from the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param available + * Returns the number of remaining deque entries after the dequeue has + * finished. + * @return + * The number of objects that can be dequeued, either 0 or n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available) +{ + *available = __RTE_DEQUE_COUNT(d); + if (unlikely(*available < n)) + return 0; + __rte_deque_get_elem_addr(d, d->head - 1, esize, n, &zcd->ptr1, + &zcd->n1, &zcd->ptr2, false); + + *available -= n; + return n; +} + +/** + * Complete dequeuing several objects from the deque. + * Note that number of objects to dequeued should not exceed previous + * dequeue_start return value. 
+ * + * @param d + * A pointer to the deque structure. + * @param n + * The number of objects to remove from the deque. + */ +__rte_experimental +static __rte_always_inline void +rte_deque_head_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n) +{ + d->head = (d->head - n) & d->mask; +} + +/** + * Start to dequeue several objects from the deque. + * Note that no actual objects are copied from the deque by this function. + * User has to copy objects from the deque using the returned pointers. + * User should call rte_deque_head_dequeue_zc_elem_finish to complete the + * dequeue operation. + * + * @param d + * A pointer to the deque structure. + * @param esize + * The size of deque element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to remove from the deque. + * @param zcd + * Structure containing the pointers and length of the space + * reserved on the deque storage. + * @param available + * Returns the number of remaining deque entries after the dequeue has + * finished. + * @return + * The number of objects that can be dequeued, between 0 and n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_deque_head_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize, + unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available) +{ + *available = __RTE_DEQUE_COUNT(d); + n = n > *available ? 
*available : n; + return rte_deque_head_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_DEQUE_ZC_H_ */ diff --git a/lib/deque/version.map b/lib/deque/version.map new file mode 100644 index 0000000000..103fd3b512 --- /dev/null +++ b/lib/deque/version.map @@ -0,0 +1,14 @@ +EXPERIMENTAL { + global: + + # added in 24.07 + rte_deque_log_type; + rte_deque_create; + rte_deque_dump; + rte_deque_free; + rte_deque_get_memsize_elem; + rte_deque_init; + rte_deque_reset; + + local: *; +}; diff --git a/lib/meson.build b/lib/meson.build index 179a272932..127e4dc68c 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -14,6 +14,7 @@ libraries = [ 'argparse', 'telemetry', # basic info querying 'eal', # everything depends on eal + 'deque', 'ring', 'rcu', # rcu depends on ring 'mempool', @@ -74,6 +75,7 @@ if is_ms_compiler 'kvargs', 'telemetry', 'eal', + 'deque', 'ring', ] endif -- 2.25.1
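Note for reviewers: the zero-copy start functions in rte_deque_zc.h hand back up to two contiguous regions (ptr1 with n1 elements, and ptr2 when the reservation wraps). The sketch below models only the low-to-high branch of __rte_deque_get_elem_addr on a plain array, to illustrate how a caller might split its copy across the two regions. It does not use the DPDK headers; struct zc_span, zc_span_low_to_high and zc_span_fill are hypothetical names invented for this illustration, and the storage is assumed to hold a power-of-two number of slots.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for struct rte_deque_zc_data; not the DPDK type. */
struct zc_span {
	void *ptr1;      /* start of the first contiguous region */
	void *ptr2;      /* start of the wrapped region, NULL if no wrap */
	unsigned int n1; /* number of elements available at ptr1 */
};

/*
 * Split a reservation of num elements, starting at slot pos and walking
 * towards higher indices, into at most two contiguous regions.
 * Assumption: size is a power of two and num <= size.
 */
static void
zc_span_low_to_high(void *storage, uint32_t size, uint32_t esize,
		uint32_t pos, uint32_t num, struct zc_span *zs)
{
	uint32_t mask = size - 1;
	uint32_t idx = pos & mask;

	assert(size != 0 && (size & mask) == 0);
	assert(num <= size);

	zs->ptr1 = (uint8_t *)storage + (size_t)idx * esize;
	if (idx + num > size) {
		/* Wrap-around: the remainder starts at slot 0. */
		zs->n1 = size - idx;
		zs->ptr2 = storage;
	} else {
		zs->n1 = num;
		zs->ptr2 = NULL;
	}
}

/* Copy num elements from src into the region(s) described by zs. */
static void
zc_span_fill(const struct zc_span *zs, const void *src,
		uint32_t esize, uint32_t num)
{
	uint32_t first = num < zs->n1 ? num : zs->n1;

	memcpy(zs->ptr1, src, (size_t)first * esize);
	if (num > first) {
		assert(zs->ptr2 != NULL);
		memcpy(zs->ptr2, (const uint8_t *)src + (size_t)first * esize,
			(size_t)(num - first) * esize);
	}
}
```

With the real API, the equivalent flow would presumably be a call to one of the *_zc_bulk_elem_start or *_zc_burst_elem_start functions, a copy split across zcd->ptr1 and zcd->ptr2 as above, and then the matching *_zc_elem_finish call with the count returned by start. The high-to-low (tail enqueue, head dequeue) direction walks indices downwards and is not covered by this sketch.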