From: "Ananyev, Konstantin"
To: Honnappa Nagarahalli, "olivier.matz@6wind.com", "sthemmin@microsoft.com", "jerinj@marvell.com", "Richardson, Bruce", "david.marchand@redhat.com", "pbhagavatula@marvell.com"
CC: "dev@dpdk.org", Dharmik Thakkar, "Ruifeng Wang (Arm Technology China)", "Gavin Hu (Arm Technology China)", "stephen@networkplumber.org", nd, nd
Date: Mon, 14 Oct 2019 19:41:52 +0000
Message-ID: <2601191342CEEE43887BDE71AB97725801A8C68545@IRSMSX104.ger.corp.intel.com>
References: <20190906190510.11146-1-honnappa.nagarahalli@arm.com> <20191009024709.38144-1-honnappa.nagarahalli@arm.com> <20191009024709.38144-2-honnappa.nagarahalli@arm.com>
Subject: Re: [dpdk-dev] [PATCH v4 1/2] lib/ring: apis to support configurable element size

> >
> > Current APIs assume ring elements to be pointers. However, in many use cases,
> > the size can be different. Add new APIs to support configurable ring element
> > sizes.
> >
> > Signed-off-by: Honnappa Nagarahalli
> > Reviewed-by: Dharmik Thakkar
> > Reviewed-by: Gavin Hu
> > Reviewed-by: Ruifeng Wang
> > ---
> >  lib/librte_ring/Makefile             |   3 +-
> >  lib/librte_ring/meson.build          |   3 +
> >  lib/librte_ring/rte_ring.c           |  45 +-
> >  lib/librte_ring/rte_ring.h           |   1 +
> >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_version.map |   2 +
> >  6 files changed, 991 insertions(+), 9 deletions(-)
> >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> >
> > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > index 21a36770d..515a967bb 100644
> > --- a/lib/librte_ring/Makefile
> > +++ b/lib/librte_ring/Makefile
> > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
> >
> >  # library name
> >  LIB = librte_ring.a
> >
> > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
> >  LDLIBS += -lrte_eal
> >
> >  EXPORT_MAP := rte_ring_version.map
> > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> >
> >  # install includes
> >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > +                                        rte_ring_elem.h \
> >                                          rte_ring_generic.h \
> >                                          rte_ring_c11_mem.h
> >
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> > index ab8b0b469..74219840a 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')
> >  headers = files('rte_ring.h',
> >          'rte_ring_c11_mem.h',
> >          'rte_ring_generic.h')
> > +
> > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > +allow_experimental_apis = true
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > index d9b308036..6fed3648b 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -33,6 +33,7 @@
> >  #include
> >
> >  #include "rte_ring.h"
> > +#include "rte_ring_elem.h"
> >
> >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> >
> > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> >
> >  /* return the size of memory occupied by a ring */
> >  ssize_t
> > -rte_ring_get_memsize(unsigned count)
> > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> >  {
> >      ssize_t sz;
> >
> > +    /* Supported esize values are 4/8/16.
> > +     * Others can be added on need basis.
> > +     */
> > +    if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > +        RTE_LOG(ERR, RING,
> > +            "Unsupported esize value. Supported values are 4, 8 and 16\n");
> > +
> > +        return -EINVAL;
> > +    }
> > +
> >      /* count must be a power of 2 */
> >      if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> >          RTE_LOG(ERR, RING,
> > -            "Requested size is invalid, must be power of 2, and "
> > -            "do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
> > +            "Requested number of elements is invalid, must be "
> > +            "power of 2, and do not exceed the limit %u\n",
> > +            RTE_RING_SZ_MASK);
> > +
> >          return -EINVAL;
> >      }
> >
> > -    sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > +    sz = sizeof(struct rte_ring) + count * esize;
> >      sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> >      return sz;
> >  }
> >
> > +/* return the size of memory occupied by a ring */
> > +ssize_t
> > +rte_ring_get_memsize(unsigned count)
> > +{
> > +    return rte_ring_get_memsize_elem(count, sizeof(void *));
> > +}
> > +
> >  void
> >  rte_ring_reset(struct rte_ring *r)
> >  {
> > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >      return 0;
> >  }
> >
> > -/* create the ring */
> > +/* create the ring for a given element size */
> >  struct rte_ring *
> > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > -        unsigned flags)
> > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > +        int socket_id, unsigned flags)
> >  {
> >      char mz_name[RTE_MEMZONE_NAMESIZE];
> >      struct rte_ring *r;
> > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
> >      if (flags & RING_F_EXACT_SZ)
> >          count = rte_align32pow2(count + 1);
> >
> > -    ring_size = rte_ring_get_memsize(count);
> > +    ring_size = rte_ring_get_memsize_elem(count, esize);
> >      if (ring_size < 0) {
> >          rte_errno = ring_size;
> >          return NULL;
> > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
> >      return r;
> >  }
> >
> > +/* create the ring */
> > +struct rte_ring *
> > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > +        unsigned flags)
> > +{
> > +    return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > +        flags);
> > +}
> > +
> >  /* free the ring */
> >  void
> >  rte_ring_free(struct rte_ring *r)
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 2a9f768a1..18fc5d845 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >   */
> >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> >                  int socket_id, unsigned flags);
> > +
> >  /**
> >   * De-allocate all memory used by the ring.
> >   *
> > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> > new file mode 100644
> > index 000000000..860f059ad
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -0,0 +1,946 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2019 Arm Limited
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_ELEM_H_
> > +#define _RTE_RING_ELEM_H_
> > +
> > +/**
> > + * @file
> > + * RTE Ring with flexible element size
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +
> > +#include "rte_ring.h"
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Calculate the memory size needed for a ring with given element size
> > + *
> > + * This function returns the number of bytes needed for a ring, given
> > + * the number of elements in it and the size of the element. This value
> > + * is the sum of the size of the structure rte_ring and the size of the
> > + * memory needed for storing the elements. The value is aligned to a cache
> > + * line size.
> > + *
> > + * @param count
> > + *   The number of elements in the ring (must be a power of 2).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported.
> > + * @return
> > + *   - The memory size needed for the ring on success.
> > + *   - -EINVAL if count is not a power of 2.
> > + */
> > +__rte_experimental
> > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Create a new ring named *name* that stores elements with given size.
> > + *
> > + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> > + * calls rte_ring_init() to initialize an empty ring.
> > + *
> > + * The new ring size is set to *count*, which must be a power of
> > + * two. Water marking is disabled by default. The real usable ring size
> > + * is *count-1* instead of *count* to differentiate a free ring from an
> > + * empty ring.
> > + *
> > + * The ring is added in RTE_TAILQ_RING list.
> > + *
> > + * @param name
> > + *   The name of the ring.
> > + * @param count
> > + *   The number of elements in the ring (must be a power of 2).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported.
> > + * @param socket_id
> > + *   The *socket_id* argument is the socket identifier in case of
> > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > + *   constraint for the reserved zone.
> > + * @param flags
> > + *   An OR of the following:
> > + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > + *      is "single-producer". Otherwise, it is "multi-producers".
> > + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > + *      is "single-consumer". Otherwise, it is "multi-consumers".
> > + * @return
> > + *   On success, the pointer to the new allocated ring. NULL on error with
> > + *    rte_errno set appropriately. Possible errno values include:
> > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> > + *    - E_RTE_SECONDARY - function was called from a secondary process instance
> > + *    - EINVAL - count provided is not a power of 2
> > + *    - ENOSPC - the maximum number of memzones has already been allocated
> > + *    - EEXIST - a memzone with the same name already exists
> > + *    - ENOMEM - no appropriate memory area found in which to create memzone
> > + */
> > +__rte_experimental
> > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > +                unsigned esize, int socket_id, unsigned flags);
> > +
> > +/* the actual enqueue of pointers on the ring.
> > + * Placed here since identical code needed in both
> > + * single and multi producer enqueue functions.
> > + */
> > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) do { \
> > +    if (esize == 4) \
> > +        ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > +    else if (esize == 8) \
> > +        ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > +    else if (esize == 16) \
> > +        ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > +    unsigned int i; \
> > +    const uint32_t size = (r)->size; \
> > +    uint32_t idx = prod_head & (r)->mask; \
> > +    uint32_t *ring = (uint32_t *)ring_start; \
> > +    uint32_t *obj = (uint32_t *)obj_table; \
> > +    if (likely(idx + n < size)) { \
> > +        for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > +            ring[idx] = obj[i]; \
> > +            ring[idx + 1] = obj[i + 1]; \
> > +            ring[idx + 2] = obj[i + 2]; \
> > +            ring[idx + 3] = obj[i + 3]; \
> > +            ring[idx + 4] = obj[i + 4]; \
> > +            ring[idx + 5] = obj[i + 5]; \
> > +            ring[idx + 6] = obj[i + 6]; \
> > +            ring[idx + 7] = obj[i + 7]; \
> > +        } \
> > +        switch (n & 0x7) { \
> > +        case 7: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 6: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 5: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 4: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 3: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 2: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 1: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        } \
> > +    } else { \
> > +        for (i = 0; idx < size; i++, idx++)\
> > +            ring[idx] = obj[i]; \
> > +        for (idx = 0; i < n; i++, idx++) \
> > +            ring[idx] = obj[i]; \
> > +    } \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > +    unsigned int i; \
> > +    const uint32_t size = (r)->size; \
> > +    uint32_t idx = prod_head & (r)->mask; \
> > +    uint64_t *ring = (uint64_t *)ring_start; \
> > +    uint64_t *obj = (uint64_t *)obj_table; \
> > +    if (likely(idx + n < size)) { \
> > +        for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > +            ring[idx] = obj[i]; \
> > +            ring[idx + 1] = obj[i + 1]; \
> > +            ring[idx + 2] = obj[i + 2]; \
> > +            ring[idx + 3] = obj[i + 3]; \
> > +        } \
> > +        switch (n & 0x3) { \
> > +        case 3: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 2: \
> > +            ring[idx++] = obj[i++]; /* fallthrough */ \
> > +        case 1: \
> > +            ring[idx++] = obj[i++]; \
> > +        } \
> > +    } else { \
> > +        for (i = 0; idx < size; i++, idx++)\
> > +            ring[idx] = obj[i]; \
> > +        for (idx = 0; i < n; i++, idx++) \
> > +            ring[idx] = obj[i]; \
> > +    } \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> > +    unsigned int i; \
> > +    const uint32_t size = (r)->size; \
> > +    uint32_t idx = prod_head & (r)->mask; \
> > +    __uint128_t *ring = (__uint128_t *)ring_start; \
> > +    __uint128_t *obj = (__uint128_t *)obj_table; \
> > +    if (likely(idx + n < size)) { \
> > +        for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > +            ring[idx] = obj[i]; \
> > +            ring[idx + 1] = obj[i + 1]; \
> > +        } \
> > +        switch (n & 0x1) { \
> > +        case 1: \
> > +            ring[idx++] = obj[i++]; \
> > +        } \
> > +    } else { \
> > +        for (i = 0; idx < size; i++, idx++)\
> > +            ring[idx] = obj[i]; \
> > +        for (idx = 0; i < n; i++, idx++) \
> > +            ring[idx] = obj[i]; \
> > +    } \
> > +} while (0)
> > +
> > +/* the actual copy of pointers on the ring to obj_table.
> > + * Placed here since identical code needed in both
> > + * single and multi consumer dequeue functions.
> > + */
> > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) do { \
> > +    if (esize == 4) \
> > +        DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > +    else if (esize == 8) \
> > +        DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > +    else if (esize == 16) \
> > +        DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > +    unsigned int i; \
> > +    uint32_t idx = cons_head & (r)->mask; \
> > +    const uint32_t size = (r)->size; \
> > +    uint32_t *ring = (uint32_t *)ring_start; \
> > +    uint32_t *obj = (uint32_t *)obj_table; \
> > +    if (likely(idx + n < size)) { \
> > +        for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> > +            obj[i] = ring[idx]; \
> > +            obj[i + 1] = ring[idx + 1]; \
> > +            obj[i + 2] = ring[idx + 2]; \
> > +            obj[i + 3] = ring[idx + 3]; \
> > +            obj[i + 4] = ring[idx + 4]; \
> > +            obj[i + 5] = ring[idx + 5]; \
> > +            obj[i + 6] = ring[idx + 6]; \
> > +            obj[i + 7] = ring[idx + 7]; \
> > +        } \
> > +        switch (n & 0x7) { \
> > +        case 7: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 6: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 5: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 4: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 3: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 2: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 1: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        } \
> > +    } else { \
> > +        for (i = 0; idx < size; i++, idx++) \
> > +            obj[i] = ring[idx]; \
> > +        for (idx = 0; i < n; i++, idx++) \
> > +            obj[i] = ring[idx]; \
> > +    } \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > +    unsigned int i; \
> > +    uint32_t idx = cons_head & (r)->mask; \
> > +    const uint32_t size = (r)->size; \
> > +    uint64_t *ring = (uint64_t *)ring_start; \
> > +    uint64_t *obj = (uint64_t *)obj_table; \
> > +    if (likely(idx + n < size)) { \
> > +        for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> > +            obj[i] = ring[idx]; \
> > +            obj[i + 1] = ring[idx + 1]; \
> > +            obj[i + 2] = ring[idx + 2]; \
> > +            obj[i + 3] = ring[idx + 3]; \
> > +        } \
> > +        switch (n & 0x3) { \
> > +        case 3: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 2: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        case 1: \
> > +            obj[i++] = ring[idx++]; \
> > +        } \
> > +    } else { \
> > +        for (i = 0; idx < size; i++, idx++) \
> > +            obj[i] = ring[idx]; \
> > +        for (idx = 0; i < n; i++, idx++) \
> > +            obj[i] = ring[idx]; \
> > +    } \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> > +    unsigned int i; \
> > +    uint32_t idx = cons_head & (r)->mask; \
> > +    const uint32_t size = (r)->size; \
> > +    __uint128_t *ring = (__uint128_t *)ring_start; \
> > +    __uint128_t *obj = (__uint128_t *)obj_table; \
> > +    if (likely(idx + n < size)) { \
> > +        for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > +            obj[i] = ring[idx]; \
> > +            obj[i + 1] = ring[idx + 1]; \
> > +        } \
> > +        switch (n & 0x1) { \
> > +        case 1: \
> > +            obj[i++] = ring[idx++]; /* fallthrough */ \
> > +        } \
> > +    } else { \
> > +        for (i = 0; idx < size; i++, idx++) \
> > +            obj[i] = ring[idx]; \
> > +        for (idx = 0; i < n; i++, idx++) \
> > +            obj[i] = ring[idx]; \
> > +    } \
> > +} while (0)
> > +
> > +/* Between load and load. there might be cpu reorder in weak model
> > + * (powerpc/arm).
> > + * There are 2 choices for the users
> > + * 1.use rmb() memory barrier
> > + * 2.use one-direction load_acquire/store_release barrier,defined by
> > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > + * It depends on performance test results.
> > + * By default, move common functions to rte_ring_generic.h
> > + */
> > +#ifdef RTE_USE_C11_MEM_MODEL
> > +#include "rte_ring_c11_mem.h"
> > +#else
> > +#include "rte_ring_generic.h"
> > +#endif
> > +
> > +/**
> > + * @internal Enqueue several objects on the ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > + *   as passed while creating the ring, otherwise the results are undefined.
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head update
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > +        unsigned int esize, unsigned int n,
> > +        enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > +        unsigned int *free_space)

I like the idea of adding esize as an argument to the public API,
so the compiler can do its job optimizing calls with constant esize.
Though I am not very happy with the rest of the implementation:
1. It doesn't really provide configurable elem size - only 4/8/16B elems are supported.
2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE macros.

Looking at the ENQUEUE/DEQUEUE macros, I can see that the main loop always does a 32B copy per iteration.
So I wonder whether we can make a generic function that would do a 32B copy per iteration
in a main loop, and copy the tail in 4B chunks?
That would avoid the code duplication and would allow the user to have any elem size
(multiple of 4B) he wants.
Something like this (note: I didn't test it, just a rough idea):

static inline void
copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t esize)
{
    uint32_t i, sz;

    /* total number of 4B words to copy */
    sz = (num * esize) / sizeof(uint32_t);

    /* main loop: 32B per iteration */
    for (i = 0; i < (sz & ~7); i += 8)
        memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));

    /* tail: remaining 0..7 words */
    switch (sz & 7) {
    case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
    case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
    case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
    case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
    case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
    case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
    case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
    }
}

static inline void
enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
        void *obj_table, uint32_t num, uint32_t esize)
{
    uint32_t idx, n;
    uint32_t *du32;
    const uint32_t *su32 = obj_table;

    const uint32_t size = r->size;
    /* number of 4B words per element; offsets are scaled by it */
    const uint32_t nw = esize / sizeof(uint32_t);

    idx = prod_head & (r)->mask;

    du32 = (uint32_t *)ring_start + idx * nw;

    if (idx + num < size)
        copy_elems(du32, su32, num, esize);
    else {
        n = size - idx;
        copy_elems(du32, su32, n, esize);
        copy_elems(ring_start, su32 + n * nw, num - n, esize);
    }
}

And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:

enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);

> > +{
> > +    uint32_t prod_head, prod_next;
> > +    uint32_t free_entries;
> > +
> > +    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > +            &prod_head, &prod_next, &free_entries);
> > +    if (n == 0)
> > +        goto end;
> > +
> > +    ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > +
> > +    update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > +end:
> > +    if (free_space != NULL)
> > +        *free_space = free_entries - n;
> > +    return n;
> > +}
> > +
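
To make the generic-copy suggestion in this mail easier to experiment with
outside of DPDK, here is a minimal standalone sketch of the same idea: one copy
routine that moves 32B per iteration and finishes the tail in 4B words, plus an
enqueue helper that handles wrap-around for any element size that is a multiple
of 4B. The names demo_ring, demo_copy_elems and demo_enqueue_elems are
hypothetical and exist only for this illustration; they are not part of the
patch or of the DPDK API. The sketch assumes the caller has already reserved
num free slots (the job of __rte_ring_move_prod_head() in the real code) and it
uses a plain loop for the tail instead of the switch.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the ring; NOT the DPDK struct rte_ring. */
struct demo_ring {
    uint32_t size;   /* number of element slots, power of 2 */
    uint32_t mask;   /* size - 1 */
    uint32_t esize;  /* element size in bytes, multiple of 4 */
    uint32_t *slots; /* storage: size * esize bytes, viewed as 4B words */
};

/* Copy num elements of esize bytes each, working in 4B words. */
static void
demo_copy_elems(uint32_t *dst, const uint32_t *src, uint32_t num,
        uint32_t esize)
{
    const uint32_t words = num * (uint32_t)(esize / sizeof(uint32_t));
    uint32_t i;

    /* Main loop: 8 words (32 bytes) per iteration. */
    for (i = 0; i < (words & ~7u); i += 8)
        memcpy(dst + i, src + i, 8 * sizeof(uint32_t));

    /* Tail: the remaining 0..7 words, one word at a time. */
    for (; i < words; i++)
        dst[i] = src[i];
}

/* Store num elements starting at slot (prod_head & mask), wrapping if needed. */
static void
demo_enqueue_elems(struct demo_ring *r, uint32_t prod_head,
        const void *obj_table, uint32_t num)
{
    /* words per element: every offset below is scaled by this value */
    const uint32_t wpe = (uint32_t)(r->esize / sizeof(uint32_t));
    const uint32_t idx = prod_head & r->mask;
    const uint32_t *src = obj_table;

    assert(r->esize % sizeof(uint32_t) == 0);
    assert(num <= r->size);

    if (idx + num <= r->size) {
        /* No wrap: a single contiguous copy. */
        demo_copy_elems(r->slots + idx * wpe, src, num, r->esize);
    } else {
        /* Wrap: fill up to the end of the storage, then restart at slot 0. */
        const uint32_t first = r->size - idx;

        demo_copy_elems(r->slots + idx * wpe, src, first, r->esize);
        demo_copy_elems(r->slots, src + first * wpe, num - first,
                r->esize);
    }
}
```

With a 4-slot ring of 12B elements (a size the 4/8/16-only macros would
reject), enqueueing two elements at prod_head 3 puts the first element in slot
3 and the second in slot 0. A dequeue helper would be the mirror image, with
the ring as the source and obj_table as the destination.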