From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id E5BA3A0598; Sat, 18 Apr 2020 18:33:46 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id F2AE61D541; Sat, 18 Apr 2020 18:33:11 +0200 (CEST) Received: from mga12.intel.com (mga12.intel.com [192.55.52.136]) by dpdk.org (Postfix) with ESMTP id 8EEF71D52B for ; Sat, 18 Apr 2020 18:32:54 +0200 (CEST) IronPort-SDR: MxWsxhp0c7FndE/K0US1ETaxBPn3YzSSg0Rsh7ViEbLzyz+YolLtMSj4SVuQFsu2n8mKchsoq8 w0peIHr1d0kA== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga008.fm.intel.com ([10.253.24.58]) by fmsmga106.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 18 Apr 2020 09:32:54 -0700 IronPort-SDR: xZg7O9a5hk+T7NryMHrPuj9l3WvOMz3kx7kwc23S0UEQHQFYp8sjrcxvCgiGpSjDd9bYTQrunJ rJu7M/9osCuw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,399,1580803200"; d="scan'208";a="246542833" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga008.fm.intel.com with ESMTP; 18 Apr 2020 09:32:52 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Sat, 18 Apr 2020 17:32:21 +0100 Message-Id: <20200418163225.17635-6-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200418163225.17635-1-konstantin.ananyev@intel.com> References: <20200417133639.14019-1-konstantin.ananyev@intel.com> <20200418163225.17635-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v5 5/9] ring: introduce HTS ring mode X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce head/tail sync mode for MT ring synchronization. 
In that mode enqueue/dequeue operation is fully serialized: only one thread at a time is allowed to perform a given op. This is supposed to reduce stall times when the ring is used on overcommitted CPUs (multiple active threads on the same CPU). Signed-off-by: Konstantin Ananyev --- lib/librte_ring/Makefile | 2 + lib/librte_ring/meson.build | 2 + lib/librte_ring/rte_ring.c | 20 +- lib/librte_ring/rte_ring.h | 11 + lib/librte_ring/rte_ring_core.h | 20 ++ lib/librte_ring/rte_ring_elem.h | 13 + lib/librte_ring/rte_ring_hts.h | 332 +++++++++++++++++++++++++ lib/librte_ring/rte_ring_hts_c11_mem.h | 207 +++++++++++++++ 8 files changed, 605 insertions(+), 2 deletions(-) create mode 100644 lib/librte_ring/rte_ring_hts.h create mode 100644 lib/librte_ring/rte_ring_hts_c11_mem.h diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 04e446e37..f75d8e530 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -20,6 +20,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h \ + rte_ring_hts.h \ + rte_ring_hts_c11_mem.h \ rte_ring_rts.h \ rte_ring_rts_c11_mem.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index a95598032..ca37cb8cc 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -7,6 +7,8 @@ headers = files('rte_ring.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h', + 'rte_ring_hts.h', + 'rte_ring_hts_c11_mem.h', 'rte_ring_rts.h', 'rte_ring_rts_c11_mem.h') diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 222eec0fb..ebe5ccf0d 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -89,9 +89,11 @@ static void reset_headtail(void *p) { struct rte_ring_headtail *ht; + struct rte_ring_hts_headtail *ht_hts; struct rte_ring_rts_headtail *ht_rts; ht = p; + ht_hts = p; ht_rts = p; switch (ht->sync_type) { @@ -104,6 +106,9 @@ reset_headtail(void *p) ht_rts->head.raw = 0; 
ht_rts->tail.raw = 0; break; + case RTE_RING_SYNC_MT_HTS: + ht_hts->ht.raw = 0; + break; default: /* unknown sync mode */ RTE_ASSERT(0); @@ -127,9 +132,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, enum rte_ring_sync_type *cons_st) { static const uint32_t prod_st_flags = - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); static const uint32_t cons_st_flags = - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); switch (flags & prod_st_flags) { case 0: @@ -141,6 +146,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MP_RTS_ENQ: *prod_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MP_HTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -155,6 +163,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MC_RTS_DEQ: *cons_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MC_HTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -176,6 +187,11 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_hts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_hts_headtail, ht.pos.tail)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != offsetof(struct rte_ring_rts_headtail, sync_type)); RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 77f206ca7..7a7695914 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -398,6 +398,9 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, free_space); 
+ case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n, + free_space); #endif } @@ -545,6 +548,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, #ifdef ALLOW_EXPERIMENTAL_API case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available); #endif } @@ -879,6 +884,9 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst(r, obj_table, n, + free_space); #endif } @@ -972,6 +980,9 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst(r, obj_table, n, + available); #endif } diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h index ded0fa0b7..7dc0fb0d9 100644 --- a/lib/librte_ring/rte_ring_core.h +++ b/lib/librte_ring/rte_ring_core.h @@ -59,6 +59,7 @@ enum rte_ring_sync_type { RTE_RING_SYNC_ST, /**< single thread only */ #ifdef ALLOW_EXPERIMENTAL_API RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ + RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ #endif }; @@ -95,6 +96,20 @@ struct rte_ring_rts_headtail { volatile union rte_ring_rts_poscnt head; }; +union rte_ring_hts_pos { + /** raw 8B value to read/write *head* and *tail* as one atomic op */ + uint64_t raw __rte_aligned(8); + struct { + uint32_t head; /**< head position */ + uint32_t tail; /**< tail position */ + } pos; +}; + +struct rte_ring_hts_headtail { + volatile union rte_ring_hts_pos ht; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ +}; + /** * An RTE ring structure. 
* @@ -126,6 +141,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail prod; + struct rte_ring_hts_headtail hts_prod; struct rte_ring_rts_headtail rts_prod; } __rte_cache_aligned; @@ -135,6 +151,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail cons; + struct rte_ring_hts_headtail hts_cons; struct rte_ring_rts_headtail rts_cons; } __rte_cache_aligned; @@ -157,6 +174,9 @@ struct rte_ring { #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */ + #ifdef __cplusplus } #endif diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 6da0a917b..df485fc6b 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -529,6 +529,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, } #ifdef ALLOW_EXPERIMENTAL_API +#include <rte_ring_hts.h> #include <rte_ring_rts.h> #endif @@ -573,6 +574,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); #endif } @@ -754,6 +758,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, + n, available); #endif } @@ -939,6 +946,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, 
esize, + n, free_space); #endif } @@ -1048,6 +1058,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, + n, available); #endif } diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h new file mode 100644 index 000000000..c7701defc --- /dev/null +++ b/lib/librte_ring/rte_ring_hts.h @@ -0,0 +1,332 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_H_ +#define _RTE_RING_HTS_H_ + +/** + * @file rte_ring_hts.h + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include <rte_ring.h> instead. + * + * Contains functions for serialized, aka Head-Tail Sync (HTS) ring mode. + * In that mode enqueue/dequeue operation is fully serialized: + * at any given moment only one enqueue/dequeue operation can proceed. + * This is achieved by allowing a thread to proceed with changing head.value + * only when head.value == tail.value. + * Both head and tail values are updated atomically (as one 64-bit value). + * To achieve that 64-bit CAS is used by head update routine. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_hts_c11_mem.h> + +/** + * @internal Enqueue several objects on the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. 
+ * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. 
+ * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. 
+ * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_HTS_H_ */ diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h b/lib/librte_ring/rte_ring_hts_c11_mem.h new file mode 100644 index 000000000..87e84fdc9 --- /dev/null +++ b/lib/librte_ring/rte_ring_hts_c11_mem.h @@ -0,0 +1,207 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_C11_MEM_H_ +#define _RTE_RING_HTS_C11_MEM_H_ + +/** + * @file rte_ring_hts_c11_mem.h + * It is not recommended to include this file directly, + * include <rte_ring.h> instead. + * Contains internal helper functions for head/tail sync (HTS) ring mode. + * For more information please refer to <rte_ring_hts.h>. + */ + +/** + * @internal get current tail value. + * Check that user didn't request to move tail above the head. + * In that situation: + * - return zero, that will cause abort any pending changes and + * return head to its previous position. + * - throw an assert in debug mode. + */ +static __rte_always_inline uint32_t +__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail, + uint32_t num) +{ + uint32_t n; + union rte_ring_hts_pos p; + + p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED); + n = p.pos.head - p.pos.tail; + + RTE_ASSERT(n >= num); + num = (n >= num) ? num : 0; + + *tail = p.pos.tail; + return num; +} + +/** + * @internal set new values for head and tail as one atomic 64 bit operation. 
+ * Should be used only in conjunction with __rte_ring_hts_get_tail. + */ +static __rte_always_inline void +__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail, + uint32_t num, uint32_t enqueue) +{ + union rte_ring_hts_pos p; + + RTE_SET_USED(enqueue); + + p.pos.head = tail + num; + p.pos.tail = p.pos.head; + + __atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE); +} + +/** + * @internal update tail with new value. + */ +static __rte_always_inline void +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail, + uint32_t num, uint32_t enqueue) +{ + uint32_t tail; + + RTE_SET_USED(enqueue); + + tail = old_tail + num; + __atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE); +} + +/** + * @internal waits until tail becomes equal to head. + * This means no writer/reader is active for that ring. + * Supposed to work as a serialization point. + */ +static __rte_always_inline void +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, + union rte_ring_hts_pos *p) +{ + while (p->pos.head != p->pos.tail) { + rte_pause(); + p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE); + } +} + +/** + * @internal This function updates the producer head for enqueue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union rte_ring_hts_pos np, op; + + const uint32_t capacity = r->capacity; + + op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE); + + do { + /* Reset n to the initial burst count */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read prod head/tail *before* + * reading cons tail. + */ + __rte_ring_hts_head_wait(&r->hts_prod, &op); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). 
So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - op.pos.head; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of cons tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union rte_ring_hts_pos np, op; + + op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE); + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read cons head/tail *before* + * reading prod tail. + */ + __rte_ring_hts_head_wait(&r->hts_cons, &op); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - op.pos.head; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (unlikely(n == 0)) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of prod tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +#endif /* _RTE_RING_HTS_C11_MEM_H_ */ -- 2.17.1