From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 1CEBF424EF; Mon, 4 Sep 2023 12:13:05 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 0308E402AF; Mon, 4 Sep 2023 12:13:05 +0200 (CEST) Received: from frasgout.his.huawei.com (frasgout.his.huawei.com [185.176.79.56]) by mails.dpdk.org (Postfix) with ESMTP id 54219400EF for ; Mon, 4 Sep 2023 12:13:03 +0200 (CEST) Received: from frapeml500008.china.huawei.com (unknown [172.18.147.207]) by frasgout.his.huawei.com (SkyGuard) with ESMTP id 4RfPVC4RJnz6J7xT; Mon, 4 Sep 2023 18:08:35 +0800 (CST) Received: from frapeml500007.china.huawei.com (7.182.85.172) by frapeml500008.china.huawei.com (7.182.85.71) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256) id 15.1.2507.31; Mon, 4 Sep 2023 12:13:01 +0200 Received: from frapeml500007.china.huawei.com ([7.182.85.172]) by frapeml500007.china.huawei.com ([7.182.85.172]) with mapi id 15.01.2507.031; Mon, 4 Sep 2023 12:13:01 +0200 From: Konstantin Ananyev To: Honnappa Nagarahalli , "jackmin@nvidia.com" , "konstantin.v.ananyev@yandex.ru" CC: "dev@dpdk.org" , "ruifeng.wang@arm.com" , "aditya.ambadipudi@arm.com" , "wathsala.vithanage@arm.com" , "nd@arm.com" Subject: RE: [RFC] lib/st_ring: add single thread ring Thread-Topic: [RFC] lib/st_ring: add single thread ring Thread-Index: AQHZ0/V0Bhhm7ISam0G4Ph+ZQqSgMLAKhqaQ Date: Mon, 4 Sep 2023 10:13:01 +0000 Message-ID: <0273f48a522f4d2f854596e0d7e6d835@huawei.com> References: <20230821060420.3509667-1-honnappa.nagarahalli@arm.com> In-Reply-To: <20230821060420.3509667-1-honnappa.nagarahalli@arm.com> Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: x-originating-ip: [10.48.144.242] Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: quoted-printable MIME-Version: 1.0 X-CFilter-Loop: Reflected X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org > Add a single thread safe and multi-thread unsafe ring data structure. > This library provides an simple and efficient alternative to multi-thread > safe ring when multi-thread safety is not required. Just a thought: do we really need whole new library for that? >From what I understand all we need right now just one extra function: rte_ring_mt_unsafe_prod_deque(...) Sorry for ugly name :) To dequeue N elems from prod.tail. Or you think there would be some extra advantages in ST version of the ring= : extra usages, better performance, etc.? >=20 > Signed-off-by: Honnappa Nagarahalli > --- > v1: > 1) The code is very prelimnary and is not even compiled > 2) This is intended to show the APIs and some thoughts on implementation > 3) More APIs and the rest of the implementation will come in subsequent > versions >=20 > lib/st_ring/rte_st_ring.h | 567 ++++++++++++++++++++++++++++++++++++++ > 1 file changed, 567 insertions(+) > create mode 100644 lib/st_ring/rte_st_ring.h >=20 > diff --git a/lib/st_ring/rte_st_ring.h b/lib/st_ring/rte_st_ring.h > new file mode 100644 > index 0000000000..8cb8832591 > --- /dev/null > +++ b/lib/st_ring/rte_st_ring.h > @@ -0,0 +1,567 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * Copyright(c) 2023 Arm Limited > + */ > + > +#ifndef _RTE_ST_RING_H_ > +#define _RTE_ST_RING_H_ > + > +/** > + * @file > + * RTE Signle Thread Ring (ST Ring) > + * > + * The ST Ring is a fixed-size queue intended to be accessed > + * by one thread at a time. It does not provide concurrent access to > + * multiple threads. If there are multiple threads accessing the ST ring= , > + * then the threads have to use locks to protect the ring from > + * getting corrupted. > + * > + * - FIFO (First In First Out) > + * - Maximum size is fixed; the pointers are stored in a table. > + * - Consumer and producer part of same thread. > + * - Multi-thread producers and consumers need locking. > + * - Single/Bulk/burst dequeue at Tail or Head > + * - Single/Bulk/burst enqueue at Head or Tail > + * > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include > +#include > + > +/** > + * Calculate the memory size needed for a ST ring > + * > + * This function returns the number of bytes needed for a ST ring, given > + * the number of elements in it. This value is the sum of the size of > + * the structure rte_st_ring and the size of the memory needed by the > + * elements. The value is aligned to a cache line size. > + * > + * @param count > + * The number of elements in the ring (must be a power of 2). > + * @return > + * - The memory size needed for the ST ring on success. > + * - -EINVAL if count is not a power of 2. > + */ > +ssize_t rte_st_ring_get_memsize(unsigned int count); > + > +/** > + * Initialize a ST ring structure. > + * > + * Initialize a ST ring structure in memory pointed by "r". The size of = the > + * memory area must be large enough to store the ring structure and the > + * object table. It is advised to use rte_st_ring_get_memsize() to get t= he > + * appropriate size. > + * > + * The ST ring size is set to *count*, which must be a power of two. > + * The real usable ring size is *count-1* instead of *count* to > + * differentiate a full ring from an empty ring. > + * > + * The ring is not added in RTE_TAILQ_ST_RING global list. Indeed, the > + * memory given by the caller may not be shareable among dpdk > + * processes. > + * > + * @param r > + * The pointer to the ring structure followed by the elements table. > + * @param name > + * The name of the ring. > + * @param count > + * The number of elements in the ring (must be a power of 2, > + * unless RTE_ST_RING_F_EXACT_SZ is set in flags). > + * @param flags > + * An OR of the following: > + * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold > + * exactly the requested number of entries, and the requested size > + * will be rounded up to the next power of two, but the usable space > + * will be exactly that requested. Worst case, if a power-of-2 size = is > + * requested, half the ring space will be wasted. > + * Without this flag set, the ring size requested must be a power of= 2, > + * and the usable space will be that size - 1. > + * @return > + * 0 on success, or a negative value on error. > + */ > +int rte_st_ring_init(struct rte_st_ring *r, const char *name, > + unsigned int count, unsigned int flags); > + > +/** > + * Create a new ST ring named *name* in memory. > + * > + * This function uses ``memzone_reserve()`` to allocate memory. Then it > + * calls rte_st_ring_init() to initialize an empty ring. > + * > + * The new ring size is set to *count*, which must be a power of two. > + * The real usable ring size is *count-1* instead of *count* to > + * differentiate a full ring from an empty ring. > + * > + * The ring is added in RTE_TAILQ_ST_RING list. > + * > + * @param name > + * The name of the ring. > + * @param count > + * The size of the ring (must be a power of 2, > + * unless RTE_ST_RING_F_EXACT_SZ is set in flags). > + * @param socket_id > + * The *socket_id* argument is the socket identifier in case of > + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA > + * constraint for the reserved zone. > + * @param flags > + * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold e= xactly the > + * requested number of entries, and the requested size will be round= ed up > + * to the next power of two, but the usable space will be exactly th= at > + * requested. Worst case, if a power-of-2 size is requested, half th= e > + * ring space will be wasted. > + * Without this flag set, the ring size requested must be a power of= 2, > + * and the usable space will be that size - 1. > + * @return > + * On success, the pointer to the new allocated ring. NULL on error wi= th > + * rte_errno set appropriately. Possible errno values include: > + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config s= tructure > + * - EINVAL - count provided is not a power of 2 > + * - ENOSPC - the maximum number of memzones has already been allocat= ed > + * - EEXIST - a memzone with the same name already exists > + * - ENOMEM - no appropriate memory area found in which to create mem= zone > + */ > +struct rte_st_ring *rte_st_ring_create(const char *name, unsigned int co= unt, > + int socket_id, unsigned int flags); > + > +/** > + * De-allocate all memory used by the ring. > + * > + * @param r > + * Ring to free. > + * If NULL then, the function does nothing. > + */ > +void rte_st_ring_free(struct rte_st_ring *r); > + > +/** > + * Dump the status of the ring to a file. > + * > + * @param f > + * A pointer to a file for output > + * @param r > + * A pointer to the ring structure. > + */ > +void rte_st_ring_dump(FILE *f, const struct rte_st_ring *r); > + > +/** > + * Enqueue fixed number of objects on a ST ring. > + * > + * This function copies the objects at the head of the ring and > + * moves the head index. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * The number of objects enqueued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_enqueue_bulk(struct rte_st_ring *r, void * const *obj_table, > + unsigned int n, unsigned int *free_space) > +{ > + return rte_st_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *), > + n, free_space); > +} > + > +/** > + * Enqueue upto a maximum number of objects on a ST ring. > + * > + * This function copies the objects at the head of the ring and > + * moves the head index. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * - n: Actual number of objects enqueued. > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_enqueue_burst(struct rte_st_ring *r, void * const *obj_table= , > + unsigned int n, unsigned int *free_space) > +{ > + return rte_st_ring_enqueue_burst_elem(r, obj_table, sizeof(void *), > + n, free_space); > +} > + > +/** > + * Enqueue one object on a ST ring. > + * > + * This function copies one object at the head of the ring and > + * moves the head index. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj > + * A pointer to the object to be added. > + * @return > + * - 0: Success; objects enqueued. > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is en= queued. > + */ > +static __rte_always_inline int > +rte_st_ring_enqueue(struct rte_st_ring *r, void *obj) > +{ > + return rte_st_ring_enqueue_elem(r, &obj, sizeof(void *)); > +} > + > +/** > + * Enqueue fixed number of objects on a ST ring at the tail. > + * > + * This function copies the objects at the tail of the ring and > + * moves the tail index (backwards). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * The number of objects enqueued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_enqueue_at_tail_bulk(struct rte_st_ring *r, > + void * const *obj_table, unsigned int n, > + unsigned int *free_space) > +{ > + return rte_st_ring_enqueue_at_tail_bulk_elem(r, obj_table, > + sizeof(void *), n, free_space); > +} > + > +/** > + * Enqueue upto a maximum number of objects on a ST ring at the tail. > + * > + * This function copies the objects at the tail of the ring and > + * moves the tail index (backwards). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * - n: Actual number of objects enqueued. > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_enqueue_at_tail_burst(struct rte_st_ring *r, > + void * const *obj_table, unsigned int n, > + unsigned int *free_space) > +{ > + return rte_st_ring_enqueue_at_tail_burst_elem(r, obj_table, > + sizeof(void *), n, free_space); > +} > + > +/** > + * Enqueue one object on a ST ring at tail. > + * > + * This function copies one object at the tail of the ring and > + * moves the tail index (backwards). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj > + * A pointer to the object to be added. > + * @return > + * - 0: Success; objects enqueued. > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is en= queued. > + */ > +static __rte_always_inline int > +rte_st_ring_enqueue_at_tail(struct rte_st_ring *r, void *obj) > +{ > + return rte_st_ring_enqueue_at_tail_elem(r, &obj, sizeof(void *)); > +} > + > +/** > + * Dequeue a fixed number of objects from a ST ring. > + * > + * This function copies the objects from the tail of the ring and > + * moves the tail index. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be fill= ed. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects dequeued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_dequeue_bulk(struct rte_st_ring *r, void **obj_table, unsign= ed int n, > + unsigned int *available) > +{ > + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *), > + n, available); > +} > + > +/** > + * Dequeue upto a maximum number of objects from a ST ring. > + * > + * This function copies the objects from the tail of the ring and > + * moves the tail index. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be fill= ed. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * - Number of objects dequeued > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_dequeue_burst(struct rte_st_ring *r, void **obj_table, > + unsigned int n, unsigned int *available) > +{ > + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *), > + n, available); > +} > + > +/** > + * Dequeue one object from a ST ring. > + * > + * This function copies one object from the tail of the ring and > + * moves the tail index. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_p > + * A pointer to a void * pointer (object) that will be filled. > + * @return > + * - 0: Success, objects dequeued. > + * - -ENOENT: Not enough entries in the ring to dequeue, no object is > + * dequeued. > + */ > +static __rte_always_inline int > +rte_st_ring_dequeue(struct rte_st_ring *r, void **obj_p) > +{ > + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); > +} > + > +/** > + * Dequeue a fixed number of objects from a ST ring from the head. > + * > + * This function copies the objects from the head of the ring and > + * moves the head index (backwards). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be fill= ed. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects dequeued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_dequeue_at_head_bulk(struct rte_st_ring *r, void **obj_table= , unsigned int n, > + unsigned int *available) > +{ > + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *), > + n, available); > +} > + > +/** > + * Dequeue upto a maximum number of objects from a ST ring from the head= . > + * > + * This function copies the objects from the head of the ring and > + * moves the head index (backwards). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be fill= ed. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * - Number of objects dequeued > + */ > +static __rte_always_inline unsigned int > +rte_st_ring_dequeue_at_head_burst(struct rte_st_ring *r, void **obj_tabl= e, > + unsigned int n, unsigned int *available) > +{ > + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *), > + n, available); > +} > + > +/** > + * Dequeue one object from a ST ring from the head. > + * > + * This function copies the objects from the head of the ring and > + * moves the head index (backwards). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_p > + * A pointer to a void * pointer (object) that will be filled. > + * @return > + * - 0: Success, objects dequeued. > + * - -ENOENT: Not enough entries in the ring to dequeue, no object is > + * dequeued. > + */ > +static __rte_always_inline int > +rte_st_ring_at_head_dequeue(struct rte_st_ring *r, void **obj_p) > +{ > + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); > +} > + > +/** > + * Flush a ST ring. > + * > + * This function flush all the elements in a ST ring > + * > + * @warning > + * Make sure the ring is not in use while calling this function. > + * > + * @param r > + * A pointer to the ring structure. > + */ > +void > +rte_st_ring_reset(struct rte_st_ring *r); > + > +/** > + * Return the number of entries in a ST ring. > + * > + * @param r > + * A pointer to the ring structure. > + * @return > + * The number of entries in the ring. > + */ > +static inline unsigned int > +rte_st_ring_count(const struct rte_st_ring *r) > +{ > + uint32_t count =3D (r->head - r->tail) & r->mask; > + return count; > +} > + > +/** > + * Return the number of free entries in a ST ring. > + * > + * @param r > + * A pointer to the ring structure. > + * @return > + * The number of free entries in the ring. > + */ > +static inline unsigned int > +rte_st_ring_free_count(const struct rte_st_ring *r) > +{ > + return r->capacity - rte_st_ring_count(r); > +} > + > +/** > + * Test if a ST ring is full. > + * > + * @param r > + * A pointer to the ring structure. > + * @return > + * - 1: The ring is full. > + * - 0: The ring is not full. > + */ > +static inline int > +rte_st_ring_full(const struct rte_st_ring *r) > +{ > + return rte_st_ring_free_count(r) =3D=3D 0; > +} > + > +/** > + * Test if a ST ring is empty. > + * > + * @param r > + * A pointer to the ring structure. > + * @return > + * - 1: The ring is empty. > + * - 0: The ring is not empty. > + */ > +static inline int > +rte_st_ring_empty(const struct rte_st_ring *r) > +{ > + return r->tail =3D=3D r->head; > +} > + > +/** > + * Return the size of the ring. > + * > + * @param r > + * A pointer to the ring structure. > + * @return > + * The size of the data store used by the ring. > + * NOTE: this is not the same as the usable space in the ring. To quer= y that > + * use ``rte_st_ring_get_capacity()``. > + */ > +static inline unsigned int > +rte_st_ring_get_size(const struct rte_st_ring *r) > +{ > + return r->size; > +} > + > +/** > + * Return the number of elements which can be stored in the ring. > + * > + * @param r > + * A pointer to the ring structure. > + * @return > + * The usable size of the ring. > + */ > +static inline unsigned int > +rte_st_ring_get_capacity(const struct rte_st_ring *r) > +{ > + return r->capacity; > +} > + > +/** > + * Dump the status of all rings on the console > + * > + * @param f > + * A pointer to a file for output > + */ > +void rte_st_ring_list_dump(FILE *f); > + > +/** > + * Search a ST ring from its name > + * > + * @param name > + * The name of the ring. > + * @return > + * The pointer to the ring matching the name, or NULL if not found, > + * with rte_errno set appropriately. Possible rte_errno values include= : > + * - ENOENT - required entry not available to return. > + */ > +struct rte_st_ring *rte_st_ring_lookup(const char *name); > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _RTE_ST_RING_H_ */ > -- > 2.25.1 >=20