From mboxrd@z Thu Jan 1 00:00:00 1970
From: Gage Eads
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
 bruce.richardson@intel.com, konstantin.ananyev@intel.com,
 stephen@networkplumber.org
Date: Fri, 18 Jan 2019 09:23:22 -0600
Message-Id: <20190118152326.22686-2-gage.eads@intel.com>
X-Mailer: git-send-email 2.13.6
In-Reply-To: <20190118152326.22686-1-gage.eads@intel.com>
References: <20190115235227.14013-1-gage.eads@intel.com>
 <20190118152326.22686-1-gage.eads@intel.com>
Subject: [dpdk-dev] [PATCH v3 1/5] ring: add 64-bit headtail structure

Widening the head and tail indexes to 64 bits greatly increases the time it
takes for them to wrap around (with current CPU speeds, it won't happen
within the author's lifetime). This is important in avoiding the ABA
problem -- in which a thread mistakes reading the same tail index in two
accesses to mean that the ring was not modified in the intervening time --
in the upcoming non-blocking ring implementation. Using a 64-bit index makes
the possibility of this occurring effectively zero.
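To put rough numbers on the wrap-around claim above, here is an illustrative
sketch that is not part of the patch. The helper name sketch_wrap_seconds and
the rate of one billion index updates per second are assumptions chosen only
to give a sense of scale.

```c
#include <stdint.h>

/* Hypothetical helper, not part of DPDK: seconds until an index that is
 * index_bits wide wraps around, assuming it advances 'rate' times per
 * second.
 */
static inline double
sketch_wrap_seconds(unsigned int index_bits, double rate)
{
	double values = 1.0;
	unsigned int i;

	/* 2^index_bits distinct index values; exact in a double */
	for (i = 0; i < index_bits; i++)
		values *= 2.0;

	return values / rate;
}
```

Under that assumed rate, sketch_wrap_seconds(32, 1e9) is about 4.3 seconds,
while sketch_wrap_seconds(64, 1e9) is roughly 584 years.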
This commit places the new producer and consumer structures in the same
location in struct rte_ring as their 32-bit counterparts. Since the 32-bit
versions are padded out to a cache line, there is space for the new
structure without affecting the layout of struct rte_ring. Thus, the ABI is
preserved.

Signed-off-by: Gage Eads
---
 lib/librte_eventdev/rte_event_ring.h  |   2 +-
 lib/librte_ring/Makefile              |   3 +-
 lib/librte_ring/rte_ring.h            |  24 +++++-
 lib/librte_ring/rte_ring_generic_64.h | 152 ++++++++++++++++++++++++++++++++++
 4 files changed, 176 insertions(+), 5 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_generic_64.h

diff --git a/lib/librte_eventdev/rte_event_ring.h b/lib/librte_eventdev/rte_event_ring.h
index 827a3209e..5fcb2d5f7 100644
--- a/lib/librte_eventdev/rte_event_ring.h
+++ b/lib/librte_eventdev/rte_event_ring.h
@@ -1,5 +1,5 @@
 /* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016-2017 Intel Corporation
+ * Copyright(c) 2016-2019 Intel Corporation
  */
 
 /**
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 21a36770d..18c48fbc8 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -19,6 +19,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_generic.h \
-					rte_ring_c11_mem.h
+					rte_ring_c11_mem.h \
+					rte_ring_generic_64.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index af5444a9f..b270a4746 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -70,6 +70,15 @@ struct rte_ring_headtail {
 	uint32_t single;         /**< True if single prod/cons */
 };
 
+/* 64-bit version of rte_ring_headtail, for use by rings that need to avoid
+ * head/tail wrap-around.
+ */
+struct rte_ring_headtail_64 {
+	volatile uint64_t head;  /**< Prod/consumer head. */
+	volatile uint64_t tail;  /**< Prod/consumer tail. */
+	uint32_t single;         /**< True if single prod/cons */
+};
+
 /**
  * An RTE ring structure.
  *
@@ -97,11 +106,19 @@ struct rte_ring {
 	char pad0 __rte_cache_aligned; /**< empty cache line */
 
 	/** Ring producer status. */
-	struct rte_ring_headtail prod __rte_cache_aligned;
+	RTE_STD_C11
+	union {
+		struct rte_ring_headtail prod __rte_cache_aligned;
+		struct rte_ring_headtail_64 prod_64 __rte_cache_aligned;
+	};
 	char pad1 __rte_cache_aligned; /**< empty cache line */
 
 	/** Ring consumer status. */
-	struct rte_ring_headtail cons __rte_cache_aligned;
+	RTE_STD_C11
+	union {
+		struct rte_ring_headtail cons __rte_cache_aligned;
+		struct rte_ring_headtail_64 cons_64 __rte_cache_aligned;
+	};
 	char pad2 __rte_cache_aligned; /**< empty cache line */
 };
 
@@ -312,6 +329,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 #else
 #include "rte_ring_generic.h"
 #endif
+#include "rte_ring_generic_64.h"
 
 /**
  * @internal Enqueue several objects on the ring
diff --git a/lib/librte_ring/rte_ring_generic_64.h b/lib/librte_ring/rte_ring_generic_64.h
new file mode 100644
index 000000000..58de144c6
--- /dev/null
+++ b/lib/librte_ring/rte_ring_generic_64.h
@@ -0,0 +1,152 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2019 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_GENERIC_64_H_
+#define _RTE_RING_GENERIC_64_H_
+
+/**
+ * @internal This function updates the producer head for enqueue using
+ * 64-bit head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head_64(struct rte_ring *r, unsigned int is_sp,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint64_t *old_head, uint64_t *new_head,
+		uint32_t *free_entries)
+{
+	const uint32_t capacity = r->capacity;
+	unsigned int max = n;
+	int success;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = max;
+
+		*old_head = r->prod_64.head;
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/*
+		 * The subtraction is done between two unsigned 64-bit values
+		 * (the result is always modulo 64 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = (capacity + r->cons_64.tail - *old_head);
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sp)
+			r->prod_64.head = *new_head, success = 1;
+		else
+			success = rte_atomic64_cmpset(&r->prod_64.head,
+					*old_head, *new_head);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue using
+ * 64-bit head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_64(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint64_t *old_head, uint64_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	int success;
+
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		*old_head = r->cons_64.head;
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/* The subtraction is done between two unsigned 64-bit values
+		 * (the result is always modulo 64 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (r->prod_64.tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc)
+			r->cons_64.head = *new_head, success = 1;
+		else
+			success = rte_atomic64_cmpset(&r->cons_64.head,
+					*old_head, *new_head);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+#endif /* _RTE_RING_GENERIC_64_H_ */
-- 
2.13.6
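
An illustrative aside, not part of the patch: the free-space and entry-count
computations in the two functions above rely on unsigned subtraction wrapping
modulo 2^64, so the results stay correct even if an index has wrapped. A
minimal standalone sketch follows; the helper names sketch_free_entries and
sketch_used_entries are hypothetical and do not exist in DPDK.

```c
#include <stdint.h>

/* Hypothetical mirror of the free-space computation in
 * __rte_ring_move_prod_head_64(). The sum and difference are evaluated in
 * 64-bit unsigned arithmetic, which wraps modulo 2^64; the result is then
 * narrowed to 32 bits, which is lossless as long as it lies in
 * [0, capacity].
 */
static inline uint32_t
sketch_free_entries(uint32_t capacity, uint64_t cons_tail, uint64_t prod_head)
{
	return (uint32_t)(capacity + cons_tail - prod_head);
}

/* Hypothetical mirror of the entry count in
 * __rte_ring_move_cons_head_64(): entries produced but not yet consumed.
 */
static inline uint32_t
sketch_used_entries(uint64_t prod_tail, uint64_t cons_head)
{
	return (uint32_t)(prod_tail - cons_head);
}
```

For example, with a capacity of 7, a consumer tail of UINT64_MAX - 1 and a
producer head that has wrapped to 1, three entries are in flight and
sketch_free_entries() returns 4.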