From: "Mattias Rönnblom" <hofors@lysator.liu.se>
To: Konstantin Ananyev <konstantin.v.ananyev@yandex.ru>, dev@dpdk.org
Cc: honnappa.nagarahalli@arm.com, jerinj@marvell.com,
hemant.agrawal@nxp.com, bruce.richardson@intel.com,
drc@linux.vnet.ibm.com, ruifeng.wang@arm.com,
mb@smartsharesystems.com,
Konstantin Ananyev <konstantin.ananyev@huawei.com>
Subject: Re: [RFC 3/6] ring/soring: introduce Staged Ordered Ring
Date: Mon, 26 Aug 2024 21:04:51 +0200 [thread overview]
Message-ID: <6af430b0-3bec-46f5-ac53-f69f5ba1de1e@lysator.liu.se> (raw)
In-Reply-To: <20240815085339.1434-4-konstantin.v.ananyev@yandex.ru>
On 2024-08-15 10:53, Konstantin Ananyev wrote:
> From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
>
> Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> with multiple processing 'stages'.
> It is based on the conventional DPDK rte_ring, re-uses many of its concepts,
> and even a substantial part of its code.
> It can be viewed as an 'extension' of rte_ring functionality.
> In particular, main SORING properties:
> - circular ring buffer with fixed size objects
> - producer, consumer plus multiple processing stages in the middle.
> - allows object processing to be split into multiple stages.
> - objects remain in the same ring while moving from one stage to the other,
> initial order is preserved, no extra copying needed.
> - preserves the ingress order of objects within the queue across multiple
> stages, i.e.:
> at the same stage multiple threads can process objects from the ring in
> any order, but for the next stage objects will always appear in the
> original order.
> - each stage (and producer/consumer) can be served by a single thread or by
> multiple threads.
> - number of stages, size and number of objects in the ring are
> configurable at ring initialization time.
>
> Data-path API provides four main operations:
> - enqueue/dequeue works in the same manner as for conventional rte_ring,
> all rte_ring synchronization types are supported.
> - acquire/release - for each stage there is an acquire (start) and
> release (finish) operation.
> after some objects are 'acquired', the given thread can safely assume that
> it has exclusive possession of these objects until 'release' is invoked
> for them.
> Note that right now the user has to release exactly the same number of
> objects that were acquired before.
> After 'release', objects can be 'acquired' by the next stage and/or
> dequeued by the consumer (in the case of the last stage).
>
> Expected use-case: applications that use a pipeline model
> (probably with multiple stages) for packet processing, where preserving
> incoming packet order is important, e.g. IPsec processing.
>
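Just to check my understanding of the intended calling sequence: a rough
pseudo-code sketch of a single-stage pipeline, using the function names
declared below (worker-loop boilerplate, error handling and the actual
per-packet processing omitted; BURST, pkts and ftoken are placeholders):

```
/* producer thread */
n = rte_soring_enqueue(r, pkts, NULL, n, RTE_RING_QUEUE_VARIABLE, NULL);

/* worker thread serving stage 0; packets may be processed in any order */
n = rte_soring_acquire(r, pkts, NULL, 0, BURST,
		RTE_RING_QUEUE_VARIABLE, &ftoken, NULL);
/* ... process pkts[0..n-1] ... */
rte_soring_release(r, NULL, NULL, 0, n, ftoken);

/* consumer thread: packets come out in the original enqueue order */
n = rte_soring_dequeue(r, pkts, NULL, BURST,
		RTE_RING_QUEUE_VARIABLE, NULL);
```

Is that the intended usage?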
How does SORING relate to Eventdev? Would it be feasible to reshape
this into a SW event device?
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> ---
> lib/ring/meson.build | 4 +-
> lib/ring/rte_soring.c | 144 ++++++++++++++
> lib/ring/rte_soring.h | 270 ++++++++++++++++++++++++++
> lib/ring/soring.c | 431 ++++++++++++++++++++++++++++++++++++++++++
> lib/ring/soring.h | 124 ++++++++++++
> lib/ring/version.map | 13 ++
> 6 files changed, 984 insertions(+), 2 deletions(-)
> create mode 100644 lib/ring/rte_soring.c
> create mode 100644 lib/ring/rte_soring.h
> create mode 100644 lib/ring/soring.c
> create mode 100644 lib/ring/soring.h
>
> diff --git a/lib/ring/meson.build b/lib/ring/meson.build
> index 7fca958ed7..21f2c12989 100644
> --- a/lib/ring/meson.build
> +++ b/lib/ring/meson.build
> @@ -1,8 +1,8 @@
> # SPDX-License-Identifier: BSD-3-Clause
> # Copyright(c) 2017 Intel Corporation
>
> -sources = files('rte_ring.c')
> -headers = files('rte_ring.h')
> +sources = files('rte_ring.c', 'rte_soring.c', 'soring.c')
> +headers = files('rte_ring.h', 'rte_soring.h')
> # most sub-headers are not for direct inclusion
> indirect_headers += files (
> 'rte_ring_core.h',
> diff --git a/lib/ring/rte_soring.c b/lib/ring/rte_soring.c
> new file mode 100644
> index 0000000000..17b1b73a42
> --- /dev/null
> +++ b/lib/ring/rte_soring.c
> @@ -0,0 +1,144 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#include "soring.h"
> +#include <rte_string_fns.h>
> +
> +RTE_LOG_REGISTER_DEFAULT(soring_logtype, INFO);
> +#define RTE_LOGTYPE_SORING soring_logtype
> +#define SORING_LOG(level, ...) \
> + RTE_LOG_LINE(level, SORING, "" __VA_ARGS__)
> +
> +static uint32_t
> +soring_calc_elem_num(uint32_t count)
> +{
> + return rte_align32pow2(count + 1);
> +}
> +
> +static int
> +soring_check_param(uint32_t esize, uint32_t stsize, uint32_t count,
> + uint32_t stages)
> +{
> + if (stages == 0) {
> + SORING_LOG(ERR, "invalid number of stages: %u", stages);
> + return -EINVAL;
> + }
> +
> + /* Check if element size is a multiple of 4B */
> + if (esize == 0 || esize % 4 != 0) {
> + SORING_LOG(ERR, "invalid element size: %u", esize);
> + return -EINVAL;
> + }
> +
> + /* Check if ret-code size is a multiple of 4B */
> + if (stsize % 4 != 0) {
> + SORING_LOG(ERR, "invalid retcode size: %u", stsize);
> + return -EINVAL;
> + }
> +
> + /* count must be a power of 2 */
> + if (rte_is_power_of_2(count) == 0 ||
> + (count > RTE_SORING_ELEM_MAX + 1)) {
> + SORING_LOG(ERR, "invalid number of elements: %u", count);
> + return -EINVAL;
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * Calculate size offsets for SORING internal data layout.
> + */
> +static size_t
> +soring_get_szofs(uint32_t esize, uint32_t stsize, uint32_t count,
> + uint32_t stages, size_t *elst_ofs, size_t *state_ofs,
> + size_t *stage_ofs)
> +{
> + size_t sz;
> + const struct rte_soring * const r = NULL;
> +
> + sz = sizeof(r[0]) + (size_t)count * esize;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + if (elst_ofs != NULL)
> + *elst_ofs = sz;
> +
> + sz = sz + (size_t)count * stsize;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + if (state_ofs != NULL)
> + *state_ofs = sz;
> +
> + sz += sizeof(r->state[0]) * count;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + if (stage_ofs != NULL)
> + *stage_ofs = sz;
> +
> + sz += sizeof(r->stage[0]) * stages;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + return sz;
> +}
> +
> +
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm)
> +{
> + int32_t rc;
> + uint32_t count;
> +
> + count = soring_calc_elem_num(prm->elems);
> + rc = soring_check_param(prm->esize, prm->stsize, count, prm->stages);
> + if (rc != 0)
> + return rc;
> +
> + return soring_get_szofs(prm->esize, prm->stsize, count, prm->stages,
> + NULL, NULL, NULL);
> +}
> +
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm)
> +{
> + int32_t rc;
> + uint32_t n;
> + size_t elst_ofs, stage_ofs, state_ofs;
> +
> + if (r == NULL || prm == NULL)
> + return -EINVAL;
> +
> + n = soring_calc_elem_num(prm->elems);
> + rc = soring_check_param(prm->esize, prm->stsize, n, prm->stages);
> + if (rc != 0)
> + return rc;
> +
> + soring_get_szofs(prm->esize, prm->stsize, n, prm->stages, &elst_ofs,
> + &state_ofs, &stage_ofs);
> +
> + memset(r, 0, sizeof(*r));
> + rc = strlcpy(r->name, prm->name, sizeof(r->name));
> + if (rc < 0 || rc >= (int)sizeof(r->name))
> + return -ENAMETOOLONG;
> +
> + r->size = n;
> + r->mask = r->size - 1;
> + r->capacity = prm->elems;
> + r->esize = prm->esize;
> + r->stsize = prm->stsize;
> +
> + r->prod.ht.sync_type = prm->prod_synt;
> + r->cons.ht.sync_type = prm->cons_synt;
> +
> + r->state = (union soring_state *)((uintptr_t)r + state_ofs);
> + memset(r->state, 0, sizeof(r->state[0]) * r->size);
> +
> + r->stage = (struct soring_stage *)((uintptr_t)r + stage_ofs);
> + r->nb_stage = prm->stages;
> + memset(r->stage, 0, r->nb_stage * sizeof(r->stage[0]));
> +
> + if (r->stsize != 0)
> + r->elemst = (void *)((uintptr_t)r + elst_ofs);
> +
> + return 0;
> +}
> diff --git a/lib/ring/rte_soring.h b/lib/ring/rte_soring.h
> new file mode 100644
> index 0000000000..fb0e75b39a
> --- /dev/null
> +++ b/lib/ring/rte_soring.h
> @@ -0,0 +1,270 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _RTE_SORING_H_
> +#define _RTE_SORING_H_
> +
> +/**
> + * @file
> + * This file contains the definition of the RTE soring (Staged Ordered Ring)
> + * public API.
> + * Brief description:
> + * enqueue/dequeue works the same as for a conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there can be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * After some elems are 'acquired', the user can safely assume exclusive
> + * possession of these elems until 'release' for them is done.
> + * Note that right now the user has to release exactly the same number of
> + * elems as were acquired before.
> + * After 'release', elems can be 'acquired' by the next stage and/or dequeued
> + * (in the case of the last stage).
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring.h>
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_BIT 30
> +
> +/* max possible number of elements in the soring */
> +#define RTE_SORING_ELEM_MAX (RTE_BIT32(RTE_SORING_ST_BIT) - 1)
> +
> +struct rte_soring_param {
> + /** expected name of the ring */
> + const char *name;
> +	/** number of elements in the ring */
> + uint32_t elems;
> + /** size of elements in the ring, must be a multiple of 4 */
> + uint32_t esize;
> + /**
> +	 * size of the ret-code for each elem, must be a multiple of 4.
> +	 * This parameter defines the size of a supplementary, optional
> +	 * array of ret-codes associated with each object in the soring.
> +	 * While the element size is configurable (see the @esize parameter
> +	 * above), so the user could make it big enough to hold both the
> +	 * object and its ret-code together, for performance reasons it
> +	 * might be preferable to access them as separate arrays.
> +	 * A common usage scenario where such separation helps:
> +	 * enqueue() - writes to the objects array
> +	 * acquire() - reads from the objects array
> +	 * release() - writes to the ret-code array
> +	 * dequeue() - reads both the objects and the ret-code arrays
> + */
> + uint32_t stsize;
> + /** number of stages in the ring */
> + uint32_t stages;
> + /** sync type for producer */
> + enum rte_ring_sync_type prod_synt;
> + /** sync type for consumer */
> + enum rte_ring_sync_type cons_synt;
> +};
> +
> +struct rte_soring;
> +
> +/**
> + * Calculate the memory size needed for a soring
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the expected parameters for it. This value is the sum of the size of
> + * the internal metadata and the size of the memory needed by the
> + * actual ring elements and their ret-codes. The value is aligned to a
> + * cache line size.
> + *
> + * @param prm
> + * Pointer to the structure that contains soring creation parameters.
> + * @return
> + * - The memory size needed for the soring on success.
> + * - -EINVAL if the provided parameter values are invalid.
> + */
> +__rte_experimental
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm);
> +
> +/**
> + * Initialize a soring structure.
> + *
> + * Initialize a soring structure in memory pointed by "r".
> + * The size of the memory area must be large enough to store the soring
> + * internal structures plus the objects and ret-code tables.
> + * It is strongly advised to use rte_soring_get_memsize() to get the
> + * appropriate size.
> + *
> + * @param r
> + * Pointer to the soring structure.
> + * @param prm
> + * Pointer to the structure that contains soring creation parameters.
> + * @return
> + * - 0 on success, or a negative error code.
> + */
> +__rte_experimental
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm);
> +
> +/**
> + * Return the total number of filled entries in a ring.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @return
> + * The number of entries in the ring.
> + */
> +__rte_experimental
> +unsigned int
> +rte_soring_count(const struct rte_soring *r);
> +
> +/**
> + * Enqueue several objects on the ring.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param obj_table
> + * A pointer to an array of objects to enqueue.
> + * Size of objects to enqueue must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each object to enqueue.
> + * Note that if the user is not using object status values, this
> + * parameter can be NULL.
> + * Size of elements in this array must be the same value as the @stsize
> + * parameter used while creating the ring. If the user created the soring
> + * with @stsize equal to zero, then @objst must be NULL.
> + * Otherwise the results are undefined.
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + * Behavior type, one of:
> + * - RTE_RING_QUEUE_FIXED: enqueue either exactly @n objects or none.
> + * - RTE_RING_QUEUE_VARIABLE: enqueue up to @n objects.
> + * @param free_space
> + * if non-NULL, returns the amount of space in the ring after the
> + * enqueue operation has finished.
> + * @return
> + * - Actual number of objects enqueued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> + const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> + uint32_t *free_space);
> +
> +/**
> + * Dequeue several objects from the ring.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param obj_table
> + * A pointer to an array of objects to dequeue.
> + * Size of objects to dequeue must be the same value as the @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each object to dequeue.
> + * Note that if the user is not using object status values, this
> + * parameter can be NULL.
> + * Size of elements in this array must be the same value as the @stsize
> + * parameter used while creating the ring. If the user created the soring
> + * with @stsize equal to zero, then @objst must be NULL.
> + * Otherwise the results are undefined.
> + * @param num
> + * The number of objects to dequeue from the ring into the obj_table.
> + * @param behavior
> + * Behavior type, one of:
> + * - RTE_RING_QUEUE_FIXED: dequeue either exactly @num objects or none.
> + * - RTE_RING_QUEUE_VARIABLE: dequeue up to @num objects.
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries after the
> + * dequeue has finished.
> + * @return
> + * - Actual number of objects dequeued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> + uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *available);
> +
> +/**
> + * Acquire several objects from the ring for given stage.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param objs
> + * A pointer to an array of objects to acquire.
> + * Size of objects must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each acquired object.
> + * Note that if the user is not using object status values, this
> + * parameter can be NULL.
> + * Size of elements in this array must be the same value as the @stsize
> + * parameter used while creating the ring. If the user created the soring
> + * with @stsize equal to zero, then @objst must be NULL.
> + * Otherwise the results are undefined.
> + * @param stage
> + * Stage to acquire objects for.
> + * @param num
> + * The number of objects to acquire.
> + * @param behavior
> + * Behavior type, one of:
> + * - RTE_RING_QUEUE_FIXED: acquire either exactly @num objects or none.
> + * - RTE_RING_QUEUE_VARIABLE: acquire up to @num objects.
> + * @param ftoken
> + * Pointer to an opaque 'token' value used by the release() op.
> + * The user has to store this value somewhere and later provide it to
> + * release().
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries for given stage
> + * after the acquire has finished.
> + * @return
> + * - Actual number of objects acquired.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> + uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *ftoken, uint32_t *available);
> +
> +/**
> + * Release several objects for given stage back to the ring.
> + * Note that this means these objects become available for the next stage
> + * or for dequeue.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param objs
> + * A pointer to an array of objects to release.
> + * Note that unless the user needs to overwrite the ring objects, this
> + * parameter can be NULL.
> + * Size of objects must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each object to release.
> + * Note that if the user is not using object status values, this
> + * parameter can be NULL.
> + * Size of elements in this array must be the same value as the @stsize
> + * parameter used while creating the ring. If the user created the soring
> + * with @stsize equal to zero, then @objst must be NULL.
> + * Otherwise the results are undefined.
> + * @param stage
> + * Current stage.
> + * @param n
> + * The number of objects to release.
> + * It has to be the same value as returned by the acquire() op.
> + * @param ftoken
> + * Opaque 'token' value obtained from acquire() op.
> + * @return
> + * - None.
> + */
> +__rte_experimental
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> + const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_SORING_H_ */
> diff --git a/lib/ring/soring.c b/lib/ring/soring.c
> new file mode 100644
> index 0000000000..929bde9697
> --- /dev/null
> +++ b/lib/ring/soring.c
> @@ -0,0 +1,431 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +/**
> + * @file
> + * This file contains implementation of SORING 'datapath' functions.
> + * Brief description:
> + * ==================
> + * enqueue/dequeue works the same as for conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * After some elems are 'acquired', the user can safely assume exclusive
> + * possession of these elems until 'release' for them is done.
> + * Note that right now the user has to release exactly the same number of
> + * elems as were acquired before.
> + * After 'release', elems can be 'acquired' by the next stage and/or dequeued
> + * (in the case of the last stage).
> + * Internal structure:
> + * ===================
> + * In addition to 'normal' ring of elems, it also has a ring of states of the
> + * same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * How it works:
> + * =============
> + * 'acquire()' just moves stage's head (same as rte_ring move_head does),
> + * plus it saves in state[stage.cur_head] information about how many elems
> + * were acquired, current head position and special flag value to indicate
> + * that elems are acquired (RTE_SORING_ST_START).
> + * Note that 'acquire()' returns to the user a special 'ftoken' that the user
> + * has to provide to 'release()' (in fact it is just the current head position).
> + * 'release()' extracts the old head value from the provided ftoken and checks
> + * that the corresponding 'state[]' contains the expected values (mostly for
> + * sanity purposes).
> + * Then it marks this state[] with the 'RTE_SORING_ST_FINISH' flag to indicate
> + * that the given subset of objects was released.
> + * After that, it checks whether the old head value equals the current tail
> + * value. If yes, then it performs the 'finalize()' operation, otherwise
> + * 'release()' just returns (without spinning on the stage tail value).
> + * As updated state[] is shared by all threads, some other thread can do
> + * 'finalize()' for given stage.
> + * That allows 'release()' to avoid excessive waits on the tail value.
> + * Main purpose of 'finalize()' operation is to walk through 'state[]'
> + * from current stage tail up to its head, check state[] and move stage tail
> + * through elements that already are in RTE_SORING_ST_FINISH state.
> + * Along with that, corresponding state[] values are reset to zero.
> + * Note that 'finalize()' for a given stage can be done from multiple places:
> + * from 'release()' for that stage, from 'acquire()' for the next stage, or
> + * even from the consumer's 'dequeue()' - in case the given stage is the last
> + * one.
> + * So 'finalize()' has to be MT-safe and inside it we have to
> + * guarantee that only one thread will update state[] and stage's tail values.
> + */
> +
> +#include "soring.h"
> +
> +/*
> + * Inline functions (fastpath) start here.
> + */
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_finalize(struct soring_stage_headtail *sht,
> + union soring_state *rstate, uint32_t rmask, uint32_t maxn)
> +{
> + int32_t rc;
> + uint32_t head, i, idx, k, n, tail;
> + union soring_stage_tail nt, ot;
> + union soring_state st;
> +
> + /* try to grab exclusive right to update tail value */
> + ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
> + rte_memory_order_acquire);
> +
> + /* other thread already finalizing it for us */
> + if (ot.sync != 0)
> + return 0;
> +
> + nt.pos = ot.pos;
> + nt.sync = 1;
> + rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
> + &ot.raw, nt.raw, rte_memory_order_acquire,
> + rte_memory_order_relaxed);
> +
> + /* other thread won the race */
> + if (rc == 0)
> + return 0;
> +
> + /*
> + * start with current tail and walk through states that are
> + * already finished.
> + */
> +
> + head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
> + n = RTE_MIN(head - ot.pos, maxn);
> + for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
> +
> + idx = tail & rmask;
> + st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
> + rte_memory_order_relaxed);
> + if ((st.stnum & RTE_SORING_ST_MASK) != RTE_SORING_ST_FINISH ||
> + st.ftoken != tail)
> + break;
> +
> + k = st.stnum & ~RTE_SORING_ST_MASK;
> + rte_atomic_store_explicit(&rstate[idx].raw, 0,
> + rte_memory_order_relaxed);
> + }
> +
> +
> + /* release exclusive right to update along with new tail value */
> + ot.pos = tail;
> + rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
> + rte_memory_order_release);
> +
> + return i;
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
> + enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
> + uint32_t *head, uint32_t *next, uint32_t *free)
> +{
> + uint32_t n;
> +
> + switch (st) {
> + case RTE_RING_SYNC_ST:
> + case RTE_RING_SYNC_MT:
> + n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
> + r->capacity, st, num, behavior, head, next, free);
> + break;
> + case RTE_RING_SYNC_MT_RTS:
> + n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
> + r->capacity, num, behavior, head, free);
> + *next = *head + n;
> + break;
> + case RTE_RING_SYNC_MT_HTS:
> + n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
> + r->capacity, num, behavior, head, free);
> + *next = *head + n;
> + break;
> + default:
> + /* unsupported mode, shouldn't be here */
> + RTE_ASSERT(0);
> + *free = 0;
> + n = 0;
> + }
> +
> + return n;
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
> + enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
> + uint32_t *head, uint32_t *next, uint32_t *avail)
> +{
> + uint32_t n;
> +
> + switch (st) {
> + case RTE_RING_SYNC_ST:
> + case RTE_RING_SYNC_MT:
> + n = __rte_ring_headtail_move_head(&r->cons.ht,
> + &r->stage[stage].ht, 0, st, num, behavior,
> + head, next, avail);
> + break;
> + case RTE_RING_SYNC_MT_RTS:
> + n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
> + 0, num, behavior, head, avail);
> + *next = *head + n;
> + break;
> + case RTE_RING_SYNC_MT_HTS:
> + n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
> + 0, num, behavior, head, avail);
> + *next = *head + n;
> + break;
> + default:
> + /* unsupported mode, shouldn't be here */
> + RTE_ASSERT(0);
> + *avail = 0;
> + n = 0;
> + }
> +
> + return n;
> +}
> +
> +static __rte_always_inline void
> +__rte_soring_update_tail(struct __rte_ring_headtail *rht,
> + enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
> +{
> + uint32_t n;
> +
> + switch (st) {
> + case RTE_RING_SYNC_ST:
> + case RTE_RING_SYNC_MT:
> + __rte_ring_update_tail(&rht->ht, head, next, st, enq);
> + break;
> + case RTE_RING_SYNC_MT_RTS:
> + __rte_ring_rts_update_tail(&rht->rts);
> + break;
> + case RTE_RING_SYNC_MT_HTS:
> + n = next - head;
> + __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
> + break;
> + default:
> + /* unsupported mode, shouldn't be here */
> + RTE_ASSERT(0);
> + }
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_move_head(struct soring_stage_headtail *d,
> + const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
> + enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
> +{
> + uint32_t n, tail;
> +
> + *old_head = rte_atomic_load_explicit(&d->head,
> + rte_memory_order_acquire);
> +
> + do {
> + n = num;
> + tail = rte_atomic_load_explicit(&s->tail,
> + rte_memory_order_relaxed);
> + *avail = capacity + tail - *old_head;
> + if (n > *avail)
> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
> + if (n == 0)
> + return 0;
> + *new_head = *old_head + n;
> + } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
> + old_head, *new_head, rte_memory_order_acquire,
> + rte_memory_order_acquire) == 0);
> +
> + return n;
> +}
> +
> +/*
> + * Public functions (data-path) start here.
> + */
> +
> +unsigned int
> +rte_soring_count(const struct rte_soring *r)
> +{
> + uint32_t prod_tail = r->prod.ht.tail;
> + uint32_t cons_tail = r->cons.ht.tail;
> + uint32_t count = (prod_tail - cons_tail) & r->mask;
> + return (count > r->capacity) ? r->capacity : count;
> +}
> +
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> + const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> + uint32_t *free_space)
> +{
> + enum rte_ring_sync_type st;
> + uint32_t nb_free, prod_head, prod_next;
> +
> + RTE_ASSERT(r != NULL && r->nb_stage > 0);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + st = r->prod.ht.sync_type;
> +
> + n = __rte_soring_move_prod_head(r, n, behavior, st,
> + &prod_head, &prod_next, &nb_free);
> + if (n != 0) {
> + __rte_ring_do_enqueue_elems(&r[1], obj_table, r->size,
> + prod_head & r->mask, r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_enqueue_elems(r->elemst, objst, r->size,
> + prod_head & r->mask, r->stsize, n);
> + __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
> + }
> +
> + if (free_space != NULL)
> + *free_space = nb_free - n;
> + return n;
> +}
> +
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> + uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *available)
> +{
> + enum rte_ring_sync_type st;
> + uint32_t entries, cons_head, cons_next, n, ns, reqn;
> +
> + RTE_ASSERT(r != NULL && r->nb_stage > 0);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + ns = r->nb_stage - 1;
> + st = r->cons.ht.sync_type;
> +
> + /* try to grab exactly @num elems first */
> + n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
> + &cons_head, &cons_next, &entries);
> + if (n == 0) {
> + /* try to finalize some elems from previous stage */
> + n = __rte_soring_stage_finalize(&r->stage[ns].sht,
> + r->state, r->mask, 2 * num);
> + entries += n;
> +
> + /* repeat attempt to grab elems */
> + reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
> + if (entries >= reqn)
> + n = __rte_soring_move_cons_head(r, ns, num, behavior,
> + st, &cons_head, &cons_next, &entries);
> + else
> + n = 0;
> + }
> +
> + /* we have some elems to consume */
> + if (n != 0) {
> + __rte_ring_do_dequeue_elems(obj_table, &r[1], r->size,
> + cons_head & r->mask, r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_dequeue_elems(objst, r->elemst, r->size,
> + cons_head & r->mask, r->stsize, n);
> + __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
> + }
> +
> + if (available != NULL)
> + *available = entries - n;
> + return n;
> +}
> +
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> + uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *ftoken, uint32_t *available)
> +{
> + uint32_t avail, head, idx, n, next, reqn;
> + union soring_state st;
> + struct soring_stage *pstg;
> + struct soring_stage_headtail *cons;
> +
> + RTE_ASSERT(r != NULL && stage < r->nb_stage);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + cons = &r->stage[stage].sht;
> +
> + if (stage == 0)
> + n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
> + behavior, &head, &next, &avail);
> + else {
> + pstg = r->stage + stage - 1;
> +
> + /* try to grab exactly @num elems */
> + n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
> + RTE_RING_QUEUE_FIXED, &head, &next, &avail);
> + if (n == 0) {
> + /* try to finalize some elems from previous stage */
> + n = __rte_soring_stage_finalize(&pstg->sht, r->state,
> + r->mask, 2 * num);
> + avail += n;
> +
> + /* repeat attempt to grab elems */
> + reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
> + if (avail >= reqn)
> + n = __rte_soring_stage_move_head(cons,
> + &pstg->ht, 0, num, behavior, &head,
> + &next, &avail);
> + else
> + n = 0;
> + }
> + }
> +
> + if (n != 0) {
> +
> + idx = head & r->mask;
> +
> + /* copy elems that are ready for given stage */
> + __rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
> + r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_dequeue_elems(objst, r->elemst,
> + r->size, idx, r->stsize, n);
> +
> + /* update status ring */
> + st.ftoken = head;
> + st.stnum = (RTE_SORING_ST_START | n);
> +
> + rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
> + rte_memory_order_relaxed);
> + *ftoken = head;
> + }
> +
> + if (available != NULL)
> + *available = avail - n;
> + return n;
> +}
> +
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> + const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken)
> +{
> + uint32_t idx, tail;
> + struct soring_stage *stg;
> + union soring_state st;
> +
> + RTE_ASSERT(r != NULL && stage < r->nb_stage);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + stg = r->stage + stage;
> + tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
> + rte_memory_order_relaxed);
> +
> + idx = ftoken & r->mask;
> + st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
> + rte_memory_order_acquire);
> +
> + /* check state ring contents */
> + RTE_VERIFY(st.stnum == (RTE_SORING_ST_START | n) &&
> + st.ftoken == ftoken);
> +
> + /* update contents of the ring, if necessary */
> + if (objs != NULL)
> + __rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
> + r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_enqueue_elems(r->elemst, objst, r->size, idx,
> + r->stsize, n);
> +
> + /* set state to FINISH, make sure it is not reordered */
> + st.stnum = RTE_SORING_ST_FINISH | n;
> + rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
> + rte_memory_order_release);
> +
> + if (tail == ftoken)
> + __rte_soring_stage_finalize(&stg->sht, r->state, r->mask,
> + r->capacity);
> +}
> diff --git a/lib/ring/soring.h b/lib/ring/soring.h
> new file mode 100644
> index 0000000000..3a3f6efa76
> --- /dev/null
> +++ b/lib/ring/soring.h
> @@ -0,0 +1,124 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _SORING_H_
> +#define _SORING_H_
> +
> +/**
> + * @file
> + * This file contains internal structures of RTE soring: Staged Ordered Ring.
> + * It is sort of an extension of the conventional RTE ring.
> + * Internal structure:
> + * In addition to 'normal' ring of elems, it also has a ring of states of the
> + * same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * For actual implementation details, please refer to soring.c
> + */
> +
> +#include <rte_soring.h>
> +
> +union soring_state {
> + alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> + struct {
> + RTE_ATOMIC(uint32_t) ftoken;
> + RTE_ATOMIC(uint32_t) stnum;
> + };
> +};
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_START RTE_SHIFT_VAL32(1, RTE_SORING_ST_BIT)
> +#define RTE_SORING_ST_FINISH RTE_SHIFT_VAL32(2, RTE_SORING_ST_BIT)
> +
> +#define RTE_SORING_ST_MASK \
> + RTE_GENMASK32(sizeof(uint32_t) * CHAR_BIT - 1, RTE_SORING_ST_BIT)
> +
> +/**
> + * SORING re-uses rte_ring internal structures and implementation
> + * for enqueue/dequeue operations.
> + */
> +struct __rte_ring_headtail {
> +
> + union __rte_cache_aligned {
> + struct rte_ring_headtail ht;
> + struct rte_ring_hts_headtail hts;
> + struct rte_ring_rts_headtail rts;
> + };
> +
> + RTE_CACHE_GUARD;
> +};
> +
> +union soring_stage_tail {
> + /** raw 8B value to read/write *sync* and *pos* as one atomic op */
> + alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> + struct {
> + RTE_ATOMIC(uint32_t) sync;
> + RTE_ATOMIC(uint32_t) pos;
> + };
> +};
> +
> +struct soring_stage_headtail {
> + volatile union soring_stage_tail tail;
> + enum rte_ring_sync_type unused; /**< unused */
> + volatile RTE_ATOMIC(uint32_t) head;
> +};
> +
> +/**
> + * SORING stage head_tail structure is 'compatible' with rte_ring ones.
> + * rte_ring internal functions for moving producer/consumer head
> + * can work transparently with stage head_tail and vice versa
> + * (no modifications required).
> + */
> +struct soring_stage {
> +
> + union __rte_cache_aligned {
> + struct rte_ring_headtail ht;
> + struct soring_stage_headtail sht;
> + };
> +
> + RTE_CACHE_GUARD;
> +};
> +
> +/**
> + * RTE soring internal structure.
> + * As with rte_ring, the actual array of elements is supposed to be located
> + * directly after the rte_soring structure.
> + */
> +struct __rte_cache_aligned rte_soring {
> + uint32_t size; /**< Size of ring. */
> + uint32_t mask; /**< Mask (size-1) of ring. */
> + uint32_t capacity; /**< Usable size of ring */
> + uint32_t esize;
> + /**< size of elements in the ring, must be a multiple of 4 */
> + uint32_t stsize;
> + /**< size of status value for each elem, must be a multiple of 4 */
> +
> + /** Ring stages */
> + struct soring_stage *stage;
> + uint32_t nb_stage;
> +
> + /** Ring of states (one per element) */
> + union soring_state *state;
> +
> + /** Pointer to the buffer where status values for each element
> + * are stored. This is supplementary and optional information that
> + * the user can attach to each element of the ring.
> + * While it is possible to incorporate this information inside
> + * a user-defined element, in many cases it is preferable to maintain
> + * it as a separate array (mainly for performance reasons).
> + */
> + void *elemst;
> +
> + RTE_CACHE_GUARD;
> +
> + /** Ring producer status. */
> + struct __rte_ring_headtail prod;
> +
> + /** Ring consumer status. */
> + struct __rte_ring_headtail cons;
> +
> + char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
> +};
> +
> +#endif /* _SORING_H_ */
> diff --git a/lib/ring/version.map b/lib/ring/version.map
> index 8da094a69a..a1f95a500f 100644
> --- a/lib/ring/version.map
> +++ b/lib/ring/version.map
> @@ -14,3 +14,16 @@ DPDK_25 {
>
> local: *;
> };
> +
> +EXPERIMENTAL {
> + global:
> +
> + # added in 24.11
> + rte_soring_acquire;
> + rte_soring_count;
> + rte_soring_dequeue;
> + rte_soring_enqueue;
> + rte_soring_get_memsize;
> + rte_soring_init;
> + rte_soring_release;
> +};