From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from mga18.intel.com (mga18.intel.com [134.134.136.126])
 by dpdk.org (Postfix) with ESMTP id CA5252C18
 for ; Tue, 5 Mar 2019 18:41:05 +0100 (CET)
X-Amp-Result: SKIPPED(no attachment in message)
X-Amp-File-Uploaded: False
Received: from orsmga006.jf.intel.com ([10.7.209.51])
 by orsmga106.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;
 05 Mar 2019 09:41:05 -0800
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos;i="5.58,444,1544515200"; d="scan'208";a="121213982"
Received: from txasoft-yocto.an.intel.com ([10.123.72.192])
 by orsmga006.jf.intel.com with ESMTP; 05 Mar 2019 09:41:04 -0800
From: Gage Eads
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
 bruce.richardson@intel.com, konstantin.ananyev@intel.com,
 stephen@networkplumber.org, jerinj@marvell.com, mczekaj@marvell.com,
 nd@arm.com, Ola.Liljedahl@arm.com
Date: Tue, 5 Mar 2019 11:40:14 -0600
Message-Id: <20190305174019.9693-2-gage.eads@intel.com>
X-Mailer: git-send-email 2.13.6
In-Reply-To: <20190305174019.9693-1-gage.eads@intel.com>
References: <20190128181407.32739-1-gage.eads@intel.com>
 <20190305174019.9693-1-gage.eads@intel.com>
Subject: [dpdk-dev] [PATCH v5 1/6] ring: add a pointer-width headtail structure
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
X-List-Received-Date: Tue, 05 Mar 2019 17:41:06 -0000

For 64-bit systems, at current CPU speeds, 64-bit head and tail indexes
will not wrap around within the author's lifetime. This is important for
avoiding the ABA problem -- in which a thread mistakes reading the same
tail index in two accesses to mean that the ring was not modified in the
intervening time -- in the upcoming lock-free ring implementation. Using
a 64-bit index makes the possibility of this occurring effectively zero.

This commit uses pointer-width indexes so the lock-free ring can support
32-bit systems as well.

This commit places the new producer and consumer structures in the same
location in struct rte_ring as their 32-bit counterparts. Since the
32-bit versions are padded out to a cache line, there is space for the
new structure without affecting the layout of struct rte_ring. Thus, the
ABI is preserved.

Signed-off-by: Gage Eads
---
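Note (not part of the commit message): a minimal sketch of the ABA hazard
that the pointer-width index sidesteps. With a narrow (say, 32-bit) index,
a thread that reads the same tail value twice cannot distinguish "the ring
was not touched" from "other threads advanced the index exactly 2^32 steps
in between." The helper below is hypothetical, for illustration only.

#include <stdint.h>
#include <stdbool.h>

/* Illustration only: equality of two 32-bit snapshots is ambiguous,
 * because 2^32 increments wrap the index back to the same value (ABA).
 * With a 64-bit/pointer-width index on a 64-bit system, that many
 * increments cannot occur in any realistic timeframe, so equality
 * really does mean "unchanged".
 */
static bool
ring_unchanged(const volatile uint32_t *tail, uint32_t snapshot)
{
        return *tail == snapshot; /* also true after exactly 2^32 ops */
}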
 lib/librte_ring/rte_ring.h         |  21 +++++-
 lib/librte_ring/rte_ring_c11_mem.h | 143 +++++++++++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h | 130 +++++++++++++++++++++++++++++++++
 3 files changed, 291 insertions(+), 3 deletions(-)

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index af5444a9f..c78db6916 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -70,6 +70,13 @@ struct rte_ring_headtail {
        uint32_t single;        /**< True if single prod/cons */
 };

+/* Structure to hold a pair of pointer-sized head/tail values and metadata */
+struct rte_ring_headtail_ptr {
+       volatile uintptr_t head;        /**< Prod/consumer head. */
+       volatile uintptr_t tail;        /**< Prod/consumer tail. */
+       uint32_t single;        /**< True if single prod/cons */
+};
+
 /**
  * An RTE ring structure.
  *
@@ -97,11 +104,19 @@ struct rte_ring {
        char pad0 __rte_cache_aligned; /**< empty cache line */

        /** Ring producer status. */
-       struct rte_ring_headtail prod __rte_cache_aligned;
+       RTE_STD_C11
+       union {
+               struct rte_ring_headtail prod __rte_cache_aligned;
+               struct rte_ring_headtail_ptr prod_ptr __rte_cache_aligned;
+       };
        char pad1 __rte_cache_aligned; /**< empty cache line */

        /** Ring consumer status. */
-       struct rte_ring_headtail cons __rte_cache_aligned;
+       RTE_STD_C11
+       union {
+               struct rte_ring_headtail cons __rte_cache_aligned;
+               struct rte_ring_headtail_ptr cons_ptr __rte_cache_aligned;
+       };
        char pad2 __rte_cache_aligned; /**< empty cache line */
 };
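A quick way to sanity-check the ABI claim above (a sketch, not part of the
patch; it assumes C11 _Static_assert, a compiler that accepts offsetof on
anonymous-union members such as GCC/Clang, and the definitions from this
diff): since each union member occupies the same padded cache line as its
legacy counterpart, the new members must sit at identical offsets.

#include <stddef.h>
#include <rte_ring.h>

/* Hypothetical compile-time check that the pointer-width view aliases
 * the legacy 32-bit view, leaving struct rte_ring's layout untouched.
 */
_Static_assert(offsetof(struct rte_ring, prod) ==
                offsetof(struct rte_ring, prod_ptr),
                "prod_ptr must overlay prod");
_Static_assert(offsetof(struct rte_ring, cons) ==
                offsetof(struct rte_ring, cons_ptr),
                "cons_ptr must overlay cons");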
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 0fb73a337..545caf257 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -178,4 +178,147 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
        return n;
 }

+/**
+ * @internal This function updates the producer head for enqueue using
+ * pointer-sized head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether the multi-producer path is needed or not
+ * @param n
+ *   The number of elements we want to enqueue, i.e. how far the head
+ *   should be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param old_head
+ *   Returns the head value as it was before the move, i.e. where enqueue
+ *   starts
+ * @param new_head
+ *   Returns the current/new head value, i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE the head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
+               unsigned int n, enum rte_ring_queue_behavior behavior,
+               uintptr_t *old_head, uintptr_t *new_head,
+               uint32_t *free_entries)
+{
+       const uint32_t capacity = r->capacity;
+       uintptr_t cons_tail;
+       unsigned int max = n;
+       int success;
+
+       *old_head = __atomic_load_n(&r->prod_ptr.head, __ATOMIC_RELAXED);
+       do {
+               /* Reset n to the initial burst count */
+               n = max;
+
+               /* Ensure the head is read before the tail */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+
+               /* This load-acquire synchronizes with the store-release of
+                * ht->tail in update_tail.
+                */
+               cons_tail = __atomic_load_n(&r->cons_ptr.tail,
+                                       __ATOMIC_ACQUIRE);
+
+               *free_entries = (capacity + cons_tail - *old_head);
+
+               /* Check that we have enough room in the ring */
+               if (unlikely(n > *free_entries))
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                                       0 : *free_entries;
+
+               if (n == 0)
+                       return 0;
+
+               *new_head = *old_head + n;
+               if (is_sp)
+                       r->prod_ptr.head = *new_head, success = 1;
+               else
+                       /* On failure, *old_head is updated */
+                       success = __atomic_compare_exchange_n(&r->prod_ptr.head,
+                                       old_head, *new_head,
+                                       0, __ATOMIC_RELAXED,
+                                       __ATOMIC_RELAXED);
+       } while (unlikely(success == 0));
+       return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue using
+ * pointer-sized head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether the multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we want to dequeue, i.e. how far the head
+ *   should be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param old_head
+ *   Returns the head value as it was before the move, i.e. where dequeue
+ *   starts
+ * @param new_head
+ *   Returns the current/new head value, i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE the head was moved
+ * @return
+ *   Actual number of objects dequeued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
+               unsigned int n, enum rte_ring_queue_behavior behavior,
+               uintptr_t *old_head, uintptr_t *new_head,
+               uint32_t *entries)
+{
+       unsigned int max = n;
+       uintptr_t prod_tail;
+       int success;
+
+       /* Move cons.head atomically */
+       *old_head = __atomic_load_n(&r->cons_ptr.head, __ATOMIC_RELAXED);
+       do {
+               /* Restore n as it may change every loop */
+               n = max;
+
+               /* Ensure the head is read before the tail */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+
+               /* This load-acquire synchronizes with the store-release of
+                * ht->tail in update_tail.
+                */
+               prod_tail = __atomic_load_n(&r->prod_ptr.tail,
+                                       __ATOMIC_ACQUIRE);
+
+               *entries = (prod_tail - *old_head);
+
+               /* Set the actual entries for dequeue */
+               if (n > *entries)
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+               if (unlikely(n == 0))
+                       return 0;
+
+               *new_head = *old_head + n;
+               if (is_sc)
+                       r->cons_ptr.head = *new_head, success = 1;
+               else
+                       /* On failure, *old_head will be updated */
+                       success = __atomic_compare_exchange_n(&r->cons_ptr.head,
+                                       old_head, *new_head,
+                                       0, __ATOMIC_RELAXED,
+                                       __ATOMIC_RELAXED);
+       } while (unlikely(success == 0));
+       return n;
+}
+
 #endif /* _RTE_RING_C11_MEM_H_ */
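One detail worth calling out in the C11 variant above: on failure,
__atomic_compare_exchange_n writes the observed value back into *old_head,
which is why the retry loop never reloads the head explicitly. A standalone
sketch of that semantic (illustration only, not part of the patch):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
        uintptr_t head = 5;
        uintptr_t expected = 4; /* stale snapshot of head */

        /* The CAS fails because head != expected; on failure the builtin
         * stores the current value of head (5) into expected, so a caller
         * can retry immediately without an explicit reload.
         */
        int ok = __atomic_compare_exchange_n(&head, &expected, 6,
                        0, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
        printf("ok=%d expected=%lu\n", ok, (unsigned long)expected);
        /* Prints: ok=0 expected=5 */
        return 0;
}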
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5b9..6a0e1bbfb 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -167,4 +167,134 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
        return n;
 }

+/**
+ * @internal This function updates the producer head for enqueue using
+ * pointer-sized head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether the multi-producer path is needed or not
+ * @param n
+ *   The number of elements we want to enqueue, i.e. how far the head
+ *   should be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param old_head
+ *   Returns the head value as it was before the move, i.e. where enqueue
+ *   starts
+ * @param new_head
+ *   Returns the current/new head value, i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE the head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
+               unsigned int n, enum rte_ring_queue_behavior behavior,
+               uintptr_t *old_head, uintptr_t *new_head,
+               uint32_t *free_entries)
+{
+       const uint32_t capacity = r->capacity;
+       unsigned int max = n;
+       int success;
+
+       do {
+               /* Reset n to the initial burst count */
+               n = max;
+
+               *old_head = r->prod_ptr.head;
+
+               /* Add an rmb barrier to avoid load/load reordering on weak
+                * memory models. It is a no-op on x86.
+                */
+               rte_smp_rmb();
+
+               *free_entries = (capacity + r->cons_ptr.tail - *old_head);
+
+               /* Check that we have enough room in the ring */
+               if (unlikely(n > *free_entries))
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                                       0 : *free_entries;
+
+               if (n == 0)
+                       return 0;
+
+               *new_head = *old_head + n;
+               if (is_sp)
+                       r->prod_ptr.head = *new_head, success = 1;
+               else
+                       success = __sync_bool_compare_and_swap(
+                                       &r->prod_ptr.head,
+                                       *old_head, *new_head);
+       } while (unlikely(success == 0));
+       return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue using
+ * pointer-sized head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether the multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we want to dequeue, i.e. how far the head
+ *   should be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param old_head
+ *   Returns the head value as it was before the move, i.e. where dequeue
+ *   starts
+ * @param new_head
+ *   Returns the current/new head value, i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE the head was moved
+ * @return
+ *   Actual number of objects dequeued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
+               unsigned int n, enum rte_ring_queue_behavior behavior,
+               uintptr_t *old_head, uintptr_t *new_head,
+               uint32_t *entries)
+{
+       unsigned int max = n;
+       int success;
+
+       do {
+               /* Restore n as it may change every loop */
+               n = max;
+
+               *old_head = r->cons_ptr.head;
+
+               /* Add an rmb barrier to avoid load/load reordering on weak
+                * memory models. It is a no-op on x86.
+                */
+               rte_smp_rmb();
+
+               *entries = (r->prod_ptr.tail - *old_head);
+
+               /* Set the actual entries for dequeue */
+               if (n > *entries)
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+               if (unlikely(n == 0))
+                       return 0;
+
+               *new_head = *old_head + n;
+               if (is_sc)
+                       r->cons_ptr.head = *new_head, success = 1;
+               else
+                       success = __sync_bool_compare_and_swap(
+                                       &r->cons_ptr.head,
+                                       *old_head, *new_head);
+       } while (unlikely(success == 0));
+       return n;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
-- 
2.13.6