DPDK patches and discussions
 help / color / mirror / Atom feed
From: "Mattias Rönnblom" <hofors@lysator.liu.se>
To: Konstantin Ananyev <konstantin.v.ananyev@yandex.ru>, dev@dpdk.org
Cc: honnappa.nagarahalli@arm.com, jerinj@marvell.com,
	hemant.agrawal@nxp.com,  bruce.richardson@intel.com,
	drc@linux.vnet.ibm.com, ruifeng.wang@arm.com,
	 mb@smartsharesystems.com,
	Konstantin Ananyev <konstantin.ananyev@huawei.com>
Subject: Re: [RFC 3/6] ring/soring: introduce Staged Ordered Ring
Date: Mon, 26 Aug 2024 21:04:51 +0200	[thread overview]
Message-ID: <6af430b0-3bec-46f5-ac53-f69f5ba1de1e@lysator.liu.se> (raw)
In-Reply-To: <20240815085339.1434-4-konstantin.v.ananyev@yandex.ru>

On 2024-08-15 10:53, Konstantin Ananyev wrote:
> From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> 
> Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> with multiple processing 'stages'.
> It is based on conventional DPDK rte_ring, re-uses many of its concepts,
> and even substantial part of its code.
> It can be viewed as an 'extension' of rte_ring functionality.
> In particular, main SORING properties:
> - circular ring buffer with fixed size objects
> - producer, consumer plus multiple processing stages in the middle.
> - allows to split objects processing into multiple stages.
> - objects remain in the same ring while moving from one stage to the other,
>    initial order is preserved, no extra copying needed.
> - preserves the ingress order of objects within the queue across multiple
>    stages, i.e.:
>    at the same stage multiple threads can process objects from the ring in
>    any order, but for the next stage objects will always appear in the
>    original order.
> - each stage (and producer/consumer) can be served by single and/or
>    multiple threads.
> - number of stages, size and number of objects in the ring are
>    configurable at ring initialization time.
> 
> Data-path API provides four main operations:
> - enqueue/dequeue works in the same manner as for conventional rte_ring,
>    all rte_ring synchronization types are supported.
> - acquire/release - for each stage there is an acquire (start) and
>    release (finish) operation.
>    after some objects are 'acquired' - given thread can safely assume that
>    it has exclusive possession of these objects till 'release' for them is
>    invoked.
>    Note that right now user has to release exactly the same number of
>    objects that was acquired before.
>    After 'release', objects can be 'acquired' by next stage and/or dequeued
>    by the consumer (in case of last stage).
> 
> Expected use-case: applications that uses pipeline model
> (probably with multiple stages) for packet processing, when preserving
> incoming packet order is important. I.E.: IPsec processing, etc.
> 

How does SORING related to Eventdev? Would it be feasible to reshape 
this into a SW event device?

> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> ---
>   lib/ring/meson.build  |   4 +-
>   lib/ring/rte_soring.c | 144 ++++++++++++++
>   lib/ring/rte_soring.h | 270 ++++++++++++++++++++++++++
>   lib/ring/soring.c     | 431 ++++++++++++++++++++++++++++++++++++++++++
>   lib/ring/soring.h     | 124 ++++++++++++
>   lib/ring/version.map  |  13 ++
>   6 files changed, 984 insertions(+), 2 deletions(-)
>   create mode 100644 lib/ring/rte_soring.c
>   create mode 100644 lib/ring/rte_soring.h
>   create mode 100644 lib/ring/soring.c
>   create mode 100644 lib/ring/soring.h
> 
> diff --git a/lib/ring/meson.build b/lib/ring/meson.build
> index 7fca958ed7..21f2c12989 100644
> --- a/lib/ring/meson.build
> +++ b/lib/ring/meson.build
> @@ -1,8 +1,8 @@
>   # SPDX-License-Identifier: BSD-3-Clause
>   # Copyright(c) 2017 Intel Corporation
>   
> -sources = files('rte_ring.c')
> -headers = files('rte_ring.h')
> +sources = files('rte_ring.c', 'rte_soring.c', 'soring.c')
> +headers = files('rte_ring.h', 'rte_soring.h')
>   # most sub-headers are not for direct inclusion
>   indirect_headers += files (
>           'rte_ring_core.h',
> diff --git a/lib/ring/rte_soring.c b/lib/ring/rte_soring.c
> new file mode 100644
> index 0000000000..17b1b73a42
> --- /dev/null
> +++ b/lib/ring/rte_soring.c
> @@ -0,0 +1,144 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#include "soring.h"
> +#include <rte_string_fns.h>
> +
> +RTE_LOG_REGISTER_DEFAULT(soring_logtype, INFO);
> +#define RTE_LOGTYPE_SORING soring_logtype
> +#define SORING_LOG(level, ...) \
> +	RTE_LOG_LINE(level, SORING, "" __VA_ARGS__)
> +
> +static uint32_t
> +soring_calc_elem_num(uint32_t count)
> +{
> +	return rte_align32pow2(count + 1);
> +}
> +
> +static int
> +soring_check_param(uint32_t esize, uint32_t stsize, uint32_t count,
> +	uint32_t stages)
> +{
> +	if (stages == 0) {
> +		SORING_LOG(ERR, "invalid number of stages: %u", stages);
> +		return -EINVAL;
> +	}
> +
> +	/* Check if element size is a multiple of 4B */
> +	if (esize == 0 || esize % 4 != 0) {
> +		SORING_LOG(ERR, "invalid element size: %u", esize);
> +		return -EINVAL;
> +	}
> +
> +	/* Check if ret-code size is a multiple of 4B */
> +	if (stsize % 4 != 0) {
> +		SORING_LOG(ERR, "invalid retcode size: %u", stsize);
> +		return -EINVAL;
> +	}
> +
> +	 /* count must be a power of 2 */
> +	if (rte_is_power_of_2(count) == 0 ||
> +			(count > RTE_SORING_ELEM_MAX + 1)) {
> +		SORING_LOG(ERR, "invalid number of elements: %u", count);
> +		return -EINVAL;
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * Calculate size offsets for SORING internal data layout.
> + */
> +static size_t
> +soring_get_szofs(uint32_t esize, uint32_t stsize, uint32_t count,
> +	uint32_t stages, size_t *elst_ofs, size_t *state_ofs,
> +	size_t *stage_ofs)
> +{
> +	size_t sz;
> +	const struct rte_soring * const r = NULL;
> +
> +	sz = sizeof(r[0]) + (size_t)count * esize;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	if (elst_ofs != NULL)
> +		*elst_ofs = sz;
> +
> +	sz = sz + (size_t)count * stsize;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	if (state_ofs != NULL)
> +		*state_ofs = sz;
> +
> +	sz += sizeof(r->state[0]) * count;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	if (stage_ofs != NULL)
> +		*stage_ofs = sz;
> +
> +	sz += sizeof(r->stage[0]) * stages;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	return sz;
> +}
> +
> +
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm)
> +{
> +	int32_t rc;
> +	uint32_t count;
> +
> +	count = soring_calc_elem_num(prm->elems);
> +	rc = soring_check_param(prm->esize, prm->stsize, count, prm->stages);
> +	if (rc != 0)
> +		return rc;
> +
> +	return soring_get_szofs(prm->esize, prm->stsize, count, prm->stages,
> +			NULL, NULL, NULL);
> +}
> +
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm)
> +{
> +	int32_t rc;
> +	uint32_t n;
> +	size_t elst_ofs, stage_ofs, state_ofs;
> +
> +	if (r == NULL || prm == NULL)
> +		return -EINVAL;
> +
> +	n = soring_calc_elem_num(prm->elems);
> +	rc = soring_check_param(prm->esize, prm->stsize, n, prm->stages);
> +	if (rc != 0)
> +		return rc;
> +
> +	soring_get_szofs(prm->esize, prm->stsize, n, prm->stages, &elst_ofs,
> +			&state_ofs, &stage_ofs);
> +
> +	memset(r, 0, sizeof(*r));
> +	rc = strlcpy(r->name, prm->name, sizeof(r->name));
> +	if (rc < 0 || rc >= (int)sizeof(r->name))
> +		return -ENAMETOOLONG;
> +
> +	r->size = n;
> +	r->mask = r->size - 1;
> +	r->capacity = prm->elems;
> +	r->esize = prm->esize;
> +	r->stsize = prm->stsize;
> +
> +	r->prod.ht.sync_type = prm->prod_synt;
> +	r->cons.ht.sync_type = prm->cons_synt;
> +
> +	r->state = (union soring_state *)((uintptr_t)r + state_ofs);
> +	memset(r->state, 0, sizeof(r->state[0]) * r->size);
> +
> +	r->stage = (struct soring_stage *)((uintptr_t)r + stage_ofs);
> +	r->nb_stage = prm->stages;
> +	memset(r->stage, 0, r->nb_stage * sizeof(r->stage[0]));
> +
> +	if (r->stsize != 0)
> +		r->elemst = (void *)((uintptr_t)r + elst_ofs);
> +
> +	return 0;
> +}
> diff --git a/lib/ring/rte_soring.h b/lib/ring/rte_soring.h
> new file mode 100644
> index 0000000000..fb0e75b39a
> --- /dev/null
> +++ b/lib/ring/rte_soring.h
> @@ -0,0 +1,270 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _RTE_SORING_H_
> +#define _RTE_SORING_H_
> +
> +/**
> + * @file
> + * This file contains definition of RTE soring (Staged Ordered Ring) public API.
> + * Brief description:
> + * enqueue/dequeue works the same as for conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * after some elems are 'acquired' - user  can safely assume that he has
> + * exclusive possession of these elems till 'release' for them is done.
> + * Note that right now user has to release exactly the same number of elems
> + * he acquired before.
> + * After 'release', elems can be 'acquired' by next stage and/or dequeued
> + * (in case of last stage).
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring.h>
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_BIT       30
> +
> +/* max possible number of elements in the soring */
> +#define RTE_SORING_ELEM_MAX	(RTE_BIT32(RTE_SORING_ST_BIT) - 1)
> +
> +struct rte_soring_param {
> +	/** expected name of the ring */
> +	const char *name;
> +	/** number of elemnts in the ring */
> +	uint32_t elems;
> +	/** size of elements in the ring, must be a multiple of 4 */
> +	uint32_t esize;
> +	/**
> +	 * size of retcode for each elem, must be a multiple of 4.
> +	 * This parameter defines a size of supplementary and optional
> +	 * array of ret-codes associated with each object in the soring.
> +	 * While element size is configurable (see @esize parameter above),
> +	 * so user can specify it big enough to hold both object and its
> +	 * ret-code together, for performance reasons it might be plausible
> +	 * to access them as separate arrays.
> +	 * Common usage scenario when such separation helps:
> +	 * enqueue() - writes to objects array
> +	 * acquire() - reads from objects array
> +	 * release() - writes to ret-code array
> +	 * dequeue() - reads both objects and ret-code array
> +	 */
> +	uint32_t stsize;
> +	/** number of stages in the ring */
> +	uint32_t stages;
> +	/** sync type for producer */
> +	enum rte_ring_sync_type prod_synt;
> +	/** sync type for consumer */
> +	enum rte_ring_sync_type cons_synt;
> +};
> +
> +struct rte_soring;
> +
> +/**
> + * Calculate the memory size needed for a soring
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the expected parameters for it. This value is the sum of the size of
> + * the internal metadata and the size of the memory needed by the
> + * actual ring elements and theri rec-codes. The value is aligned to a cache
> + * line size.
> + *
> + * @param prm
> + *   Pointer to the structure that contains soring creation paramers.
> + * @return
> + *   - The memory size needed for the soring on success.
> + *   - -EINVAL if provided paramer values are invalid.
> + */
> +__rte_experimental
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm);
> +
> +/**
> + * Initialize a soring structure.
> + *
> + * Initialize a soring structure in memory pointed by "r".
> + * The size of the memory area must be large enough to store the soring
> + * internal structures plus the objects and ret-code tables.
> + * It is strongly advised to use rte_ring_get_memsize() to get the
> + * appropriate size.
> + *
> + * @param r
> + *   Pointer to the soring structure.
> + * @param prm
> + *   Pointer to the structure that contains soring creation paramers.
> + * @return
> + *   - 0 on success, or a negative error code.
> + */
> +__rte_experimental
> +int
> +rte_soring_init(struct rte_soring *r,  const struct rte_soring_param *prm);
> +
> +/**
> + * Return the total number of filled entries in a ring.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @return
> + *   The number of entries in the ring.
> + */
> +__rte_experimental
> +unsigned int
> +rte_soring_count(const struct rte_soring *r);
> +
> +/**
> + * Enqueue several objects on the ring.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param obj_table
> + *   A pointer to an array of objects to enqueue.
> + *   Size of objects to enqueue must be the same value as @esize parameter
> + *   used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + *   A pointer to an array of status values for each object to enqueue.
> + *   Note that if user not using object status values, then this parameter
> + *   can be NULL.
> + *   Size of elements in this array must be the same value as @stsize parameter
> + *   used while creating the ring. If user created the soring with
> + *   @stsize value equals zero, then @objst parameter should be NULL.
> + *   Otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   Behavior type, one of:
> + *   - RTE_RING_QUEUE_FIXED: enqueue either exactly @n objects or none.
> + *   - RTE_RING_QUEUE_VARIABLE: enqueue up to @n objects.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - Actual number of objects enqueued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> +	const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *free_space);
> +
> +/**
> + * Dequeue several objects from the ring.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param obj_table
> + *   A pointer to an array of objects to dequeue.
> + *   Size of objects to enqueue must be the same value as @esize parameter
> + *   used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + *   A pointer to array of status values for each object to dequeue.
> + *   Note that if user not using object status values, then this parameter
> + *   can be NULL.
> + *   Size of elements in this array must be the same value as @stsize parameter
> + *   used while creating the ring. If user created the soring with
> + *   @stsize value equals zero, then @objst parameter should be NULL.
> + *   Otherwise the results are undefined.
> + * @param num
> + *   The number of objects to dequeue from the ring into the obj_table.
> + * @param behavior
> + *   Behavior type, one of:
> + *   - RTE_RING_QUEUE_FIXED: dequeue either exactly @n objects or none.
> + *   - RTE_RING_QUEUE_VARIABLE: dequeue up to @n objects.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - Actual number of objects dequeued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> +	uint32_t num, enum rte_ring_queue_behavior behavior,
> +	uint32_t *available);
> +
> +/**
> + * Acquire several objects from the ring for given stage.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param objs
> + *   A pointer to an array of objects to acquire.
> + *   Size of objects must be the same value as @esize parameter
> + *   used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + *   A pointer to an array of status values for each for each acquired object.
> + *   Note that if user not using object status values, then this parameter
> + *   can be NULL.
> + *   Size of elements in this array must be the same value as @stsize parameter
> + *   used while creating the ring. If user created the soring with
> + *   @stsize value equals zero, then @objst parameter should be NULL.
> + *   Otherwise the results are undefined.
> + * @param stage
> + *   Stage to acquire objects for.
> + * @param num
> + *   The number of objects to acquire.
> + * @param behavior
> + *   Behavior type, one of:
> + *   - RTE_RING_QUEUE_FIXED: acquire either exactly @n objects or none.
> + *   - RTE_RING_QUEUE_VARIABLE: acquire up to @n objects.
> + * @param ftoken
> + *   Pointer to the opaque 'token' value used by release() op.
> + *   User has to store this value somewhere, and later provide to the
> + *   release().
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries for given stage
> + *   after the acquire has finished.
> + * @return
> + *   - Actual number of objects acquired.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> +	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> +	uint32_t *ftoken, uint32_t *available);
> +
> +/**
> + * Release several objects for given stage back to the ring.
> + * Note that it means these objects become avaialble for next stage or
> + * dequeue.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param objs
> + *   A pointer to an array of objects to relase.
> + *   Note that unless user needs to overwrite ring objects this parameter
> + *   can be NULL.
> + *   Size of objects must be the same value as @esize parameter
> + *   used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + *   A pointer to an array of status values for each object to release.
> + *   Note that if user not using object status values, then this parameter
> + *   can be NULL.
> + *   Size of elements in this array must be the same value as @stsize parameter
> + *   used while creating the ring. If user created the soring with
> + *   @stsize value equals zero, then objst parameter should be NULL.
> + *   Otherwise the results are undefined.
> + * @param stage
> + *   Current stage.
> + * @param n
> + *   The number of objects to release.
> + *   Has to be the same value as returned by acquire() op.
> + * @param ftoken
> + *   Opaque 'token' value obtained from acquire() op.
> + * @return
> + *   - None.
> + */
> +__rte_experimental
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> +	const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_SORING_H_ */
> diff --git a/lib/ring/soring.c b/lib/ring/soring.c
> new file mode 100644
> index 0000000000..929bde9697
> --- /dev/null
> +++ b/lib/ring/soring.c
> @@ -0,0 +1,431 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +/**
> + * @file
> + * This file contains implementation of SORING 'datapath' functions.
> + * Brief description:
> + * ==================
> + * enqueue/dequeue works the same as for conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * After some elems are 'acquired' - user can safely assume that he has
> + * exclusive possession of these elems till 'release' for them is done.
> + * Note that right now user has to release exactly the same number of elems
> + * he acquired before.
> + * After 'release', elems can be 'acquired' by next stage and/or dequeued
> + * (in case of last stage).
> + * Internal structure:
> + * ===================
> + * In addition to 'normal' ring of elems, it also has a ring of states of the
> + * same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * How it works:
> + * =============
> + * 'acquire()' just moves stage's head (same as rte_ring move_head does),
> + * plus it saves in state[stage.cur_head] information about how many elems
> + * were acquired, current head position and special flag value to indicate
> + * that elems are acquired (RTE_SORING_ST_START).
> + * Note that 'acquire()' returns to the user a special 'ftoken' that user has
> + * to provide for 'release()' (in fact it is just a position for current head).
> + * 'release()' extracts old head value from provided ftoken and checks that
> + * corresponding 'state[]' contains expected values(mostly for sanity
> + * purposes).
> + * * Then it marks this state[] with 'RTE_SORING_ST_FINISH' flag to indicate
> + * that given subset of objects was released.
> + * After that, it checks does old head value equals to current tail value?
> + * If yes, then it performs  'finalize()' operation, otherwise 'release()'
> + * just returns (without spinning on stage tail value).
> + * As updated state[] is shared by all threads, some other thread can do
> + * 'finalize()' for given stage.
> + * That allows 'release()' to avoid excessive waits on the tail value.
> + * Main purpose of 'finalize()' operation is to walk through 'state[]'
> + * from current stage tail up to its head, check state[] and move stage tail
> + * through elements that already are in RTE_SORING_ST_FINISH state.
> + * Along with that, corresponding state[] values are reset to zero.
> + * Note that 'finalize()' for given stage can be done from multiple places:
> + * 'release()' for that stage or from 'acquire()' for next stage
> + * even from consumer's 'dequeue()' - in case given stage is the last one.
> + * So 'finalize()' has to be MT-safe and inside it we have to
> + * guarantee that only one thread will update state[] and stage's tail values.
> + */
> +
> +#include "soring.h"
> +
> +/*
> + * Inline functions (fastpath) start here.
> + */
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_finalize(struct soring_stage_headtail *sht,
> +	union soring_state *rstate, uint32_t rmask, uint32_t maxn)
> +{
> +	int32_t rc;
> +	uint32_t head, i, idx, k, n, tail;
> +	union soring_stage_tail nt, ot;
> +	union soring_state st;
> +
> +	/* try to grab exclusive right to update tail value */
> +	ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
> +			rte_memory_order_acquire);
> +
> +	/* other thread already finalizing it for us */
> +	if (ot.sync != 0)
> +		return 0;
> +
> +	nt.pos = ot.pos;
> +	nt.sync = 1;
> +	rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
> +		&ot.raw, nt.raw, rte_memory_order_acquire,
> +		rte_memory_order_relaxed);
> +
> +	/* other thread won the race */
> +	if (rc == 0)
> +		return 0;
> +
> +	/*
> +	 * start with current tail and walk through states that are
> +	 * already finished.
> +	 */
> +
> +	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
> +	n = RTE_MIN(head - ot.pos, maxn);
> +	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
> +
> +		idx = tail & rmask;
> +		st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
> +			rte_memory_order_relaxed);
> +		if ((st.stnum & RTE_SORING_ST_MASK) != RTE_SORING_ST_FINISH ||
> +				st.ftoken != tail)
> +			break;
> +
> +		k = st.stnum & ~RTE_SORING_ST_MASK;
> +		rte_atomic_store_explicit(&rstate[idx].raw, 0,
> +				rte_memory_order_relaxed);
> +	}
> +
> +
> +	/* release exclusive right to update along with new tail value */
> +	ot.pos = tail;
> +	rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
> +			rte_memory_order_release);
> +
> +	return i;
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
> +	uint32_t *head, uint32_t *next, uint32_t *free)
> +{
> +	uint32_t n;
> +
> +	switch (st) {
> +	case RTE_RING_SYNC_ST:
> +	case RTE_RING_SYNC_MT:
> +		n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
> +			r->capacity, st, num, behavior, head, next, free);
> +		break;
> +	case RTE_RING_SYNC_MT_RTS:
> +		n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
> +			r->capacity, num, behavior, head, free);
> +		*next = *head + n;
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
> +			r->capacity, num, behavior, head, free);
> +		*next = *head + n;
> +		break;
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +		*free = 0;
> +		n = 0;
> +	}
> +
> +	return n;
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
> +	uint32_t *head, uint32_t *next, uint32_t *avail)
> +{
> +	uint32_t n;
> +
> +	switch (st) {
> +	case RTE_RING_SYNC_ST:
> +	case RTE_RING_SYNC_MT:
> +		n = __rte_ring_headtail_move_head(&r->cons.ht,
> +			&r->stage[stage].ht, 0, st, num, behavior,
> +			head, next, avail);
> +		break;
> +	case RTE_RING_SYNC_MT_RTS:
> +		n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
> +			0, num, behavior, head, avail);
> +		*next = *head + n;
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
> +			0, num, behavior, head, avail);
> +		*next = *head + n;
> +		break;
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +		*avail = 0;
> +		n = 0;
> +	}
> +
> +	return n;
> +}
> +
> +static __rte_always_inline void
> +__rte_soring_update_tail(struct __rte_ring_headtail *rht,
> +	enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
> +{
> +	uint32_t n;
> +
> +	switch (st) {
> +	case RTE_RING_SYNC_ST:
> +	case RTE_RING_SYNC_MT:
> +		__rte_ring_update_tail(&rht->ht, head, next, st, enq);
> +		break;
> +	case RTE_RING_SYNC_MT_RTS:
> +		__rte_ring_rts_update_tail(&rht->rts);
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = next - head;
> +		__rte_ring_hts_update_tail(&rht->hts, head, n, enq);
> +		break;
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +	}
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_move_head(struct soring_stage_headtail *d,
> +	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
> +	enum rte_ring_queue_behavior behavior,
> +	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
> +{
> +	uint32_t n, tail;
> +
> +	*old_head = rte_atomic_load_explicit(&d->head,
> +			rte_memory_order_acquire);
> +
> +	do {
> +		n = num;
> +		tail = rte_atomic_load_explicit(&s->tail,
> +				rte_memory_order_relaxed);
> +		*avail = capacity + tail - *old_head;
> +		if (n > *avail)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
> +		if (n == 0)
> +			return 0;
> +		*new_head = *old_head + n;
> +	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
> +			old_head, *new_head, rte_memory_order_acquire,
> +			rte_memory_order_acquire) == 0);
> +
> +	return n;
> +}
> +
> +/*
> + * Public functions (data-path) start here.
> + */
> +
> +unsigned int
> +rte_soring_count(const struct rte_soring *r)
> +{
> +	uint32_t prod_tail = r->prod.ht.tail;
> +	uint32_t cons_tail = r->cons.ht.tail;
> +	uint32_t count = (prod_tail - cons_tail) & r->mask;
> +	return (count > r->capacity) ? r->capacity : count;
> +}
> +
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> +	const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *free_space)
> +{
> +	enum rte_ring_sync_type st;
> +	uint32_t nb_free, prod_head, prod_next;
> +
> +	RTE_ASSERT(r != NULL && r->nb_stage > 0);
> +	RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> +	st = r->prod.ht.sync_type;
> +
> +	n = __rte_soring_move_prod_head(r, n, behavior, st,
> +			&prod_head, &prod_next, &nb_free);
> +	if (n != 0) {
> +		__rte_ring_do_enqueue_elems(&r[1], obj_table, r->size,
> +			prod_head & r->mask, r->esize, n);
> +		if (objst != NULL)
> +			__rte_ring_do_enqueue_elems(r->elemst, objst, r->size,
> +				prod_head & r->mask, r->stsize, n);
> +		__rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
> +	}
> +
> +	if (free_space != NULL)
> +		*free_space = nb_free - n;
> +	return n;
> +}
> +
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> +	uint32_t num, enum rte_ring_queue_behavior behavior,
> +	uint32_t *available)
> +{
> +	enum rte_ring_sync_type st;
> +	uint32_t entries, cons_head, cons_next, n, ns, reqn;
> +
> +	RTE_ASSERT(r != NULL && r->nb_stage > 0);
> +	RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> +	ns = r->nb_stage - 1;
> +	st = r->cons.ht.sync_type;
> +
> +	/* try to grab exactly @num elems first */
> +	n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
> +			&cons_head, &cons_next, &entries);
> +	if (n == 0) {
> +		/* try to finalize some elems from previous stage */
> +		n = __rte_soring_stage_finalize(&r->stage[ns].sht,
> +			r->state, r->mask, 2 * num);
> +		entries += n;
> +
> +		/* repeat attempt to grab elems */
> +		reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
> +		if (entries >= reqn)
> +			n = __rte_soring_move_cons_head(r, ns, num, behavior,
> +				st, &cons_head, &cons_next, &entries);
> +		else
> +			n = 0;
> +	}
> +
> +	/* we have some elems to consume */
> +	if (n != 0) {
> +		__rte_ring_do_dequeue_elems(obj_table, &r[1], r->size,
> +			cons_head & r->mask, r->esize, n);
> +		if (objst != NULL)
> +			__rte_ring_do_dequeue_elems(objst, r->elemst, r->size,
> +				cons_head & r->mask, r->stsize, n);
> +		__rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
> +	}
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> +	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> +	uint32_t *ftoken, uint32_t *available)
> +{
> +	uint32_t avail, head, idx, n, next, reqn;
> +	union soring_state st;
> +	struct soring_stage *pstg;
> +	struct soring_stage_headtail *cons;
> +
> +	RTE_ASSERT(r != NULL && stage < r->nb_stage);
> +	RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> +	cons = &r->stage[stage].sht;
> +
> +	if (stage == 0)
> +		n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
> +			behavior, &head, &next, &avail);
> +	else {
> +		pstg = r->stage + stage - 1;
> +
> +		/* try to grab exactly @num elems */
> +		n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
> +			RTE_RING_QUEUE_FIXED, &head, &next, &avail);
> +		if (n == 0) {
> +			/* try to finalize some elems from previous stage */
> +			n = __rte_soring_stage_finalize(&pstg->sht, r->state,
> +				r->mask, 2 * num);
> +			avail += n;
> +
> +			/* repeat attempt to grab elems */
> +			reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
> +			if (avail >= reqn)
> +				n = __rte_soring_stage_move_head(cons,
> +					&pstg->ht, 0, num, behavior, &head,
> +					&next, &avail);
> +			else
> +				n = 0;
> +		}
> +	}
> +
> +	if (n != 0) {
> +
> +		idx = head & r->mask;
> +
> +		/* copy elems that are ready for given stage */
> +		__rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
> +				r->esize, n);
> +		if (objst != NULL)
> +			__rte_ring_do_dequeue_elems(objst, r->elemst,
> +				r->size, idx, r->stsize, n);
> +
> +		/* update status ring */
> +		st.ftoken = head;
> +		st.stnum = (RTE_SORING_ST_START | n);
> +
> +		rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
> +				rte_memory_order_relaxed);
> +		*ftoken = head;
> +	}
> +
> +	if (available != NULL)
> +		*available = avail - n;
> +	return n;
> +}
> +
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> +	const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken)
> +{
> +	uint32_t idx, tail;
> +	struct soring_stage *stg;
> +	union soring_state st;
> +
> +	RTE_ASSERT(r != NULL && stage < r->nb_stage);
> +	RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> +	stg = r->stage + stage;
> +	tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
> +			rte_memory_order_relaxed);
> +
> +	idx = ftoken & r->mask;
> +	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
> +			rte_memory_order_acquire);
> +
> +	/* check state ring contents */
> +	RTE_VERIFY(st.stnum == (RTE_SORING_ST_START | n) &&
> +		st.ftoken == ftoken);
> +
> +	/* update contents of the ring, if necessary */
> +	if (objs != NULL)
> +		__rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
> +			r->esize, n);
> +	if (objst != NULL)
> +		__rte_ring_do_enqueue_elems(r->elemst, objst, r->size, idx,
> +			r->stsize, n);
> +
> +	/* set state to FINISH, make sure it is not reordered */
> +	st.stnum = RTE_SORING_ST_FINISH | n;
> +	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
> +			rte_memory_order_release);
> +
> +	if (tail == ftoken)
> +		__rte_soring_stage_finalize(&stg->sht, r->state, r->mask,
> +				r->capacity);
> +}
> diff --git a/lib/ring/soring.h b/lib/ring/soring.h
> new file mode 100644
> index 0000000000..3a3f6efa76
> --- /dev/null
> +++ b/lib/ring/soring.h
> @@ -0,0 +1,124 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _SORING_H_
> +#define _SORING_H_
> +
> +/**
> + * @file
> + * This file contains internal strctures of RTE soring: Staged Ordered Ring.
> + * Sort of extension of conventional RTE ring.
> + * Internal structure:
> + * In addition to 'normal' ring of elems, it also has a ring of states of the
> + * same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * For actual implementation details, please refer to soring.c
> + */
> +
> +#include <rte_soring.h>
> +
> +union soring_state {
> +	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> +	struct {
> +		RTE_ATOMIC(uint32_t) ftoken;
> +		RTE_ATOMIC(uint32_t) stnum;
> +	};
> +};
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_START	RTE_SHIFT_VAL32(1, RTE_SORING_ST_BIT)
> +#define RTE_SORING_ST_FINISH	RTE_SHIFT_VAL32(2, RTE_SORING_ST_BIT)
> +
> +#define RTE_SORING_ST_MASK	\
> +	RTE_GENMASK32(sizeof(uint32_t) * CHAR_BIT - 1, RTE_SORING_ST_BIT)
> +
> +/**
> + * SORING re-uses rte_ring internal structures and implementation
> + * for enqueue/dequeue operations.
> + */
> +struct __rte_ring_headtail {
> +
> +	union __rte_cache_aligned {
> +		struct rte_ring_headtail ht;
> +		struct rte_ring_hts_headtail hts;
> +		struct rte_ring_rts_headtail rts;
> +	};
> +
> +	RTE_CACHE_GUARD;
> +};
> +
> +union soring_stage_tail {
> +	/** raw 8B value to read/write *sync* and *pos* as one atomic op */
> +	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> +	struct {
> +		RTE_ATOMIC(uint32_t) sync;
> +		RTE_ATOMIC(uint32_t) pos;
> +	};
> +};
> +
> +struct soring_stage_headtail {
> +	volatile union soring_stage_tail tail;
> +	enum rte_ring_sync_type unused;  /**< unused */
> +	volatile RTE_ATOMIC(uint32_t) head;
> +};
> +
> +/**
> + * SORING stage head_tail structure is 'compatible' with rte_ring ones.
> + * rte_ring internal functions for moving producer/consumer head
> + * can work transparently with stage head_tail and visa-versa
> + * (no modifications required).
> + */
> +struct soring_stage {
> +
> +	union __rte_cache_aligned {
> +		struct rte_ring_headtail ht;
> +		struct soring_stage_headtail sht;
> +	};
> +
> +	RTE_CACHE_GUARD;
> +};
> +
> +/**
> + * RTE soring internal structure.
> + * As with rte_ring actual elements array supposed to be located direclty
> + * after the rte_soring structure.
> + */
> +struct  __rte_cache_aligned rte_soring {
> +	uint32_t size;           /**< Size of ring. */
> +	uint32_t mask;           /**< Mask (size-1) of ring. */
> +	uint32_t capacity;       /**< Usable size of ring */
> +	uint32_t esize;
> +	/**< size of elements in the ring, must be a multiple of 4*/
> +	uint32_t stsize;
> +	/**< size of status value for each elem, must be a multiple of 4 */
> +
> +	/** Ring stages */
> +	struct soring_stage *stage;
> +	uint32_t nb_stage;
> +
> +	/** Ring of states (one per element) */
> +	union soring_state *state;
> +
> +	/** Pointer to the buffer where status values for each elements
> +	 * are stored. This is supplementary and optional information that
> +	 * user can attach to each element of the ring.
> +	 * While it is possible to incorporate this information inside
> +	 * user-defined element, in many cases it is plausible to maintain it
> +	 * as a separate array (mainly for performance reasons).
> +	 */
> +	void *elemst;
> +
> +	RTE_CACHE_GUARD;
> +
> +	/** Ring producer status. */
> +	struct __rte_ring_headtail prod;
> +
> +	/** Ring consumer status. */
> +	struct __rte_ring_headtail cons;
> +
> +	char name[RTE_RING_NAMESIZE];  /**< Name of the ring. */
> +};
> +
> +#endif /* _SORING_H_ */
> diff --git a/lib/ring/version.map b/lib/ring/version.map
> index 8da094a69a..a1f95a500f 100644
> --- a/lib/ring/version.map
> +++ b/lib/ring/version.map
> @@ -14,3 +14,16 @@ DPDK_25 {
>   
>   	local: *;
>   };
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	# added in 24.11
> +	rte_soring_acquire;
> +	rte_soring_count;
> +	rte_soring_dequeue;
> +	rte_soring_enqueue;
> +	rte_soring_get_memsize;
> +	rte_soring_init;
> +	rte_soring_release;
> +};

  parent reply	other threads:[~2024-08-26 19:05 UTC|newest]

