From: Aditya Ambadipudi <aditya.ambadipudi@arm.com>
To: dev@dpdk.org, jackmin@nvidia.com, stephen@networkplumber.org,
matan@nvidia.com, viacheslavo@nvidia.com,
roretzla@linux.microsoft.com, konstantin.ananyev@huawei.com,
mb@smartsharesystems.com, hofors@lysator.liu.se
Cc: wathsala.vithanage@arm.com, dhruv.tripathi@arm.com,
honnappa.nagarahalli@arm.com, nd@arm.com,
Aditya Ambadipudi <aditya.ambadipudi@arm.com>
Subject: [PATCH v2 1/2] deque: add multi-thread unsafe double ended queue
Date: Wed, 24 Apr 2024 08:42:32 -0500 [thread overview]
Message-ID: <20240424134233.1336370-2-aditya.ambadipudi@arm.com> (raw)
In-Reply-To: <20240424134233.1336370-1-aditya.ambadipudi@arm.com>
From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Add a multi-thread unsafe double ended queue data structure. This
library provides a simple and efficient alternative to multi-thread
safe ring when multi-thread safety is not required.
Signed-off-by: Aditya Ambadipudi <aditya.ambadipudi@arm.com>
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Change-Id: I6f66fa2ebf750adb22ac75f8cb3c2fe8bdb5fa9e
---
v2:
* Addressed the spell check warning issue with the word "Deque"
* Tried to rename all objects that are named deque to avoid collision with
std::deque
* Added the deque library to msvc section in meson.build
* Renamed API functions to explicitly state if the function inserts at head or
tail.
.mailmap | 1 +
devtools/build-dict.sh | 1 +
lib/deque/meson.build | 11 +
lib/deque/rte_deque.c | 193 +++++++++++++
lib/deque/rte_deque.h | 533 ++++++++++++++++++++++++++++++++++++
lib/deque/rte_deque_core.h | 81 ++++++
lib/deque/rte_deque_pvt.h | 538 +++++++++++++++++++++++++++++++++++++
lib/deque/rte_deque_zc.h | 430 +++++++++++++++++++++++++++++
lib/deque/version.map | 14 +
lib/meson.build | 2 +
10 files changed, 1804 insertions(+)
create mode 100644 lib/deque/meson.build
create mode 100644 lib/deque/rte_deque.c
create mode 100644 lib/deque/rte_deque.h
create mode 100644 lib/deque/rte_deque_core.h
create mode 100644 lib/deque/rte_deque_pvt.h
create mode 100644 lib/deque/rte_deque_zc.h
create mode 100644 lib/deque/version.map
diff --git a/.mailmap b/.mailmap
index 3843868716..8e705ab6ab 100644
--- a/.mailmap
+++ b/.mailmap
@@ -17,6 +17,7 @@ Adam Bynes <adambynes@outlook.com>
Adam Dybkowski <adamx.dybkowski@intel.com>
Adam Ludkiewicz <adam.ludkiewicz@intel.com>
Adham Masarwah <adham@nvidia.com> <adham@mellanox.com>
+Aditya Ambadipudi <aditya.ambadipudi@arm.com>
Adrian Moreno <amorenoz@redhat.com>
Adrian Podlawski <adrian.podlawski@intel.com>
Adrien Mazarguil <adrien.mazarguil@6wind.com>
diff --git a/devtools/build-dict.sh b/devtools/build-dict.sh
index a8cac49029..595d8f9277 100755
--- a/devtools/build-dict.sh
+++ b/devtools/build-dict.sh
@@ -17,6 +17,7 @@ sed '/^..->/d' |
sed '/^uint->/d' |
sed "/^doesn'->/d" |
sed '/^wasn->/d' |
+sed '/^deque.*->/d' |
# print to stdout
cat
diff --git a/lib/deque/meson.build b/lib/deque/meson.build
new file mode 100644
index 0000000000..1ff45fc39f
--- /dev/null
+++ b/lib/deque/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024 Arm Limited
+
+sources = files('rte_deque.c')
+headers = files('rte_deque.h')
+# most sub-headers are not for direct inclusion
+indirect_headers += files (
+ 'rte_deque_core.h',
+ 'rte_deque_pvt.h',
+ 'rte_deque_zc.h'
+)
diff --git a/lib/deque/rte_deque.c b/lib/deque/rte_deque.c
new file mode 100644
index 0000000000..b83a6c43c4
--- /dev/null
+++ b/lib/deque/rte_deque.c
@@ -0,0 +1,193 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#include <stdalign.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_eal_memconfig.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+
+#include "rte_deque.h"
+
+/* mask of all valid flag values to deque_create() */
+#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
+ssize_t
+rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
+{
+	ssize_t sz;
+
+	/* Element size must be a multiple of 4B */
+	if (esize % 4 != 0) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): element size is not a multiple of 4\n",
+			__func__);
+
+		return -EINVAL;
+	}
+
+	/* count must be a power of 2 and must not exceed the size limit */
+	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Requested number of elements is invalid, "
+			"must be power of 2, and not exceed %u\n",
+			__func__, RTE_DEQUE_SZ_MASK);
+
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_deque) + (ssize_t)count * esize;
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+void
+rte_deque_reset(struct rte_deque *d)
+{
+ d->head = 0;
+ d->tail = 0;
+}
+
+int
+rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+ unsigned int flags)
+{
+ int ret;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(struct rte_deque) &
+ RTE_CACHE_LINE_MASK) != 0);
+
+ /* future proof flags, only allow supported values */
+ if (flags & ~__RTE_DEQUE_F_MASK) {
+ rte_log(RTE_LOG_ERR, rte_deque_log_type,
+ "%s(): Unsupported flags requested %#x\n",
+ __func__, flags);
+ return -EINVAL;
+ }
+
+ /* init the deque structure */
+ memset(d, 0, sizeof(*d));
+ ret = strlcpy(d->name, name, sizeof(d->name));
+ if (ret < 0 || ret >= (int)sizeof(d->name))
+ return -ENAMETOOLONG;
+ d->flags = flags;
+
+ if (flags & RTE_DEQUE_F_EXACT_SZ) {
+ d->size = rte_align32pow2(count + 1);
+ d->mask = d->size - 1;
+ d->capacity = count;
+ } else {
+ if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
+ rte_log(RTE_LOG_ERR, rte_deque_log_type,
+ "%s(): Requested size is invalid, must be power"
+ " of 2, and not exceed the size limit %u\n",
+ __func__, RTE_DEQUE_SZ_MASK);
+ return -EINVAL;
+ }
+ d->size = count;
+ d->mask = count - 1;
+ d->capacity = d->mask;
+ }
+
+ return 0;
+}
+
+/* create the deque for a given element size */
+struct rte_deque *
+rte_deque_create(const char *name, unsigned int esize, unsigned int count,
+ int socket_id, unsigned int flags)
+{
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ struct rte_deque *d;
+ const struct rte_memzone *mz;
+ ssize_t deque_size;
+ int mz_flags = 0;
+ const unsigned int requested_count = count;
+ int ret;
+
+ /* for an exact size deque, round up from count to a power of two */
+ if (flags & RTE_DEQUE_F_EXACT_SZ)
+ count = rte_align32pow2(count + 1);
+
+ deque_size = rte_deque_get_memsize_elem(esize, count);
+ if (deque_size < 0) {
+ rte_errno = -deque_size;
+ return NULL;
+ }
+
+ ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+ RTE_DEQUE_MZ_PREFIX, name);
+ if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+ rte_errno = ENAMETOOLONG;
+ return NULL;
+ }
+
+ /* reserve a memory zone for this deque. If we can't get rte_config or
+ * we are secondary process, the memzone_reserve function will set
+ * rte_errno for us appropriately - hence no check in this function
+ */
+ mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
+ mz_flags, alignof(struct rte_deque));
+ if (mz != NULL) {
+ d = mz->addr;
+ /* no need to check return value here, we already checked the
+ * arguments above
+ */
+ rte_deque_init(d, name, requested_count, flags);
+ d->memzone = mz;
+ } else {
+ d = NULL;
+ rte_log(RTE_LOG_ERR, rte_deque_log_type,
+ "%s(): Cannot reserve memory\n", __func__);
+ }
+ return d;
+}
+
+/* free the deque */
+void
+rte_deque_free(struct rte_deque *d)
+{
+ if (d == NULL)
+ return;
+
+ /*
+ * Deque was not created with rte_deque_create,
+ * therefore, there is no memzone to free.
+ */
+ if (d->memzone == NULL) {
+ rte_log(RTE_LOG_ERR, rte_deque_log_type,
+ "%s(): Cannot free deque, not created "
+ "with rte_deque_create()\n", __func__);
+ return;
+ }
+
+ if (rte_memzone_free(d->memzone) != 0)
+ rte_log(RTE_LOG_ERR, rte_deque_log_type,
+ "%s(): Cannot free memory\n", __func__);
+}
+
+/* dump the status of the deque to the file f */
+void
+rte_deque_dump(FILE *f, const struct rte_deque *d)
+{
+	fprintf(f, "deque <%s>@%p\n", d->name, (const void *)d);
+	fprintf(f, " flags=%x\n", (unsigned int)d->flags);
+	fprintf(f, " size=%"PRIu32"\n", d->size);
+	fprintf(f, " capacity=%"PRIu32"\n", d->capacity);
+	fprintf(f, " head=%"PRIu32"\n", d->head);
+	fprintf(f, " tail=%"PRIu32"\n", d->tail);
+	fprintf(f, " used=%u\n", rte_deque_count(d));
+	fprintf(f, " avail=%u\n", rte_deque_free_count(d));
+}
+
+RTE_LOG_REGISTER_DEFAULT(rte_deque_log_type, ERR);
diff --git a/lib/deque/rte_deque.h b/lib/deque/rte_deque.h
new file mode 100644
index 0000000000..6633eab377
--- /dev/null
+++ b/lib/deque/rte_deque.h
@@ -0,0 +1,533 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_H_
+#define _RTE_DEQUE_H_
+
+/**
+ * @file
+ * RTE double ended queue (Deque)
+ *
+ * This fixed-size queue does not provide concurrent access by
+ * multiple threads. If required, the application should use locks
+ * to protect the deque from concurrent access.
+ *
+ * - Double ended queue
+ * - Maximum size is fixed
+ * - Store objects of any size
+ * - Single/bulk/burst dequeue at tail or head
+ * - Single/bulk/burst enqueue at head or tail
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_deque_core.h>
+#include <rte_deque_pvt.h>
+#include <rte_deque_zc.h>
+
+/**
+ * Calculate the memory size needed for a deque
+ *
+ * This function returns the number of bytes needed for a deque, given
+ * the number of objects and the object size. This value is the sum of
+ * the size of the structure rte_deque and the size of the memory needed
+ * by the objects. The value is aligned to a cache line size.
+ *
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ * The number of objects in the deque (must be a power of 2).
+ * @return
+ * - The memory size needed for the deque on success.
+ * - -EINVAL if esize is not a multiple of 4, or count is not a power of 2.
+ */
+__rte_experimental
+ssize_t rte_deque_get_memsize_elem(unsigned int esize, unsigned int count);
+
+/**
+ * Initialize a deque structure.
+ *
+ * Initialize a deque structure in memory pointed by "d". The size of the
+ * memory area must be large enough to store the deque structure and the
+ * object table. It is advised to use rte_deque_get_memsize_elem() to get the
+ * appropriate size.
+ *
+ * The deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param d
+ * The pointer to the deque structure followed by the objects table.
+ * @param name
+ * The name of the deque.
+ * @param count
+ * The number of objects in the deque (must be a power of 2,
+ * unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param flags
+ * - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold
+ * exactly the requested number of objects, and the requested size
+ * will be rounded up to the next power of two, but the usable space
+ * will be exactly that requested. Worst case, if a power-of-2 size is
+ * requested, half the deque space will be wasted.
+ * Without this flag set, the deque size requested must be a power of 2,
+ * and the usable space will be that size - 1.
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+__rte_experimental
+int rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+ unsigned int flags);
+
+/**
+ * Create a new deque named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_deque_init() to initialize an empty deque.
+ *
+ * The new deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param name
+ * The name of the deque.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ * The size of the deque (must be a power of 2,
+ * unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param socket_id
+ * The *socket_id* argument is the socket identifier in case of
+ * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ * constraint for the reserved zone.
+ * @param flags
+ * - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold exactly the
+ * requested number of entries, and the requested size will be rounded up
+ * to the next power of two, but the usable space will be exactly that
+ * requested. Worst case, if a power-of-2 size is requested, half the
+ * deque space will be wasted.
+ * Without this flag set, the deque size requested must be a power of 2,
+ * and the usable space will be that size - 1.
+ * @return
+ * On success, the pointer to the new allocated deque. NULL on error with
+ * rte_errno set appropriately. Possible errno values include:
+ * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ * - EINVAL - count provided is not a power of 2
+ * - ENOSPC - the maximum number of memzones has already been allocated
+ * - EEXIST - a memzone with the same name already exists
+ * - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_deque *rte_deque_create(const char *name, unsigned int esize,
+ unsigned int count, int socket_id,
+ unsigned int flags);
+
+/**
+ * De-allocate all memory used by the deque.
+ *
+ * @param d
+ * Deque to free.
+ * If NULL then, the function does nothing.
+ */
+__rte_experimental
+void rte_deque_free(struct rte_deque *d);
+
+/**
+ * Dump the status of the deque to a file.
+ *
+ * @param f
+ * A pointer to a file for output
+ * @param d
+ * A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_dump(FILE *f, const struct rte_deque *d);
+
+/**
+ * Return the number of entries in a deque.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @return
+ * The number of entries in the deque.
+ */
+static inline unsigned int
+rte_deque_count(const struct rte_deque *d)
+{
+ return (d->head - d->tail) & d->mask;
+}
+
+/**
+ * Return the number of free entries in a deque.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @return
+ * The number of free entries in the deque.
+ */
+static inline unsigned int
+rte_deque_free_count(const struct rte_deque *d)
+{
+ return d->capacity - rte_deque_count(d);
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the head.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ * Returns the amount of space in the deque after the enqueue operation
+ * has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_bulk_elem(struct rte_deque *d,
+ const void *obj_table,
+ unsigned int esize,
+ unsigned int n,
+ unsigned int *free_space)
+{
+ *free_space = rte_deque_free_count(d);
+ if (unlikely(n > *free_space))
+ return 0;
+ *free_space -= n;
+ return __rte_deque_enqueue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the head.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index. If fewer than n slots are free, only that many
+ * objects are enqueued.
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The maximum number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_burst_elem(struct rte_deque *d, const void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *free_space)
+{
+	unsigned int avail_space = rte_deque_free_count(d);
+	unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
+	*free_space = avail_space - to_be_enqueued;
+	return __rte_deque_enqueue_at_head(d, obj_table, esize, to_be_enqueued);
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ * Returns the amount of space in the deque after the enqueue operation
+ * has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_bulk_elem(struct rte_deque *d,
+ const void *obj_table, unsigned int esize,
+ unsigned int n, unsigned int *free_space)
+{
+ *free_space = rte_deque_free_count(d);
+ if (unlikely(n > *free_space))
+ return 0;
+ *free_space -= n;
+ return __rte_deque_enqueue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ * Returns the amount of space in the deque after the enqueue operation
+ * has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_burst_elem(struct rte_deque *d,
+ const void *obj_table, unsigned int esize,
+ unsigned int n, unsigned int *free_space)
+{
+ unsigned int avail_space = rte_deque_free_count(d);
+ unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
+ *free_space = avail_space - to_be_enqueued;
+ return __rte_deque_enqueue_at_tail(d, obj_table, esize, to_be_enqueued);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque at tail.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue
+ * has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+ unsigned int esize, unsigned int n,
+ unsigned int *available)
+{
+ *available = rte_deque_count(d);
+ if (unlikely(n > *available))
+ return 0;
+ *available -= n;
+ return __rte_deque_dequeue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque at tail.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue
+ * has finished.
+ * @return
+ * - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+ unsigned int esize, unsigned int n,
+ unsigned int *available)
+{
+ unsigned int count = rte_deque_count(d);
+ unsigned int to_be_dequeued = (n <= count ? n : count);
+ *available = count - to_be_dequeued;
+ return __rte_deque_dequeue_at_tail(d, obj_table, esize, to_be_dequeued);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue
+ * has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+ unsigned int esize, unsigned int n,
+ unsigned int *available)
+{
+ *available = rte_deque_count(d);
+ if (unlikely(n > *available))
+ return 0;
+ *available -= n;
+ return __rte_deque_dequeue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of deque object, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the deque. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue
+ * has finished.
+ * @return
+ * - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+ unsigned int esize, unsigned int n,
+ unsigned int *available)
+{
+ unsigned int count = rte_deque_count(d);
+ unsigned int to_be_dequeued = (n <= count ? n : count);
+ *available = count - to_be_dequeued;
+ return __rte_deque_dequeue_at_head(d, obj_table, esize, to_be_dequeued);
+}
+
+/**
+ * Flush a deque.
+ *
+ * This function flushes all the objects in a deque.
+ *
+ * @warning
+ * Make sure the deque is not in use while calling this function.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_reset(struct rte_deque *d);
+
+/**
+ * Test if a deque is full.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @return
+ * - 1: The deque is full.
+ * - 0: The deque is not full.
+ */
+static inline int
+rte_deque_full(const struct rte_deque *d)
+{
+ return rte_deque_free_count(d) == 0;
+}
+
+/**
+ * Test if a deque is empty.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @return
+ * - 1: The deque is empty.
+ * - 0: The deque is not empty.
+ */
+static inline int
+rte_deque_empty(const struct rte_deque *d)
+{
+ return d->tail == d->head;
+}
+
+/**
+ * Return the size of the deque.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @return
+ * The size of the data store used by the deque.
+ * NOTE: this is not the same as the usable space in the deque. To query that
+ * use ``rte_deque_get_capacity()``.
+ */
+static inline unsigned int
+rte_deque_get_size(const struct rte_deque *d)
+{
+ return d->size;
+}
+
+/**
+ * Return the number of objects which can be stored in the deque.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @return
+ * The usable size of the deque.
+ */
+static inline unsigned int
+rte_deque_get_capacity(const struct rte_deque *d)
+{
+ return d->capacity;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_H_ */
diff --git a/lib/deque/rte_deque_core.h b/lib/deque/rte_deque_core.h
new file mode 100644
index 0000000000..0bb8695c8a
--- /dev/null
+++ b/lib/deque/rte_deque_core.h
@@ -0,0 +1,81 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_CORE_H_
+#define _RTE_DEQUE_CORE_H_
+
+/**
+ * @file
+ * This file contains definition of RTE deque structure, init flags and
+ * some related macros. This file should not be included directly,
+ * include rte_deque.h instead.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <string.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include <rte_debug.h>
+
+extern int rte_deque_log_type;
+
+#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
+/** The maximum length of a deque name. */
+#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+ sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
+
+/**
+ * Double ended queue (deque) structure.
+ *
+ * The deque has a head and a tail index. These indices are not between 0
+ * and size(deque)-1. These indices are between 0 and 2^32 -1. Their value
+ * is masked while accessing the objects in deque. These indices are
+ * unsigned 32bits. Hence the result of the subtraction is always a modulo
+ * of 2^32 and it is between 0 and capacity.
+ */
+struct rte_deque {
+ alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];
+ /**< Name of the deque */
+ int flags;
+ /**< Flags supplied at creation. */
+ const struct rte_memzone *memzone;
+ /**< Memzone, if any, containing the rte_deque */
+
+ alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */
+
+ uint32_t size; /**< Size of deque. */
+ uint32_t mask; /**< Mask (size-1) of deque. */
+ uint32_t capacity; /**< Usable size of deque */
+ /** Deque head and tail indices. */
+ volatile uint32_t head;
+ volatile uint32_t tail;
+};
+
+/**
+ * Deque is to hold exactly requested number of entries.
+ * Without this flag set, the deque size requested must be a power of 2, and the
+ * usable space will be that size - 1. With the flag, the requested size will
+ * be rounded up to the next power of two, but the usable space will be exactly
+ * that requested. Worst case, if a power-of-2 size is requested, half the
+ * deque space will be wasted.
+ */
+#define RTE_DEQUE_F_EXACT_SZ 0x0004
+#define RTE_DEQUE_SZ_MASK (0x7fffffffU) /**< Deque size mask */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_CORE_H_ */
diff --git a/lib/deque/rte_deque_pvt.h b/lib/deque/rte_deque_pvt.h
new file mode 100644
index 0000000000..931bbd4d19
--- /dev/null
+++ b/lib/deque/rte_deque_pvt.h
@@ -0,0 +1,538 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_PVT_H_
+#define _RTE_DEQUE_PVT_H_
+
+/*
+ * Number of objects currently stored in deque d: the distance from tail to
+ * head, reduced modulo the deque size.
+ * The argument is parenthesized so that any pointer expression (e.g. &obj,
+ * p + 1) can be passed safely. Note that d is still evaluated several times.
+ */
+#define __RTE_DEQUE_COUNT(d) (((d)->head - (d)->tail) & (d)->mask)
+/* Number of objects that can still be enqueued in deque d. */
+#define __RTE_DEQUE_FREE_SPACE(d) ((d)->capacity - __RTE_DEQUE_COUNT(d))
+
+/*
+ * Copy n 32-bit words from obj_table into the deque storage (located right
+ * after the deque header), starting at word index idx and moving upwards.
+ * size is the storage length expressed in 32-bit words; when idx + n goes
+ * past it, the copy continues from word 0.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_32(struct rte_deque *d,
+		const unsigned int size,
+		uint32_t idx,
+		const void *obj_table,
+		unsigned int n)
+{
+	uint32_t *slots = (uint32_t *)&d[1];
+	const uint32_t *src = (const uint32_t *)obj_table;
+	unsigned int k = 0;
+	unsigned int j;
+
+	if (unlikely(idx + n > size)) {
+		/* Wrap-around: fill up to the end of the storage ... */
+		while (idx < size)
+			slots[idx++] = src[k++];
+		/* ... then start at the beginning */
+		for (idx = 0; k < n; k++, idx++)
+			slots[idx] = src[k];
+		return;
+	}
+
+	/* Contiguous region: groups of eight words first, then the rest. */
+	while (n - k >= 8) {
+		for (j = 0; j < 8; j++)
+			slots[idx + j] = src[k + j];
+		k += 8;
+		idx += 8;
+	}
+	while (k < n)
+		slots[idx++] = src[k++];
+}
+
+/*
+ * Copy n 64-bit objects from obj_table into the deque, starting at the slot
+ * designated by head and moving upwards; wraps to slot 0 when the end of the
+ * storage is reached. obj_table is read through an unaligned 64-bit type.
+ * The head index itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_64(struct rte_deque *d,
+		const void *obj_table,
+		unsigned int n)
+{
+	const uint32_t size = d->size;
+	uint32_t idx = (d->head & d->mask);
+	uint64_t *slots = (uint64_t *)&d[1];
+	const unaligned_uint64_t *src = (const unaligned_uint64_t *)obj_table;
+	unsigned int k = 0;
+	unsigned int j;
+
+	if (unlikely(idx + n > size)) {
+		/* Wrap-around: fill up to the end of the storage ... */
+		while (idx < size)
+			slots[idx++] = src[k++];
+		/* ... then start at the beginning */
+		for (idx = 0; k < n; k++, idx++)
+			slots[idx] = src[k];
+		return;
+	}
+
+	/* Contiguous region: groups of four objects first, then the rest. */
+	while (n - k >= 4) {
+		for (j = 0; j < 4; j++)
+			slots[idx + j] = src[k + j];
+		k += 4;
+		idx += 4;
+	}
+	while (k < n)
+		slots[idx++] = src[k++];
+}
+
+/*
+ * Copy n 128-bit objects from obj_table into the deque, starting at the slot
+ * designated by head and moving upwards; wraps to slot 0 at the end of the
+ * storage. All copies go through memcpy(). The head index is not modified.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
+		const void *obj_table,
+		unsigned int n)
+{
+	const uint32_t size = d->size;
+	uint32_t idx = (d->head & d->mask);
+	rte_int128_t *slots = (rte_int128_t *)&d[1];
+	const rte_int128_t *src = (const rte_int128_t *)obj_table;
+	unsigned int k = 0;
+
+	if (unlikely(idx + n > size)) {
+		/* Wrap-around: one object at a time up to the end ... */
+		for (; idx < size; k++, idx++)
+			memcpy((void *)(slots + idx),
+				(const void *)(src + k), 16);
+		/* ... then start at the beginning */
+		for (idx = 0; k < n; k++, idx++)
+			memcpy((void *)(slots + idx),
+				(const void *)(src + k), 16);
+		return;
+	}
+
+	/* Contiguous region: two objects (32 bytes) per copy. */
+	while (n - k >= 2) {
+		memcpy((void *)(slots + idx), (const void *)(src + k), 32);
+		k += 2;
+		idx += 2;
+	}
+	/* Odd count: one object left. */
+	if (n & 0x1)
+		memcpy((void *)(slots + idx), (const void *)(src + k), 16);
+}
+
+/*
+ * Copy n objects of esize bytes from obj_table to the head end of the deque
+ * and advance the head index by n. No free-space check is done here.
+ * Returns n.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_head(struct rte_deque *d,
+		const void *obj_table,
+		unsigned int esize,
+		unsigned int n)
+{
+	/* 8B and 16B objects have dedicated copy routines because some
+	 * platforms offer 64 bit and 128 bit registers for direct copying.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_enqueue_elems_head_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_enqueue_elems_head_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is handled as a run of 32-bit words. */
+		const uint32_t words = esize / sizeof(uint32_t);
+		const uint32_t slot = d->head & d->mask;
+
+		__rte_deque_enqueue_elems_head_32(d, d->size * words,
+				slot * words, obj_table, n * words);
+		break;
+	}
+	}
+
+	d->head = (d->head + n) & d->mask;
+	return n;
+}
+
+/*
+ * Copy objects from obj_table into the deque storage, moving downwards.
+ * All quantities are expressed in 32-bit words: idx is the word index of the
+ * first destination object, n the total number of words to copy, scale the
+ * number of words per object and mask the word index of the last object in
+ * the storage. Each object (elem_size bytes) is copied with memcpy(); after
+ * word index 0 the copy continues from the last object of the storage.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_32(struct rte_deque *d,
+		const unsigned int mask,
+		uint32_t idx,
+		const void *obj_table,
+		unsigned int n,
+		const unsigned int scale,
+		const unsigned int elem_size)
+{
+	uint32_t *slots = (uint32_t *)&d[1];
+	const uint32_t *src = (const uint32_t *)obj_table;
+	unsigned int k = 0;
+
+	if (unlikely(idx < n)) {
+		/* Walk down to word 0; idx turns negative (as int32_t)
+		 * once the object at index 0 has been written.
+		 */
+		while ((int32_t)idx >= 0) {
+			memcpy(&slots[idx], &src[k], elem_size);
+			idx -= scale;
+			k += scale;
+		}
+		/* Start at the ending */
+		idx = mask;
+	}
+
+	/* Whatever is left, either all of it or the part after the wrap. */
+	while (k < n) {
+		memcpy(&slots[idx], &src[k], elem_size);
+		idx -= scale;
+		k += scale;
+	}
+}
+
+/*
+ * Copy n 64-bit objects from obj_table into the deque, starting at the slot
+ * designated by tail and moving downwards; after slot 0 the copy continues
+ * from the last slot (index mask). obj_table is read through an unaligned
+ * 64-bit type. The tail index itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_64(struct rte_deque *d,
+		const void *obj_table,
+		unsigned int n)
+{
+	uint32_t idx = (d->tail & d->mask);
+	uint64_t *slots = (uint64_t *)&d[1];
+	const unaligned_uint64_t *src = (const unaligned_uint64_t *)obj_table;
+	unsigned int k = 0;
+
+	if (unlikely((int32_t)(idx - n) < 0)) {
+		/* Walk down to slot 0; idx turns negative (as int32_t)
+		 * once slot 0 has been written.
+		 */
+		while ((int32_t)idx >= 0)
+			slots[idx--] = src[k++];
+		/* Start at the ending */
+		idx = d->mask;
+	}
+
+	/* Whatever is left, either all of it or the part after the wrap. */
+	while (k < n)
+		slots[idx--] = src[k++];
+}
+
+/*
+ * Copy n 128-bit objects from obj_table into the deque, starting at the slot
+ * designated by tail and moving downwards; after slot 0 the copy continues
+ * from the last slot (index mask). The tail index is not modified here.
+ *
+ * Every object is copied with memcpy(), as in the other 128-bit helpers of
+ * this file: obj_table is a caller supplied void pointer and nothing
+ * visible here guarantees it has the alignment of rte_int128_t, so it must
+ * not be dereferenced through that type with a plain assignment.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_128(struct rte_deque *d,
+		const void *obj_table,
+		unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->tail & d->mask);
+	rte_int128_t *deque = (rte_int128_t *)&d[1];
+	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+
+	if (likely((int32_t)(idx - n) >= 0)) {
+		/* No wrap-around. Consecutive source objects land in
+		 * descending slots, so they are copied one at a time.
+		 */
+		for (i = 0; i < n; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+	} else {
+		/* idx turns negative (as int32_t) after slot 0 is written */
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+	}
+}
+
+/*
+ * Copy n objects of esize bytes from obj_table to the tail end of the deque
+ * and move the tail index down by n. No free-space check is done here.
+ * Returns n.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_tail(struct rte_deque *d,
+		const void *obj_table,
+		unsigned int esize,
+		unsigned int n)
+{
+	/* The copy routines start writing at the tail index, so it has to
+	 * designate an empty cell first.
+	 */
+	d->tail--;
+
+	/* 8B and 16B objects have dedicated copy routines because some
+	 * platforms offer 64 bit and 128 bit registers for direct copying.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_enqueue_elems_tail_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_enqueue_elems_tail_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is handled as a run of 32-bit words. */
+		const uint32_t words = esize / sizeof(uint32_t);
+		const uint32_t slot = d->tail & d->mask;
+
+		__rte_deque_enqueue_elems_tail_32(d, d->mask * words,
+				slot * words, obj_table, n * words,
+				words, esize);
+		break;
+	}
+	}
+
+	/* The +1 undoes the decrement above: after the operation the tail
+	 * has to designate the last object written, a non-empty cell.
+	 */
+	d->tail = (d->tail - n + 1) & d->mask;
+	return n;
+}
+
+/*
+ * Copy n 32-bit words out of the deque storage (located right after the
+ * deque header) into obj_table, starting at word index idx and moving
+ * upwards. size is the storage length expressed in 32-bit words; when
+ * idx + n goes past it, the copy continues from word 0.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_32(struct rte_deque *d,
+		const unsigned int size,
+		uint32_t idx,
+		void *obj_table,
+		unsigned int n)
+{
+	const uint32_t *slots = (const uint32_t *)&d[1];
+	uint32_t *dst = (uint32_t *)obj_table;
+	unsigned int k = 0;
+	unsigned int j;
+
+	if (unlikely(idx + n > size)) {
+		/* Wrap-around: read up to the end of the storage ... */
+		while (idx < size)
+			dst[k++] = slots[idx++];
+		/* ... then start at the beginning */
+		for (idx = 0; k < n; k++, idx++)
+			dst[k] = slots[idx];
+		return;
+	}
+
+	/* Contiguous region: groups of eight words first, then the rest. */
+	while (n - k >= 8) {
+		for (j = 0; j < 8; j++)
+			dst[k + j] = slots[idx + j];
+		k += 8;
+		idx += 8;
+	}
+	while (k < n)
+		dst[k++] = slots[idx++];
+}
+
+/*
+ * Copy n 64-bit objects out of the deque into obj_table, starting at the
+ * slot designated by tail and moving upwards; wraps to slot 0 at the end of
+ * the storage. obj_table is written through an unaligned 64-bit type.
+ * The tail index itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_64(struct rte_deque *d, void *obj_table,
+		unsigned int n)
+{
+	const uint32_t size = d->size;
+	uint32_t idx = (d->tail & d->mask);
+	const uint64_t *slots = (const uint64_t *)&d[1];
+	unaligned_uint64_t *dst = (unaligned_uint64_t *)obj_table;
+	unsigned int k = 0;
+	unsigned int j;
+
+	if (unlikely(idx + n > size)) {
+		/* Wrap-around: read up to the end of the storage ... */
+		while (idx < size)
+			dst[k++] = slots[idx++];
+		/* ... then start at the beginning */
+		for (idx = 0; k < n; k++, idx++)
+			dst[k] = slots[idx];
+		return;
+	}
+
+	/* Contiguous region: groups of four objects first, then the rest. */
+	while (n - k >= 4) {
+		for (j = 0; j < 4; j++)
+			dst[k + j] = slots[idx + j];
+		k += 4;
+		idx += 4;
+	}
+	while (k < n)
+		dst[k++] = slots[idx++];
+}
+
+/*
+ * Copy n 128-bit objects out of the deque into obj_table, starting at the
+ * slot designated by tail and moving upwards; wraps to slot 0 at the end of
+ * the storage. All copies go through memcpy(). The tail index is not
+ * modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_128(struct rte_deque *d,
+		void *obj_table,
+		unsigned int n)
+{
+	const uint32_t size = d->size;
+	uint32_t idx = (d->tail & d->mask);
+	const rte_int128_t *slots = (const rte_int128_t *)&d[1];
+	rte_int128_t *dst = (rte_int128_t *)obj_table;
+	unsigned int k = 0;
+
+	if (unlikely(idx + n > size)) {
+		/* Wrap-around: one object at a time up to the end ... */
+		for (; idx < size; k++, idx++)
+			memcpy((void *)(dst + k),
+				(const void *)(slots + idx), 16);
+		/* ... then start at the beginning */
+		for (idx = 0; k < n; k++, idx++)
+			memcpy((void *)(dst + k),
+				(const void *)(slots + idx), 16);
+		return;
+	}
+
+	/* Contiguous region: two objects (32 bytes) per copy. */
+	while (n - k >= 2) {
+		memcpy((void *)(dst + k), (const void *)(slots + idx), 32);
+		k += 2;
+		idx += 2;
+	}
+	/* Odd count: one object left. */
+	if (n & 0x1)
+		memcpy((void *)(dst + k), (const void *)(slots + idx), 16);
+}
+
+/*
+ * Copy n objects of esize bytes from the tail end of the deque into
+ * obj_table and advance the tail index by n. No availability check is done
+ * here. Returns n.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_tail(struct rte_deque *d,
+		void *obj_table,
+		unsigned int esize,
+		unsigned int n)
+{
+	/* 8B and 16B objects have dedicated copy routines because some
+	 * platforms offer 64 bit and 128 bit registers for direct copying.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_dequeue_elems_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_dequeue_elems_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is handled as a run of 32-bit words. */
+		const uint32_t words = esize / sizeof(uint32_t);
+		const uint32_t slot = d->tail & d->mask;
+
+		__rte_deque_dequeue_elems_32(d, d->size * words,
+				slot * words, obj_table, n * words);
+		break;
+	}
+	}
+
+	d->tail = (d->tail + n) & d->mask;
+	return n;
+}
+
+/*
+ * Copy objects out of the deque storage into obj_table, moving downwards.
+ * All quantities are expressed in 32-bit words: idx is the word index of the
+ * first source object, n the total number of words to copy, scale the number
+ * of words per object and mask the word index of the last object in the
+ * storage. Each object (elem_size bytes) is copied with memcpy(); after word
+ * index 0 the copy continues from the last object of the storage.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_32(struct rte_deque *d,
+		const unsigned int mask,
+		uint32_t idx,
+		void *obj_table,
+		unsigned int n,
+		const unsigned int scale,
+		const unsigned int elem_size)
+{
+	const uint32_t *slots = (uint32_t *)&d[1];
+	uint32_t *dst = (uint32_t *)obj_table;
+	unsigned int k = 0;
+
+	if (unlikely(idx < n)) {
+		/* Walk down to word 0; idx turns negative (as int32_t)
+		 * once the object at index 0 has been read.
+		 */
+		while ((int32_t)idx >= 0) {
+			memcpy(&dst[k], &slots[idx], elem_size);
+			idx -= scale;
+			k += scale;
+		}
+		/* Start at the ending */
+		idx = mask;
+	}
+
+	/* Whatever is left, either all of it or the part after the wrap. */
+	while (k < n) {
+		memcpy(&dst[k], &slots[idx], elem_size);
+		idx -= scale;
+		k += scale;
+	}
+}
+
+/*
+ * Copy n 64-bit objects out of the deque into obj_table, starting at the
+ * slot designated by head and moving downwards; after slot 0 the copy
+ * continues from the last slot (index mask). obj_table is written through
+ * an unaligned 64-bit type. The head index itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_64(struct rte_deque *d,
+		void *obj_table,
+		unsigned int n)
+{
+	uint32_t idx = (d->head & d->mask);
+	const uint64_t *slots = (uint64_t *)&d[1];
+	unaligned_uint64_t *dst = (unaligned_uint64_t *)obj_table;
+	unsigned int k = 0;
+
+	if (unlikely((int32_t)(idx - n) < 0)) {
+		/* Walk down to slot 0; idx turns negative (as int32_t)
+		 * once slot 0 has been read.
+		 */
+		while ((int32_t)idx >= 0)
+			dst[k++] = slots[idx--];
+		/* Start at the ending */
+		idx = d->mask;
+	}
+
+	/* Whatever is left, either all of it or the part after the wrap. */
+	while (k < n)
+		dst[k++] = slots[idx--];
+}
+
+/*
+ * Copy n 128-bit objects out of the deque into obj_table, starting at the
+ * slot designated by head and moving downwards; after slot 0 the copy
+ * continues from the last slot (index mask). The head index is not
+ * modified here.
+ *
+ * Every object is copied with memcpy(), as in the other 128-bit helpers of
+ * this file: obj_table is a caller supplied void pointer and nothing
+ * visible here guarantees it has the alignment of rte_int128_t, so it must
+ * not be written through that type with a plain assignment.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_128(struct rte_deque *d,
+		void *obj_table,
+		unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->head & d->mask);
+	const rte_int128_t *deque = (rte_int128_t *)&d[1];
+	rte_int128_t *obj = (rte_int128_t *)obj_table;
+
+	if (likely((int32_t)(idx - n) >= 0)) {
+		/* No wrap-around. Consecutive destination objects come
+		 * from descending slots, so they are copied one at a time.
+		 */
+		for (i = 0; i < n; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+	} else {
+		/* idx turns negative (as int32_t) after slot 0 is read */
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+	}
+}
+
+/*
+ * Copy n objects of esize bytes from the head end of the deque into
+ * obj_table and move the head index down by n. No availability check is
+ * done here. Returns n.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_head(struct rte_deque *d,
+		void *obj_table,
+		unsigned int esize,
+		unsigned int n)
+{
+	/* Step the head back by one so that the copy routines, which start
+	 * reading at the head index, begin at the most recently enqueued
+	 * object.
+	 */
+	d->head--;
+
+	/* 8B and 16B objects have dedicated copy routines because some
+	 * platforms offer 64 bit and 128 bit registers for direct copying.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_dequeue_elems_head_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_dequeue_elems_head_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is handled as a run of 32-bit words. */
+		const uint32_t words = esize / sizeof(uint32_t);
+		const uint32_t slot = d->head & d->mask;
+
+		__rte_deque_dequeue_elems_head_32(d, d->mask * words,
+				slot * words, obj_table, n * words,
+				words, esize);
+		break;
+	}
+	}
+
+	/* The +1 undoes the decrement above: after the operation the head
+	 * has to designate an empty cell again.
+	 */
+	d->head = (d->head - n + 1) & d->mask;
+	return n;
+}
+#endif /* _RTE_DEQUE_PVT_H_ */
diff --git a/lib/deque/rte_deque_zc.h b/lib/deque/rte_deque_zc.h
new file mode 100644
index 0000000000..6d7167e158
--- /dev/null
+++ b/lib/deque/rte_deque_zc.h
@@ -0,0 +1,430 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+#ifndef _RTE_DEQUE_ZC_H_
+#define _RTE_DEQUE_ZC_H_
+
+/**
+ * @file
+ * This file should not be included directly, include rte_deque.h instead.
+ *
+ * Deque Zero Copy APIs
+ * These APIs make it possible to split public enqueue/dequeue API
+ * into 3 parts:
+ * - enqueue/dequeue start
+ * - copy data to/from the deque
+ * - enqueue/dequeue finish
+ * These APIs provide the ability to avoid copying of the data to temporary area.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Deque zero-copy information structure.
+ *
+ * This structure contains the pointers and length of the space
+ * reserved on the Deque storage.
+ */
+/* Filled in by the zero-copy *_elem_start functions, which pass the
+ * addresses of ptr1, n1 and ptr2 to __rte_deque_get_elem_addr().
+ */
+struct __rte_cache_aligned rte_deque_zc_data {
+ /* Pointer to the first space in the deque */
+ void *ptr1;
+ /* Pointer to the second space in the deque if there is wrap-around.
+ * It contains valid value only if wrap-around happens.
+ * For head-enqueue/tail-dequeue it is the start of the storage; for
+ * tail-enqueue/head-dequeue it is the last element slot of the storage.
+ */
+ void *ptr2;
+ /* Number of elements in the first pointer. If this is equal to
+ * the number of elements requested, then ptr2 is NULL.
+ * Otherwise, subtracting n1 from number of elements requested
+ * will give the number of elements available at ptr2.
+ */
+ unsigned int n1;
+};
+
+/*
+ * Compute where num elements of esize bytes, starting at deque position pos,
+ * are located in the deque storage.
+ *
+ * dst1 receives the address of the element at (pos & mask) and n1 the number
+ * of elements reachable from there before the storage boundary is crossed.
+ * dst2 receives the address where the remaining num - n1 elements start, or
+ * NULL when no wrap-around is needed (in which case n1 == num).
+ *
+ * low_to_high selects the walking direction:
+ *  - true:  ascending slots; the boundary is the end of the storage and the
+ *           second region starts at slot 0.
+ *  - false: descending slots; the boundary is slot 0 and the second region
+ *           starts at the last slot of the storage.
+ */
+static __rte_always_inline void
+__rte_deque_get_elem_addr(struct rte_deque *d, uint32_t pos,
+		uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2,
+		bool low_to_high)
+{
+	uint32_t idx, scale, nr_idx;
+	uint32_t *deque_ptr = (uint32_t *)&d[1];
+
+	/* Normalize to uint32_t */
+	scale = esize / sizeof(uint32_t);
+	idx = pos & d->mask;
+	nr_idx = idx * scale;
+
+	/* Default: everything fits in the first region. */
+	*dst1 = deque_ptr + nr_idx;
+	*n1 = num;
+	*dst2 = NULL;
+
+	if (low_to_high) {
+		if (idx + num > d->size) {
+			*n1 = d->size - idx;
+			*dst2 = deque_ptr;
+		}
+	} else if (num > idx + 1) {
+		/* Slots idx..0 hold idx + 1 elements, so a second region is
+		 * only needed when more than that is requested. Testing
+		 * "num > idx" instead would report a second region holding
+		 * zero elements when num == idx + 1.
+		 */
+		*n1 = idx + 1;
+		*dst2 = (void *)&deque_ptr[(-1 & d->mask) * scale];
+	}
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_head_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to add in the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param free_space
+ * Returns the amount of space in the deque after the reservation operation
+ * has finished.
+ * @return
+ * The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	/* Report the current free space first; on failure this is what the
+	 * caller gets back.
+	 */
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+
+	if (likely(n <= *free_space)) {
+		/* Enough room: hand out the slots starting at head, walking
+		 * upwards. The head index is only moved by the finish call.
+		 */
+		__rte_deque_get_elem_addr(d, d->head, esize, n, &zcd->ptr1,
+				&zcd->n1, &zcd->ptr2, true);
+		*free_space -= n;
+		return n;
+	}
+
+	/* All or nothing: not enough room for n objects. */
+	return 0;
+}
+
+/**
+ * Complete enqueuing several pointers to objects on the deque.
+ * Note that number of objects to enqueue should not exceed previous
+ * enqueue_start return value.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param n
+ * The number of pointers to objects to add to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_head_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Publish the n objects written by the user: move head up by n,
+	 * wrapped to the deque size.
+	 */
+	const uint32_t new_head = (d->head + n) & d->mask;
+
+	d->head = new_head;
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the queue by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the queue using the returned pointers.
+ * User should call rte_deque_head_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to add in the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param free_space
+ * Returns the amount of space in the deque after the reservation operation
+ * has finished.
+ * @return
+ * The number of objects that can be enqueued, between 0 and n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+
+	/* Burst semantics: shrink the request to what fits, then reuse the
+	 * bulk variant, which recomputes *free_space itself.
+	 */
+	if (n > *free_space)
+		n = *free_space;
+
+	return rte_deque_head_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space);
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to add in the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param free_space
+ * Returns the amount of space in the deque after the reservation operation
+ * has finished.
+ * @return
+ * The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	/* Report the current free space first; on failure this is what the
+	 * caller gets back.
+	 */
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+
+	if (likely(n <= *free_space)) {
+		/* Enough room: hand out the slots starting just below tail,
+		 * walking downwards. The tail index is only moved by the
+		 * finish call.
+		 */
+		__rte_deque_get_elem_addr(d, d->tail - 1, esize, n, &zcd->ptr1,
+				&zcd->n1, &zcd->ptr2, false);
+		*free_space -= n;
+		return n;
+	}
+
+	/* All or nothing: not enough room for n objects. */
+	return 0;
+}
+
+/**
+ * Complete enqueuing several pointers to objects on the deque.
+ * Note that number of objects to enqueue should not exceed previous
+ * enqueue_start return value.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param n
+ * The number of pointers to objects to add to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_tail_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Publish the n objects written by the user: move tail down by n,
+	 * wrapped to the deque size.
+	 */
+	const uint32_t new_tail = (d->tail - n) & d->mask;
+
+	d->tail = new_tail;
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the queue by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the queue using the returned pointers.
+ * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to add in the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param free_space
+ * Returns the amount of space in the deque after the reservation operation
+ * has finished.
+ * @return
+ * The number of objects that can be enqueued, between 0 and n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+
+	/* Burst semantics: shrink the request to what fits, then reuse the
+	 * bulk variant, which recomputes *free_space itself.
+	 */
+	if (n > *free_space)
+		n = *free_space;
+
+	return rte_deque_tail_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space);
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to remove from the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue has
+ * finished.
+ * @return
+ * The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	/* Report the current element count first; on failure this is what
+	 * the caller gets back.
+	 */
+	*available = __RTE_DEQUE_COUNT(d);
+
+	if (likely(n <= *available)) {
+		/* Enough objects: expose the slots starting at tail, walking
+		 * upwards. The tail index is only moved by the finish call.
+		 */
+		__rte_deque_get_elem_addr(d, d->tail, esize, n, &zcd->ptr1,
+				&zcd->n1, &zcd->ptr2, true);
+		*available -= n;
+		return n;
+	}
+
+	/* All or nothing: fewer than n objects are stored. */
+	return 0;
+}
+
+/**
+ * Complete dequeuing several objects from the deque.
+ * Note that the number of objects to dequeue should not exceed the previous
+ * dequeue_start return value.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param n
+ * The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_tail_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Release the n objects consumed by the user: move tail up by n,
+	 * wrapped to the deque size.
+	 */
+	const uint32_t new_tail = (d->tail + n) & d->mask;
+
+	d->tail = new_tail;
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to remove from the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue has
+ * finished.
+ * @return
+ * The number of objects that can be dequeued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	*available = __RTE_DEQUE_COUNT(d);
+
+	/* Burst semantics: shrink the request to what is stored, then reuse
+	 * the bulk variant, which recomputes *available itself.
+	 */
+	if (n > *available)
+		n = *available;
+
+	return rte_deque_tail_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available);
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to remove from the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue has
+ * finished.
+ * @return
+ * The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	/* Report the current element count first; on failure this is what
+	 * the caller gets back.
+	 */
+	*available = __RTE_DEQUE_COUNT(d);
+
+	if (likely(n <= *available)) {
+		/* Enough objects: expose the slots starting just below head,
+		 * walking downwards. The head index is only moved by the
+		 * finish call.
+		 */
+		__rte_deque_get_elem_addr(d, d->head - 1, esize, n, &zcd->ptr1,
+				&zcd->n1, &zcd->ptr2, false);
+		*available -= n;
+		return n;
+	}
+
+	/* All or nothing: fewer than n objects are stored. */
+	return 0;
+}
+
+/**
+ * Complete dequeuing several objects from the deque.
+ * Note that the number of objects to dequeue should not exceed the previous
+ * dequeue_start return value.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param n
+ * The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_head_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Release the n objects consumed by the user: move head down by n,
+	 * wrapped to the deque size.
+	 */
+	const uint32_t new_head = (d->head - n) & d->mask;
+
+	d->head = new_head;
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ * A pointer to the deque structure.
+ * @param esize
+ * The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ * The number of objects to remove from the deque.
+ * @param zcd
+ * Structure containing the pointers and length of the space
+ * reserved on the deque storage.
+ * @param available
+ * Returns the number of remaining deque entries after the dequeue has
+ * finished.
+ * @return
+ * The number of objects that can be dequeued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+		unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	*available = __RTE_DEQUE_COUNT(d);
+
+	/* Burst semantics: shrink the request to what is stored, then reuse
+	 * the bulk variant, which recomputes *available itself.
+	 */
+	if (n > *available)
+		n = *available;
+
+	return rte_deque_head_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_ZC_H_ */
diff --git a/lib/deque/version.map b/lib/deque/version.map
new file mode 100644
index 0000000000..103fd3b512
--- /dev/null
+++ b/lib/deque/version.map
@@ -0,0 +1,14 @@
+EXPERIMENTAL {
+ global:
+
+ # added in 24.07
+ rte_deque_log_type;
+ rte_deque_create;
+ rte_deque_dump;
+ rte_deque_free;
+ rte_deque_get_memsize_elem;
+ rte_deque_init;
+ rte_deque_reset;
+
+ local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 179a272932..82929b7a11 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -14,6 +14,7 @@ libraries = [
'argparse',
'telemetry', # basic info querying
'eal', # everything depends on eal
+ 'deque',
'ring',
'rcu', # rcu depends on ring
'mempool',
@@ -74,6 +75,7 @@ if is_ms_compiler
'kvargs',
'telemetry',
'eal',
+ 'deque',
'ring',
]
endif
--
2.25.1
next prev parent reply other threads:[~2024-04-24 13:42 UTC|newest]
Thread overview: 48+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-08-21 6:04 [RFC] lib/st_ring: add single thread ring Honnappa Nagarahalli
2023-08-21 7:37 ` Morten Brørup
2023-08-22 5:47 ` Honnappa Nagarahalli
2023-08-24 8:05 ` Morten Brørup
2023-08-24 10:52 ` Mattias Rönnblom
2023-08-24 11:22 ` Morten Brørup
2023-08-26 23:34 ` Honnappa Nagarahalli
2023-08-21 21:14 ` Mattias Rönnblom
2023-08-22 5:43 ` Honnappa Nagarahalli
2023-08-22 8:04 ` Mattias Rönnblom
2023-08-22 16:28 ` Honnappa Nagarahalli
2023-09-04 10:13 ` Konstantin Ananyev
2023-09-04 18:10 ` Honnappa Nagarahalli
2023-09-05 8:19 ` Konstantin Ananyev
2024-04-01 1:37 ` [PATCH v1 0/2] deque: add multithread unsafe deque library Aditya Ambadipudi
2024-04-01 1:37 ` [PATCH v1 1/2] deque: add multi-thread unsafe double ended queue Aditya Ambadipudi
2024-04-06 9:35 ` Morten Brørup
2024-04-24 13:42 ` [PATCH v2 0/2] deque: add multithread unsafe deque library Aditya Ambadipudi
2024-04-24 13:42 ` Aditya Ambadipudi [this message]
2024-04-24 15:16 ` [PATCH v2 1/2] deque: add multi-thread unsafe double ended queue Morten Brørup
2024-04-24 17:21 ` Patrick Robb
2024-04-25 7:43 ` Ali Alnubani
2024-04-24 23:28 ` Mattias Rönnblom
2024-05-02 20:19 ` [PATCH v3 0/2] deque: add multithread unsafe deque library Aditya Ambadipudi
2024-05-02 20:19 ` [PATCH v3 1/2] deque: add multi-thread unsafe double ended queue Aditya Ambadipudi
2024-05-02 20:19 ` [PATCH v3 2/2] deque: add unit tests for the deque library Aditya Ambadipudi
2024-05-02 20:29 ` [PATCH v3 0/2] deque: add multithread unsafe " Aditya Ambadipudi
2024-06-27 15:03 ` Thomas Monjalon
2024-06-28 20:05 ` Wathsala Wathawana Vithanage
2024-04-24 13:42 ` [PATCH v2 2/2] deque: add unit tests for the " Aditya Ambadipudi
2024-04-01 1:37 ` [PATCH v1 " Aditya Ambadipudi
2024-04-01 14:05 ` [PATCH v1 0/2] deque: add multithread unsafe " Stephen Hemminger
2024-04-01 22:28 ` Aditya Ambadipudi
2024-04-02 0:05 ` Tyler Retzlaff
2024-04-02 0:47 ` Stephen Hemminger
2024-04-02 1:35 ` Honnappa Nagarahalli
2024-04-02 2:00 ` Stephen Hemminger
2024-04-02 2:14 ` Honnappa Nagarahalli
2024-04-02 2:53 ` Stephen Hemminger
[not found] ` <PAVPR08MB9185DC373708CBD16A38EFA8EF3E2@PAVPR08MB9185.eurprd08.prod.outlook.com>
2024-04-02 4:20 ` Tyler Retzlaff
2024-04-02 23:44 ` Stephen Hemminger
2024-04-03 0:12 ` Honnappa Nagarahalli
2024-04-03 23:52 ` Variable name issues with codespell Stephen Hemminger
2024-04-02 4:20 ` [PATCH v1 0/2] deque: add multithread unsafe deque library Tyler Retzlaff
2024-04-03 16:50 ` Honnappa Nagarahalli
2024-04-03 17:46 ` Tyler Retzlaff
2024-04-02 6:05 ` Mattias Rönnblom
2024-04-02 15:25 ` Stephen Hemminger
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240424134233.1336370-2-aditya.ambadipudi@arm.com \
--to=aditya.ambadipudi@arm.com \
--cc=dev@dpdk.org \
--cc=dhruv.tripathi@arm.com \
--cc=hofors@lysator.liu.se \
--cc=honnappa.nagarahalli@arm.com \
--cc=jackmin@nvidia.com \
--cc=konstantin.ananyev@huawei.com \
--cc=matan@nvidia.com \
--cc=mb@smartsharesystems.com \
--cc=nd@arm.com \
--cc=roretzla@linux.microsoft.com \
--cc=stephen@networkplumber.org \
--cc=viacheslavo@nvidia.com \
--cc=wathsala.vithanage@arm.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).