From mboxrd@z Thu Jan 1 00:00:00 1970
From: Huichao Cai
To: honnappa.nagarahalli@arm.com, konstantin.v.ananyev@yandex.ru
Cc: dev@dpdk.org
Subject: [PATCH] ring: add the second version of the RTS interface
Date: Sun, 5 Jan 2025 17:57:46 +0800
Message-Id: <20250105095746.2727-1-chcchc88@163.com>
X-Mailer: git-send-email 2.27.0
List-Id: DPDK patches and discussions

The timing of the
update of the RTS enqueues/dequeues tail is limited to the last enqueues/dequeues, which reduces concurrency, so the RTS interface of the V2 version is added, which makes the tail of the enqueues/dequeues not limited to the last enqueues/dequeues and thus enables timely updates to increase concurrency. Add some corresponding test cases. Signed-off-by: Huichao Cai --- app/test/meson.build | 1 + app/test/test_ring.c | 26 +++ app/test/test_ring_rts_v2_stress.c | 32 ++++ app/test/test_ring_stress.c | 3 + app/test/test_ring_stress.h | 1 + devtools/libabigail.abignore | 3 + doc/guides/rel_notes/release_25_03.rst | 2 + lib/ring/rte_ring.c | 53 +++++- lib/ring/rte_ring.h | 12 ++ lib/ring/rte_ring_core.h | 9 ++ lib/ring/rte_ring_elem.h | 18 +++ lib/ring/rte_ring_rts.h | 216 ++++++++++++++++++++++++- lib/ring/rte_ring_rts_elem_pvt.h | 168 +++++++++++++++++++ 13 files changed, 534 insertions(+), 10 deletions(-) create mode 100644 app/test/test_ring_rts_v2_stress.c diff --git a/app/test/meson.build b/app/test/meson.build index d5cb6a7f7a..e3d8cef3fa 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -166,6 +166,7 @@ source_file_deps = { 'test_ring_mt_peek_stress_zc.c': ['ptr_compress'], 'test_ring_perf.c': ['ptr_compress'], 'test_ring_rts_stress.c': ['ptr_compress'], + 'test_ring_rts_v2_stress.c': ['ptr_compress'], 'test_ring_st_peek_stress.c': ['ptr_compress'], 'test_ring_st_peek_stress_zc.c': ['ptr_compress'], 'test_ring_stress.c': ['ptr_compress'], diff --git a/app/test/test_ring.c b/app/test/test_ring.c index ba1fec1de3..094f14b859 100644 --- a/app/test/test_ring.c +++ b/app/test/test_ring.c @@ -284,6 +284,19 @@ static const struct { .felem = rte_ring_dequeue_bulk_elem, }, }, + { + .desc = "MP_RTS/MC_RTS V2 sync mode", + .api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF, + .create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ, + .enq = { + .flegacy = rte_ring_enqueue_bulk, + .felem = rte_ring_enqueue_bulk_elem, + }, + .deq = { + .flegacy = 
rte_ring_dequeue_bulk, + .felem = rte_ring_dequeue_bulk_elem, + }, + }, { .desc = "MP_HTS/MC_HTS sync mode", .api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF, @@ -349,6 +362,19 @@ static const struct { .felem = rte_ring_dequeue_burst_elem, }, }, + { + .desc = "MP_RTS/MC_RTS V2 sync mode", + .api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF, + .create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ, + .enq = { + .flegacy = rte_ring_enqueue_burst, + .felem = rte_ring_enqueue_burst_elem, + }, + .deq = { + .flegacy = rte_ring_dequeue_burst, + .felem = rte_ring_dequeue_burst_elem, + }, + }, { .desc = "MP_HTS/MC_HTS sync mode", .api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF, diff --git a/app/test/test_ring_rts_v2_stress.c b/app/test/test_ring_rts_v2_stress.c new file mode 100644 index 0000000000..6079366a7d --- /dev/null +++ b/app/test/test_ring_rts_v2_stress.c @@ -0,0 +1,32 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress_impl.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_mc_rts_v2_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_mp_rts_v2_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, + RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ); +} + +const struct test test_ring_rts_v2_stress = { + .name = "MT_RTS_V2", + .nb_case = RTE_DIM(tests), + .cases = tests, +}; diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c index 1af45e0fc8..94085acd5e 100644 --- a/app/test/test_ring_stress.c +++ b/app/test/test_ring_stress.c @@ -43,6 +43,9 @@ test_ring_stress(void) n += test_ring_rts_stress.nb_case; k += run_test(&test_ring_rts_stress); + n += 
test_ring_rts_v2_stress.nb_case; + k += run_test(&test_ring_rts_v2_stress); + n += test_ring_hts_stress.nb_case; k += run_test(&test_ring_hts_stress); diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h index 416d68c9a0..505957f6fb 100644 --- a/app/test/test_ring_stress.h +++ b/app/test/test_ring_stress.h @@ -34,6 +34,7 @@ struct test { extern const struct test test_ring_mpmc_stress; extern const struct test test_ring_rts_stress; +extern const struct test test_ring_rts_v2_stress; extern const struct test test_ring_hts_stress; extern const struct test test_ring_mt_peek_stress; extern const struct test test_ring_mt_peek_stress_zc; diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore index 21b8cd6113..0a0f305acb 100644 --- a/devtools/libabigail.abignore +++ b/devtools/libabigail.abignore @@ -33,3 +33,6 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; Temporary exceptions till next major ABI version ; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +[suppress_type] + name = rte_ring_rts_headtail + has_data_member_inserted_between = {offset_of(head), end} diff --git a/doc/guides/rel_notes/release_25_03.rst b/doc/guides/rel_notes/release_25_03.rst index 426dfcd982..f73bc9e397 100644 --- a/doc/guides/rel_notes/release_25_03.rst +++ b/doc/guides/rel_notes/release_25_03.rst @@ -102,6 +102,8 @@ ABI Changes * No ABI change that would break compatibility with 24.11. +* ring: Added ``rte_ring_rts_cache`` structure and ``rts_cache`` field to the + ``rte_ring_rts_headtail`` structure. 
Known Issues ------------ diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c index aebb6d6728..df84592300 100644 --- a/lib/ring/rte_ring.c +++ b/lib/ring/rte_ring.c @@ -43,7 +43,8 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) /* mask of all valid flag values to ring_create() */ #define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \ RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ | \ - RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ) + RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ | \ + RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ) /* true if x is a power of 2 */ #define POWEROF2(x) ((((x)-1) & (x)) == 0) @@ -106,6 +107,7 @@ reset_headtail(void *p) ht->tail = 0; break; case RTE_RING_SYNC_MT_RTS: + case RTE_RING_SYNC_MT_RTS_V2: ht_rts->head.raw = 0; ht_rts->tail.raw = 0; break; @@ -135,9 +137,11 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, enum rte_ring_sync_type *cons_st) { static const uint32_t prod_st_flags = - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ | + RING_F_MP_RTS_V2_ENQ); static const uint32_t cons_st_flags = - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ | + RING_F_MC_RTS_V2_DEQ); switch (flags & prod_st_flags) { case 0: @@ -152,6 +156,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MP_HTS_ENQ: *prod_st = RTE_RING_SYNC_MT_HTS; break; + case RING_F_MP_RTS_V2_ENQ: + *prod_st = RTE_RING_SYNC_MT_RTS_V2; + break; default: return -EINVAL; } @@ -169,6 +176,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MC_HTS_DEQ: *cons_st = RTE_RING_SYNC_MT_HTS; break; + case RING_F_MC_RTS_V2_DEQ: + *cons_st = RTE_RING_SYNC_MT_RTS_V2; + break; default: return -EINVAL; } @@ -239,6 +249,28 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count, if (flags & RING_F_MC_RTS_DEQ) rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); + /* set default 
values for head-tail distance and allocate memory to cache */ + if (flags & RING_F_MP_RTS_V2_ENQ) { + rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF); + r->rts_prod.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc( + "RTS_PROD_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0); + if (r->rts_prod.rts_cache == NULL) { + RING_LOG(ERR, "Cannot reserve memory for rts prod cache"); + return -ENOMEM; + } + } + if (flags & RING_F_MC_RTS_V2_DEQ) { + rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); + r->rts_cons.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc( + "RTS_CONS_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0); + if (r->rts_cons.rts_cache == NULL) { + if (flags & RING_F_MP_RTS_V2_ENQ) + rte_free(r->rts_prod.rts_cache); + RING_LOG(ERR, "Cannot reserve memory for rts cons cache"); + return -ENOMEM; + } + } + return 0; } @@ -293,9 +325,13 @@ rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count, mz_flags, alignof(typeof(*r))); if (mz != NULL) { r = mz->addr; - /* no need to check return value here, we already checked the - * arguments above */ - rte_ring_init(r, name, requested_count, flags); + + if (rte_ring_init(r, name, requested_count, flags)) { + rte_free(te); + if (rte_memzone_free(mz) != 0) + RING_LOG(ERR, "Cannot free memory for ring"); + return NULL; + } te->data = (void *) r; r->memzone = mz; @@ -358,6 +394,11 @@ rte_ring_free(struct rte_ring *r) rte_mcfg_tailq_write_unlock(); + if (r->flags & RING_F_MP_RTS_V2_ENQ) + rte_free(r->rts_prod.rts_cache); + if (r->flags & RING_F_MC_RTS_V2_DEQ) + rte_free(r->rts_cons.rts_cache); + if (rte_memzone_free(r->memzone) != 0) RING_LOG(ERR, "Cannot free memory"); diff --git a/lib/ring/rte_ring.h b/lib/ring/rte_ring.h index 11ca69c73d..2b35ce038e 100644 --- a/lib/ring/rte_ring.h +++ b/lib/ring/rte_ring.h @@ -89,6 +89,9 @@ ssize_t rte_ring_get_memsize(unsigned int count); * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using 
``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS V2 mode". * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer HTS mode". @@ -101,6 +104,9 @@ ssize_t rte_ring_get_memsize(unsigned int count); * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". + * - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS V2 mode". * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer HTS mode". @@ -149,6 +155,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count, * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS V2 mode". * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer HTS mode". @@ -161,6 +170,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count, * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". 
+ * - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS V2 mode". * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer HTS mode". diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h index 6cd6ce9884..9e627d26c1 100644 --- a/lib/ring/rte_ring_core.h +++ b/lib/ring/rte_ring_core.h @@ -55,6 +55,7 @@ enum rte_ring_sync_type { RTE_RING_SYNC_ST, /**< single thread only */ RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ + RTE_RING_SYNC_MT_RTS_V2, /**< multi-thread relaxed tail sync v2 */ }; /** @@ -82,11 +83,16 @@ union __rte_ring_rts_poscnt { } val; }; +struct rte_ring_rts_cache { + volatile RTE_ATOMIC(uint32_t) num; /**< Number of objs. */ +}; + struct rte_ring_rts_headtail { volatile union __rte_ring_rts_poscnt tail; enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ uint32_t htd_max; /**< max allowed distance between head/tail */ volatile union __rte_ring_rts_poscnt head; + struct rte_ring_rts_cache *rts_cache; /**< Cache of prod/cons */ }; union __rte_ring_hts_pos { @@ -163,4 +169,7 @@ struct rte_ring { #define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ #define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */ +#define RING_F_MP_RTS_V2_ENQ 0x0080 /**< The default enqueue is "MP RTS V2". */ +#define RING_F_MC_RTS_V2_DEQ 0x0100 /**< The default dequeue is "MC RTS V2". 
*/ + #endif /* _RTE_RING_CORE_H_ */ diff --git a/lib/ring/rte_ring_elem.h b/lib/ring/rte_ring_elem.h index b96bfc003f..1352709f94 100644 --- a/lib/ring/rte_ring_elem.h +++ b/lib/ring/rte_ring_elem.h @@ -71,6 +71,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS V2 mode". * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer HTS mode". @@ -83,6 +86,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". + * - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS V2 mode". * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer HTS mode". 
@@ -203,6 +209,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_HTS: return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_RTS_V2: + return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); } /* valid ring should never reach this point */ @@ -385,6 +394,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_HTS: return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_RTS_V2: + return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table, esize, + n, available); } /* valid ring should never reach this point */ @@ -571,6 +583,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_HTS: return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_RTS_V2: + return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table, esize, + n, free_space); } /* valid ring should never reach this point */ @@ -681,6 +696,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_HTS: return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_RTS_V2: + return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table, esize, + n, available); } /* valid ring should never reach this point */ diff --git a/lib/ring/rte_ring_rts.h b/lib/ring/rte_ring_rts.h index d7a3863c83..b47e400452 100644 --- a/lib/ring/rte_ring_rts.h +++ b/lib/ring/rte_ring_rts.h @@ -84,6 +84,33 @@ rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, RTE_RING_QUEUE_FIXED, free_space); } +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. 
+ * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mp_rts_v2_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + /** * Dequeue several objects from an RTS ring (multi-consumers safe). * @@ -111,6 +138,33 @@ rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, RTE_RING_QUEUE_FIXED, available); } +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mc_rts_v2_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + /** * Enqueue several objects on the RTS ring (multi-producers safe). 
* @@ -138,6 +192,33 @@ rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, RTE_RING_QUEUE_VARIABLE, free_space); } +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned int +rte_ring_mp_rts_v2_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + /** * Dequeue several objects from an RTS ring (multi-consumers safe). * When the requested objects are more than the available objects, @@ -167,6 +248,35 @@ rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, RTE_RING_QUEUE_VARIABLE, available); } +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. 
+ * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned int +rte_ring_mc_rts_v2_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + /** * Enqueue several objects on the RTS ring (multi-producers safe). * @@ -213,6 +323,52 @@ rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, sizeof(uintptr_t), n, available); } +/** + * Enqueue several objects on the RTS V2 ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mp_rts_v2_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an RTS V2 ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mc_rts_v2_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + /** * Enqueue several objects on the RTS ring (multi-producers safe). * @@ -261,6 +417,54 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, sizeof(uintptr_t), n, available); } +/** + * Enqueue several objects on the RTS V2 ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned int +rte_ring_mp_rts_v2_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an RTS V2 ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned int +rte_ring_mc_rts_v2_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + /** * Return producer max Head-Tail-Distance (HTD). * @@ -273,7 +477,8 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, static inline uint32_t rte_ring_get_prod_htd_max(const struct rte_ring *r) { - if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) + if ((r->prod.sync_type == RTE_RING_SYNC_MT_RTS) || + (r->prod.sync_type == RTE_RING_SYNC_MT_RTS_V2)) return r->rts_prod.htd_max; return UINT32_MAX; } @@ -292,7 +497,8 @@ rte_ring_get_prod_htd_max(const struct rte_ring *r) static inline int rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) { - if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) + if ((r->prod.sync_type != RTE_RING_SYNC_MT_RTS) && + (r->prod.sync_type != RTE_RING_SYNC_MT_RTS_V2)) return -ENOTSUP; r->rts_prod.htd_max = v; @@ -311,7 +517,8 @@ rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) static inline uint32_t rte_ring_get_cons_htd_max(const struct rte_ring *r) { - if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) + if ((r->cons.sync_type == RTE_RING_SYNC_MT_RTS) || + (r->cons.sync_type == RTE_RING_SYNC_MT_RTS_V2)) return r->rts_cons.htd_max; return UINT32_MAX; } @@ -330,7 +537,8 @@ rte_ring_get_cons_htd_max(const struct rte_ring *r) static inline int rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) { - if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) + if ((r->cons.sync_type != RTE_RING_SYNC_MT_RTS) && + (r->cons.sync_type != RTE_RING_SYNC_MT_RTS_V2)) return -ENOTSUP; r->rts_cons.htd_max = v; diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h index 122650346b..4ce22a93ed 100644 --- a/lib/ring/rte_ring_rts_elem_pvt.h +++ b/lib/ring/rte_ring_rts_elem_pvt.h 
@@ -46,6 +46,92 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) rte_memory_order_release, rte_memory_order_acquire) == 0); } +/** + * @file rte_ring_rts_elem_pvt.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode. + * For more information please refer to . + */ + +/** + * @internal This function updates tail values. + */ +static __rte_always_inline void +__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht, + uint32_t old_tail, uint32_t num, uint32_t mask) +{ + union __rte_ring_rts_poscnt ot, nt; + + ot.val.cnt = nt.val.cnt = 0; + ot.val.pos = old_tail; + nt.val.pos = old_tail + num; + + /* + * If the tail is equal to the current enqueues/dequeues, update + * the tail with new value and then continue to try to update the + * tail until the num of the cache is 0, otherwise write the num of + * the current enqueues/dequeues to the cache. + */ + + if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw, + (uint64_t *)(uintptr_t)&ot.raw, nt.raw, + rte_memory_order_release, rte_memory_order_acquire) == 0) { + ot.val.pos = old_tail; + + /* + * Write the num of the current enqueues/dequeues to the + * corresponding cache. + */ + rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num, + num, rte_memory_order_release); + + /* + * There may be competition with another enqueues/dequeues + * for the update tail. The winner continues to try to update + * the tail, and the loser exits. + */ + if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw, + (uint64_t *)(uintptr_t)&ot.raw, nt.raw, + rte_memory_order_release, rte_memory_order_acquire) == 0) + return; + + /* + * Set the corresponding cache to 0 for next use. + */ + rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num, + 0, rte_memory_order_release); + } + + /* + * Try to update the tail until the num of the corresponding cache is 0. 
+ * Getting here means that the current enqueues/dequeues is trying to update + * the tail of another enqueues/dequeues. + */ + while (1) { + num = rte_atomic_load_explicit(&ht->rts_cache[nt.val.pos & mask].num, + rte_memory_order_acquire); + if (num == 0) + break; + + ot.val.pos = nt.val.pos; + nt.val.pos += num; + + /* + * There may be competition with another enqueues/dequeues + * for the update tail. The winner continues to try to update + * the tail, and the loser exits. + */ + if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw, + (uint64_t *)(uintptr_t)&ot.raw, nt.raw, + rte_memory_order_release, rte_memory_order_acquire) == 0) + return; + + rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num, + 0, rte_memory_order_release); + }; +} + /** * @internal This function waits till head/tail distance wouldn't * exceed pre-defined max value. @@ -218,6 +304,47 @@ __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table, return n; } +/** + * @internal Enqueue several objects on the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_v2_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_rts_v2_update_tail(&r->rts_prod, head, n, r->mask); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + /** * @internal Dequeue several objects from the RTS ring. * @@ -259,4 +386,45 @@ __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table, return n; } +/** + * @internal Dequeue several objects from the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_v2_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_rts_v2_update_tail(&r->rts_cons, head, n, r->mask); + } + + if (available != NULL) + *available = entries - n; + return n; +} + #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */ -- 2.27.0