From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 6BEC2A0562; Fri, 3 Apr 2020 00:11:19 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id A83E91C0D4; Fri, 3 Apr 2020 00:10:52 +0200 (CEST) Received: from mga01.intel.com (mga01.intel.com [192.55.52.88]) by dpdk.org (Postfix) with ESMTP id 1CAE01C0C0 for ; Fri, 3 Apr 2020 00:10:50 +0200 (CEST) IronPort-SDR: NrTFNwf+VEXkc9c51paGbhukTg+mao+J60cSCZAeZoQTPrZq4TYTLEI2j4ErfCe2snhkfwM2jd wbYZlkxgLDsw== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga001.jf.intel.com ([10.7.209.18]) by fmsmga101.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 02 Apr 2020 15:10:50 -0700 IronPort-SDR: NSfpYaJdDhmQwoAQsRD5c3lChJkEjzesI9dI9mAoBIhm1CNoymfqRQLdz1x5u4NmHDbAO2jisV XCjruANnVw2w== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,337,1580803200"; d="scan'208";a="328975331" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga001.jf.intel.com with ESMTP; 02 Apr 2020 15:10:48 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Thu, 2 Apr 2020 23:09:55 +0100 Message-Id: <20200402220959.29885-6-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200402220959.29885-1-konstantin.ananyev@intel.com> References: <20200331164330.28854-1-konstantin.ananyev@intel.com> <20200402220959.29885-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v2 5/9] ring: introduce HTS ring mode X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce head/tail sync mode for MT ring synchronization. 
In that mode enqueue/dequeue operation is fully serialized: only one thread at a time is allowed to perform given op. Suppose to reduce stall times in case when ring is used on overcommitted cpus (multiple active threads on the same cpu). Signed-off-by: Konstantin Ananyev --- lib/librte_ring/Makefile | 3 + lib/librte_ring/meson.build | 3 + lib/librte_ring/rte_ring.c | 20 ++- lib/librte_ring/rte_ring.h | 31 ++++ lib/librte_ring/rte_ring_elem.h | 13 ++ lib/librte_ring/rte_ring_hts.h | 210 +++++++++++++++++++++++++ lib/librte_ring/rte_ring_hts_elem.h | 205 ++++++++++++++++++++++++ lib/librte_ring/rte_ring_hts_generic.h | 198 +++++++++++++++++++++++ 8 files changed, 681 insertions(+), 2 deletions(-) create mode 100644 lib/librte_ring/rte_ring_hts.h create mode 100644 lib/librte_ring/rte_ring_hts_elem.h create mode 100644 lib/librte_ring/rte_ring_hts_generic.h diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 8f5c284cc..6fe500f0d 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -19,6 +19,9 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h \ + rte_ring_hts.h \ + rte_ring_hts_elem.h \ + rte_ring_hts_generic.h \ rte_ring_rts.h \ rte_ring_rts_elem.h \ rte_ring_rts_generic.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index 612936afb..8e86e037a 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -6,6 +6,9 @@ headers = files('rte_ring.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h', + 'rte_ring_hts.h', + 'rte_ring_hts_elem.h', + 'rte_ring_hts_generic.h', 'rte_ring_rts.h', 'rte_ring_rts_elem.h', 'rte_ring_rts_generic.h') diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 222eec0fb..ebe5ccf0d 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -89,9 +89,11 @@ static void reset_headtail(void *p) { struct rte_ring_headtail *ht; + struct 
rte_ring_hts_headtail *ht_hts; struct rte_ring_rts_headtail *ht_rts; ht = p; + ht_hts = p; ht_rts = p; switch (ht->sync_type) { @@ -104,6 +106,9 @@ reset_headtail(void *p) ht_rts->head.raw = 0; ht_rts->tail.raw = 0; break; + case RTE_RING_SYNC_MT_HTS: + ht_hts->ht.raw = 0; + break; default: /* unknown sync mode */ RTE_ASSERT(0); @@ -127,9 +132,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, enum rte_ring_sync_type *cons_st) { static const uint32_t prod_st_flags = - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); static const uint32_t cons_st_flags = - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); switch (flags & prod_st_flags) { case 0: @@ -141,6 +146,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MP_RTS_ENQ: *prod_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MP_HTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -155,6 +163,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MC_RTS_DEQ: *cons_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MC_HTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -176,6 +187,11 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_hts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_hts_headtail, ht.pos.tail)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != offsetof(struct rte_ring_rts_headtail, sync_type)); RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index f6f084d79..6e4213afa 100644 --- a/lib/librte_ring/rte_ring.h +++ 
b/lib/librte_ring/rte_ring.h @@ -68,6 +68,7 @@ enum rte_ring_sync_type { RTE_RING_SYNC_ST, /**< single thread only */ #ifdef ALLOW_EXPERIMENTAL_API RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ + RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ #endif }; @@ -103,6 +104,19 @@ struct rte_ring_rts_headtail { volatile union rte_ring_ht_poscnt head; }; +union rte_ring_ht_pos { + uint64_t raw; + struct { + uint32_t head; /**< head position */ + uint32_t tail; /**< tail position */ + } pos; +}; + +struct rte_ring_hts_headtail { + volatile union rte_ring_ht_pos ht; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ +}; + /** * An RTE ring structure. * @@ -133,6 +147,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail prod; + struct rte_ring_hts_headtail hts_prod; struct rte_ring_rts_headtail rts_prod; } __rte_cache_aligned; @@ -142,6 +157,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail cons; + struct rte_ring_hts_headtail hts_cons; struct rte_ring_rts_headtail rts_cons; } __rte_cache_aligned; @@ -164,6 +180,9 @@ struct rte_ring { #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". 
*/ + #define __IS_SP RTE_RING_SYNC_ST #define __IS_MP RTE_RING_SYNC_MT #define __IS_SC RTE_RING_SYNC_ST @@ -494,6 +513,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, } #ifdef ALLOW_EXPERIMENTAL_API +#include #include #endif @@ -529,6 +549,9 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n, + free_space); #endif } @@ -676,6 +699,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, #ifdef ALLOW_EXPERIMENTAL_API case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available); #endif } @@ -1010,6 +1035,9 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst(r, obj_table, n, + free_space); #endif } @@ -1103,6 +1131,9 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst(r, obj_table, n, + available); #endif } diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 5de0850dc..010a564c1 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -542,6 +542,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, RTE_RING_QUEUE_FIXED, __IS_SP, free_space); } +#include #include /** @@ -585,6 +586,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, free_space); + case 
RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); #endif } @@ -766,6 +770,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, + n, available); #endif } @@ -951,6 +958,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); #endif } @@ -1060,6 +1070,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, + n, available); #endif } diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h new file mode 100644 index 000000000..062d7be6c --- /dev/null +++ b/lib/librte_ring/rte_ring_hts.h @@ -0,0 +1,210 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_H_ +#define _RTE_RING_HTS_H_ + +/** + * @file rte_ring_hts.h + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include instead. + * + * Contains functions for serialized, aka Head-Tail Sync (HTS) ring mode. + * In that mode enqueue/dequeue operation is fully serialized: + * at any given moement only one enqueue/dequeue operation can proceed. 
This is achieved by allowing a thread to proceed with changing head.value
+ * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue(struct rte_ring *r, void **obj_table, + uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *); + __rte_ring_hts_update_tail(&r->hts_cons, n, 0); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. 
+ * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_HTS_H_ */ diff --git a/lib/librte_ring/rte_ring_hts_elem.h b/lib/librte_ring/rte_ring_hts_elem.h new file mode 100644 index 000000000..34f0d121d --- /dev/null +++ b/lib/librte_ring/rte_ring_hts_elem.h @@ -0,0 +1,205 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_ELEM_H_ +#define _RTE_RING_HTS_ELEM_H_ + +/** + * @file rte_ring_hts_elem.h + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include instead. + * + * Contains *ring_elem* functions for Head-Tail Sync (HTS) ring mode. + * for more details please refer to . + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * @internal Enqueue several objects on the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, void * const *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_prod, n, 1); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void **obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_cons, n, 0); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. 
+ * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk_elem(struct rte_ring *r, void * const *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk_elem(struct rte_ring *r, void **obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst_elem(struct rte_ring *r, void * const *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst_elem(struct rte_ring *r, void **obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_HTS_ELEM_H_ */ diff --git a/lib/librte_ring/rte_ring_hts_generic.h b/lib/librte_ring/rte_ring_hts_generic.h new file mode 100644 index 000000000..0b3931ffa --- /dev/null +++ b/lib/librte_ring/rte_ring_hts_generic.h @@ -0,0 +1,198 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_GENERIC_H_ +#define _RTE_RING_HTS_GENERIC_H_ + +/** + * @file rte_ring_hts_generic.h + * It is not recommended to include this file directly, + * include instead. 
+ * Contains internal helper functions for head/tail sync (HTS) ring mode. + * For more information please refer to . + */ + +static __rte_always_inline void +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num, + uint32_t enqueue) +{ + union rte_ring_ht_pos p; + + if (enqueue) + rte_smp_wmb(); + else + rte_smp_rmb(); + + p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht->ht.raw); + + p.pos.head = p.pos.tail + num; + p.pos.tail = p.pos.head; + + rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw); +} + +/** + * @internal waits till tail will become equal to head. + * Means no writer/reader is active for that ring. + * Suppose to work as serialization point. + */ +static __rte_always_inline void +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, + union rte_ring_ht_pos *p) +{ + p->raw = rte_atomic64_read((rte_atomic64_t *) + (uintptr_t)&ht->ht.raw); + + while (p->pos.head != p->pos.tail) { + rte_pause(); + p->raw = rte_atomic64_read((rte_atomic64_t *) + (uintptr_t)&ht->ht.raw); + } +} + +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union rte_ring_ht_pos np, op; + + const uint32_t capacity = r->capacity; + + do { + /* Reset n to the initial burst count */ + n = num; + + /* wait for tail to be equal to head */ + __rte_ring_hts_head_wait(&r->hts_prod, &op); + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - op.pos.head; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + return 0; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + } while (rte_atomic64_cmpset(&r->hts_prod.ht.raw, + op.raw, np.raw) == 0); + + *old_head = op.pos.head; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. 
where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union rte_ring_ht_pos np, op; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* wait for tail to be equal to head */ + __rte_ring_hts_head_wait(&r->hts_cons, &op); + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - op.pos.head; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (unlikely(n == 0)) + return 0; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + } while (rte_atomic64_cmpset(&r->hts_cons.ht.raw, + op.raw, np.raw) == 0); + + *old_head = op.pos.head; + return n; +} + +#endif /* _RTE_RING_HTS_GENERIC_H_ */ -- 2.17.1