Thread overview: 85+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-08-15  8:53 [RFC 0/6] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-08-15  8:53 ` [RFC 1/6] ring: common functions for 'move head' ops Konstantin Ananyev
2024-08-15  8:53 ` [RFC 2/6] ring: make copying functions generic Konstantin Ananyev
2024-08-15  8:53 ` [RFC 3/6] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-08-15 11:11   ` Morten Brørup
2024-08-15 12:41     ` Konstantin Ananyev
2024-08-15 13:22       ` Morten Brørup
2024-08-26 19:04   ` Mattias Rönnblom [this message]
2024-09-03 13:55     ` Konstantin Ananyev
2024-08-15  8:53 ` [RFC 4/6] app/test: add unit tests for soring API Konstantin Ananyev
2024-08-15  8:53 ` [RFC 5/6] examples/l3fwd: make ACL work in pipeline and eventdev modes Konstantin Ananyev
2024-08-15  8:53 ` [RFC 6/6] ring: minimize reads of the counterpart cache-line Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 0/6] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-09-06 13:13   ` [RFCv2 1/6] ring: common functions for 'move head' ops Konstantin Ananyev
2024-09-06 13:13   ` [RFCv2 2/6] ring: make copying functions generic Konstantin Ananyev
2024-09-06 13:13   ` [RFCv2 3/6] ring: make dump function more verbose Konstantin Ananyev
2024-09-06 13:13   ` [RFCv2 4/6] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-09-06 13:13   ` [RFCv2 5/6] app/test: add unit tests for soring API Konstantin Ananyev
2024-09-06 13:13   ` [RFCv2 6/6] examples/l3fwd: make ACL work in pipeline and eventdev modes Konstantin Ananyev
2024-09-16 12:37   ` [PATCH v3 0/5] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-09-16 12:37     ` [PATCH v3 1/5] ring: common functions for 'move head' ops Konstantin Ananyev
2024-09-16 12:37     ` [PATCH v3 2/5] ring: make copying functions generic Konstantin Ananyev
2024-09-16 12:37     ` [PATCH v3 3/5] ring: make dump function more verbose Konstantin Ananyev
2024-09-16 12:37     ` [PATCH v3 4/5] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-09-16 12:37     ` [PATCH v3 5/5] app/test: add unit tests for soring API Konstantin Ananyev
2024-09-17 12:09     ` [PATCH v4 0/5] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-09-17 12:09       ` [PATCH v4 1/5] ring: common functions for 'move head' ops Konstantin Ananyev
2024-09-17 12:09       ` [PATCH v4 2/5] ring: make copying functions generic Konstantin Ananyev
2024-09-17 12:09       ` [PATCH v4 3/5] ring: make dump function more verbose Konstantin Ananyev
2024-09-17 12:09       ` [PATCH v4 4/5] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-09-19 17:03         ` Jerin Jacob
2024-09-17 12:09       ` [PATCH v4 5/5] app/test: add unit tests for soring API Konstantin Ananyev
2024-10-12 18:09       ` [PATCH v4 0/5] Stage-Ordered API and other extensions for ring library Stephen Hemminger
2024-10-15 13:01       ` [PATCH v5 0/6] " Konstantin Ananyev
2024-10-15 13:01         ` [PATCH v5 1/6] ring: common functions for 'move head' ops Konstantin Ananyev
2024-10-15 15:04           ` Morten Brørup
2024-10-15 13:01         ` [PATCH v5 2/6] ring: make copying functions generic Konstantin Ananyev
2024-10-15 13:01         ` [PATCH v5 3/6] ring: make dump function more verbose Konstantin Ananyev
2024-10-15 13:01         ` [PATCH v5 4/6] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-10-15 13:01         ` [PATCH v5 5/6] app/test: add unit tests for soring API Konstantin Ananyev
2024-10-15 13:01         ` [PATCH v5 6/6] test: add stress test suite Konstantin Ananyev
2024-10-15 15:59         ` [PATCH v5 0/6] Stage-Ordered API and other extensions for ring library Stephen Hemminger
2024-10-15 16:02         ` Stephen Hemminger
2024-10-21 16:08         ` [PATCH v6 0/7] " Konstantin Ananyev
2024-10-21 16:08           ` [PATCH v6 1/7] test/ring: fix failure with custom number of lcores Konstantin Ananyev
2024-10-21 16:08           ` [PATCH v6 2/7] ring: common functions for 'move head' ops Konstantin Ananyev
2024-10-21 16:08           ` [PATCH v6 3/7] ring: make copying functions generic Konstantin Ananyev
2024-10-21 16:08           ` [PATCH v6 4/7] ring: make dump function more verbose Konstantin Ananyev
2024-10-21 16:08           ` [PATCH v6 5/7] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-10-21 16:08           ` [PATCH v6 6/7] app/test: add unit tests for soring API Konstantin Ananyev
2024-10-21 17:47         ` [PATCH v6 0/7] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 1/7] test/ring: fix failure with custom number of lcores Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 2/7] ring: common functions for 'move head' ops Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 3/7] ring: make copying functions generic Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 4/7] ring: make dump function more verbose Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 5/7] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 6/7] app/test: add unit tests for soring API Konstantin Ananyev
2024-10-21 17:47           ` [PATCH v6 7/7] test: add stress test suite Konstantin Ananyev
2024-10-28 17:18           ` [PATCH v6 0/7] Stage-Ordered API and other extensions for ring library David Christensen
2024-10-29 14:32             ` Konstantin Ananyev
2024-10-30 21:22           ` [PATCH v7 " Konstantin Ananyev
2024-10-30 21:22             ` [PATCH v7 1/7] test/ring: fix failure with custom number of lcores Konstantin Ananyev
2024-11-07 11:50               ` Morten Brørup
2024-10-30 21:22             ` [PATCH v7 2/7] ring: common functions for 'move head' ops Konstantin Ananyev
2024-11-07 11:31               ` Morten Brørup
2024-10-30 21:23             ` [PATCH v7 3/7] ring: make copying functions generic Konstantin Ananyev
2024-11-07 11:46               ` Morten Brørup
2024-10-30 21:23             ` [PATCH v7 4/7] ring: make dump function more verbose Konstantin Ananyev
2024-11-07 11:49               ` Morten Brørup
2024-10-30 21:23             ` [PATCH v7 5/7] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-11-07 12:07               ` Morten Brørup
2024-10-30 21:23             ` [PATCH v7 6/7] app/test: add unit tests for soring API Konstantin Ananyev
2024-10-30 21:23             ` [PATCH v7 7/7] test: add stress test suite Konstantin Ananyev
2024-11-07 10:41             ` [PATCH v7 0/7] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-11-07 16:16             ` Stephen Hemminger
2024-11-07 18:11               ` Konstantin Ananyev
2024-11-07 18:24             ` [PATCH v8 " Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 1/7] test/ring: fix failure with custom number of lcores Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 2/7] ring: common functions for 'move head' ops Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 3/7] ring: make copying functions generic Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 4/7] ring: make dump function more verbose Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 5/7] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 6/7] app/test: add unit tests for soring API Konstantin Ananyev
2024-11-07 18:24               ` [PATCH v8 7/7] test: add stress test suite Konstantin Ananyev
2024-11-08 21:56               ` [PATCH v8 0/7] Stage-Ordered API and other extensions for ring library Thomas Monjalon

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=6af430b0-3bec-46f5-ac53-f69f5ba1de1e@lysator.liu.se \
    --to=hofors@lysator.liu.se \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=drc@linux.vnet.ibm.com \
    --cc=hemant.agrawal@nxp.com \
    --cc=honnappa.nagarahalli@arm.com \
    --cc=jerinj@marvell.com \
    --cc=konstantin.ananyev@huawei.com \
    --cc=konstantin.v.ananyev@yandex.ru \
    --cc=mb@smartsharesystems.com \
    --cc=ruifeng.wang@arm.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).