Date: Mon, 25 Feb 2019 12:28:06 +0100
From: Olivier Matz
To: Gage Eads
Cc: dev@dpdk.org, arybchenko@solarflare.com, bruce.richardson@intel.com,
 konstantin.ananyev@intel.com, gavin.hu@arm.com,
 Honnappa.Nagarahalli@arm.com, nd@arm.com, thomas@monjalon.net
Message-ID: <20190225112806.vu3ynlswon3t5zer@platinum>
References: <20190222160655.3346-1-gage.eads@intel.com>
 <20190222160655.3346-6-gage.eads@intel.com>
In-Reply-To: <20190222160655.3346-6-gage.eads@intel.com>
Subject: Re: [dpdk-dev] [PATCH 5/7] stack: add non-blocking stack implementation

On Fri, Feb 22, 2019 at 10:06:53AM -0600, Gage Eads wrote:
> This commit adds support for a non-blocking (linked list based) stack to
> the stack API. This behavior is selected through a new rte_stack_create()
> flag, STACK_F_NB.
>
> The stack consists of a linked list of elements, each containing a data
> pointer and a next pointer, and an atomic stack depth counter.
>
> The non-blocking push operation enqueues a linked list of pointers by
> pointing the tail of the list to the current stack head, and using a CAS to
> swing the stack head pointer to the head of the list. The operation retries
> if it is unsuccessful (i.e. the list changed between reading the head and
> modifying it), else it adjusts the stack length and returns.
>
> The non-blocking pop operation first reserves num elements by adjusting the
> stack length, to ensure the dequeue operation will succeed without
> blocking. It then dequeues pointers by walking the list -- starting from
> the head -- then swinging the head pointer (using a CAS as well). While
> walking the list, the data pointers are recorded in an object table.
>
> This algorithm stack uses a 128-bit compare-and-swap instruction, which
> atomically updates the stack top pointer and a modification counter, to
> protect against the ABA problem.
>
> The linked list elements themselves are maintained in a non-blocking LIFO,
> and are allocated before stack pushes and freed after stack pops. Since the
> stack has a fixed maximum depth, these elements do not need to be
> dynamically created.
>
> Signed-off-by: Gage Eads

[...]

> diff --git a/doc/guides/prog_guide/stack_lib.rst b/doc/guides/prog_guide/stack_lib.rst
> index 51689cfe1..86fdc0a9b 100644
> --- a/doc/guides/prog_guide/stack_lib.rst
> +++ b/doc/guides/prog_guide/stack_lib.rst
> @@ -9,7 +9,7 @@ pointers.
>
>  The stack library provides the following basic operations:
>
> -* Create a uniquely named stack of a user-specified size and using a user-specified socket.
> +* Create a uniquely named stack of a user-specified size and using a user-specified socket, with either lock-based or non-blocking behavior.
>
>  * Push and pop a burst of one or more stack objects (pointers). These function are multi-threading safe.
>

Same comment about the 80-column limit as in the first patch.

[...]

> --- a/lib/librte_stack/rte_stack.c
> +++ b/lib/librte_stack/rte_stack.c
> @@ -26,27 +26,46 @@ static struct rte_tailq_elem rte_stack_tailq = {
>  EAL_REGISTER_TAILQ(rte_stack_tailq)
>
>  static void
> +nb_lifo_init(struct rte_stack *s, unsigned int count)
> +{
> +	struct rte_nb_lifo_elem *elems;
> +	unsigned int i;
> +
> +	elems = (struct rte_nb_lifo_elem *)&s[1];
> +	for (i = 0; i < count; i++)
> +		__nb_lifo_push(&s->nb_lifo.free, &elems[i], &elems[i], 1);
> +}

Would it be possible to add:

  struct rte_nb_lifo {
  	/** LIFO list of elements */
  	struct rte_nb_lifo_list used __rte_cache_aligned;
  	/** LIFO list of free elements */
  	struct rte_nb_lifo_list free __rte_cache_aligned;
 +	struct rte_nb_lifo_elem elems[];
  };

I think it is more consistent with the non-blocking structure.

[...]

> --- a/lib/librte_stack/rte_stack.h
> +++ b/lib/librte_stack/rte_stack.h
> @@ -29,6 +29,33 @@ extern "C" {
>  #define RTE_STACK_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
>  			   sizeof(RTE_STACK_MZ_PREFIX) + 1)
>
> +struct rte_nb_lifo_elem {
> +	void *data; /**< Data pointer */
> +	struct rte_nb_lifo_elem *next; /**< Next pointer */
> +};
> +
> +struct rte_nb_lifo_head {
> +	struct rte_nb_lifo_elem *top; /**< Stack top */
> +	uint64_t cnt; /**< Modification counter for avoiding ABA problem */
> +};
> +
> +struct rte_nb_lifo_list {
> +	/** List head */
> +	struct rte_nb_lifo_head head __rte_aligned(16);
> +	/** List len */
> +	rte_atomic64_t len;
> +};
> +
> +/* Structure containing two non-blocking LIFO lists: the stack itself and a
> + * list of free linked-list elements.
> + */
> +struct rte_nb_lifo {
> +	/** LIFO list of elements */
> +	struct rte_nb_lifo_list used __rte_cache_aligned;
> +	/** LIFO list of free elements */
> +	struct rte_nb_lifo_list free __rte_cache_aligned;
> +};
> +

The names "rte_nb_lifo*" bother me a bit. I think a more usual name format
is "rte_<module>_<something>". What would you think about names like this?

  rte_nb_lifo      -> rte_stack_nb
  rte_nb_lifo_elem -> rte_stack_nb_elem
  rte_nb_lifo_head -> rte_stack_nb_head
  rte_nb_lifo_list -> rte_stack_nb_list
  rte_lifo         -> rte_stack_std

I even wonder if "nonblock", "noblk", or "lockless" shouldn't be used in
place of "nb" (which is also a common abbreviation for number). This also
applies to the STACK_F_NB flag name.

[...]

>  /* Structure containing the LIFO, its current length, and a lock for mutual
>   * exclusion.
>   */
> @@ -48,10 +75,69 @@ struct rte_stack {
>  	const struct rte_memzone *memzone;
>  	uint32_t capacity; /**< Usable size of the stack */
>  	uint32_t flags; /**< Flags supplied at creation */
> -	struct rte_lifo lifo; /**< LIFO structure */
> +	RTE_STD_C11
> +	union {
> +		struct rte_nb_lifo nb_lifo; /**< Non-blocking LIFO structure */
> +		struct rte_lifo lifo; /**< LIFO structure */
> +	};
>  } __rte_cache_aligned;
>
>  /**
> + * The stack uses non-blocking push and pop functions. This flag is only
> + * supported on x86_64 platforms, currently.
> + */
> +#define STACK_F_NB 0x0001

What about adding the RTE_ prefix?
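By the way, for the programmer's guide, a short usage example could help.
Something like this (untested sketch, assuming the rte_stack_create() /
rte_stack_push() / rte_stack_pop() / rte_stack_free() prototypes from the
first patches of the series, and keeping the current flag name; the
stack_nb_example() helper is only for illustration):

  #include <rte_lcore.h>
  #include <rte_stack.h>

  /* Create a non-blocking stack, then push and pop a burst of pointers. */
  static int
  stack_nb_example(void)
  {
  	void *objs[8] = { NULL };
  	struct rte_stack *s;

  	s = rte_stack_create("example_nb", 1024, rte_socket_id(),
  			     STACK_F_NB);
  	if (s == NULL)
  		return -1;

  	/* Both calls return the number of objects actually pushed/popped. */
  	if (rte_stack_push(s, objs, 8) != 8) {
  		rte_stack_free(s);
  		return -1;
  	}

  	if (rte_stack_pop(s, objs, 8) != 8) {
  		rte_stack_free(s);
  		return -1;
  	}

  	rte_stack_free(s);
  	return 0;
  }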
> +static __rte_always_inline unsigned int __rte_experimental
> +rte_nb_lifo_push(struct rte_stack *s, void * const *obj_table, unsigned int n)
> +{
> +	struct rte_nb_lifo_elem *tmp, *first, *last = NULL;
> +	unsigned int i;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n free elements */
> +	first = __nb_lifo_pop(&s->nb_lifo.free, n, NULL, NULL);
> +	if (unlikely(first == NULL))
> +		return 0;
> +
> +	/* Construct the list elements */
> +	tmp = first;
> +	for (i = 0; i < n; i++) {
> +		tmp->data = obj_table[n - i - 1];
> +		last = tmp;
> +		tmp = tmp->next;
> +	}
> +
> +	/* Push them to the used list */
> +	__nb_lifo_push(&s->nb_lifo.used, first, last, n);
> +
> +	return n;
> +}

Here, I didn't get why "last" is not retrieved through __nb_lifo_pop(),
like it's done in rte_nb_lifo_pop(). Is there a reason for that?

[...]

> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_c11_mem.h

For the c11 memory model, please consider having an additional reviewer ;)

[...]

> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_generic.h
> @@ -0,0 +1,157 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _NB_LIFO_GENERIC_H_
> +#define _NB_LIFO_GENERIC_H_
> +
> +#include
> +#include
> +
> +static __rte_always_inline unsigned int
> +rte_nb_lifo_len(struct rte_stack *s)
> +{
> +	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
> +	 * and nb_lifo->len atomically, which can cause the list to appear
> +	 * shorter than it actually is if this function is called while other
> +	 * threads are modifying the list.
> +	 *
> +	 * However, given the inherently approximate nature of the get_count
> +	 * callback -- even if the list and its size were updated atomically,
> +	 * the size could change between when get_count executes and when the
> +	 * value is returned to the caller -- this is acceptable.
> +	 *
> +	 * The nb_lifo->len updates are placed such that the list may appear to
> +	 * have fewer elements than it does, but will never appear to have more
> +	 * elements. If the mempool is near-empty to the point that this is a
> +	 * concern, the user should consider increasing the mempool size.
> +	 */
> +	return (unsigned int)rte_atomic64_read(&s->nb_lifo.used.len);
> +}
> +
> +static __rte_always_inline void
> +__nb_lifo_push(struct rte_nb_lifo_list *lifo,
> +	       struct rte_nb_lifo_elem *first,
> +	       struct rte_nb_lifo_elem *last,
> +	       unsigned int num)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(first);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(lifo);
> +	RTE_SET_USED(num);
> +#else
> +	struct rte_nb_lifo_head old_head;
> +	int success;
> +
> +	old_head = lifo->head;
> +
> +	do {
> +		struct rte_nb_lifo_head new_head;
> +
> +		/* Swing the top pointer to the first element in the list and
> +		 * make the last element point to the old top.
> +		 */
> +		new_head.top = first;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		last->next = old_head.top;
> +
> +		/* Ensure the list entry writes are visible before pushing them
> +		 * to the stack.
> +		 */
> +		rte_wmb();
> +
> +		/* old_head is updated on failure */
> +		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
> +						(rte_int128_t *)&old_head,
> +						(rte_int128_t *)&new_head,
> +						1, __ATOMIC_RELEASE,
> +						__ATOMIC_RELAXED);
> +	} while (success == 0);
> +
> +	rte_atomic64_add(&lifo->len, num);
> +#endif
> +}
> +
> +static __rte_always_inline struct rte_nb_lifo_elem *
> +__nb_lifo_pop(struct rte_nb_lifo_list *lifo,
> +	      unsigned int num,
> +	      void **obj_table,
> +	      struct rte_nb_lifo_elem **last)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(obj_table);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(lifo);
> +	RTE_SET_USED(num);
> +
> +	return NULL;
> +#else
> +	struct rte_nb_lifo_head old_head;
> +	int success;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = rte_atomic64_read(&lifo->len);
> +
> +		/* Does the list contain enough elements? */
> +		if (unlikely(len < num))
> +			return NULL;
> +
> +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> +					len, len - num))
> +			break;
> +	}
> +

Here, accessing the length with a compare and set probably costs more than
a standard atomic sub function. I understand that was done for the reason
described above:

  The nb_lifo->len updates are placed such that the list may appear to
  have fewer elements than it does, but will never appear to have more
  elements.

Another strategy could be to use rte_atomic64_sub() after the effective
pop and change rte_nb_lifo_len() to bound the result to [0:size].
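For instance (rough and untested, only to illustrate the idea, reusing the
fields from this patch):

  /* With the length decremented by rte_atomic64_sub() only after the pop
   * has effectively been done, the counter may transiently drift outside
   * the valid range, so clamp the reported value to [0, capacity].
   */
  static __rte_always_inline unsigned int
  rte_nb_lifo_len(struct rte_stack *s)
  {
  	int64_t len = rte_atomic64_read(&s->nb_lifo.used.len);

  	if (len < 0)
  		return 0;
  	if (len > (int64_t)s->capacity)
  		return s->capacity;

  	return (unsigned int)len;
  }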