DPDK patches and discussions
 help / color / mirror / Atom feed
* [PATCH] ring: add the second version of the RTS interface
@ 2025-01-05  9:57 Huichao Cai
  2025-01-05 15:13 ` [PATCH v2] " Huichao Cai
  0 siblings, 1 reply; 7+ messages in thread
From: Huichao Cai @ 2025-01-05  9:57 UTC (permalink / raw)
  To: honnappa.nagarahalli, konstantin.v.ananyev; +Cc: dev

In the existing RTS mode the enqueue/dequeue tail can only be updated
by the last in-flight enqueue/dequeue, which reduces concurrency.
Add a V2 version of the RTS interface in which the tail update is
not limited to the last enqueue/dequeue, so the tail can be updated
in a timely manner and concurrency is increased.

Add some corresponding test cases.

Signed-off-by: Huichao Cai <chcchc88@163.com>
---
 app/test/meson.build                   |   1 +
 app/test/test_ring.c                   |  26 +++
 app/test/test_ring_rts_v2_stress.c     |  32 ++++
 app/test/test_ring_stress.c            |   3 +
 app/test/test_ring_stress.h            |   1 +
 devtools/libabigail.abignore           |   3 +
 doc/guides/rel_notes/release_25_03.rst |   2 +
 lib/ring/rte_ring.c                    |  53 +++++-
 lib/ring/rte_ring.h                    |  12 ++
 lib/ring/rte_ring_core.h               |   9 ++
 lib/ring/rte_ring_elem.h               |  18 +++
 lib/ring/rte_ring_rts.h                | 216 ++++++++++++++++++++++++-
 lib/ring/rte_ring_rts_elem_pvt.h       | 168 +++++++++++++++++++
 13 files changed, 534 insertions(+), 10 deletions(-)
 create mode 100644 app/test/test_ring_rts_v2_stress.c

diff --git a/app/test/meson.build b/app/test/meson.build
index d5cb6a7f7a..e3d8cef3fa 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -166,6 +166,7 @@ source_file_deps = {
     'test_ring_mt_peek_stress_zc.c': ['ptr_compress'],
     'test_ring_perf.c': ['ptr_compress'],
     'test_ring_rts_stress.c': ['ptr_compress'],
+    'test_ring_rts_v2_stress.c': ['ptr_compress'],
     'test_ring_st_peek_stress.c': ['ptr_compress'],
     'test_ring_st_peek_stress_zc.c': ['ptr_compress'],
     'test_ring_stress.c': ['ptr_compress'],
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index ba1fec1de3..094f14b859 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -284,6 +284,19 @@ static const struct {
 			.felem = rte_ring_dequeue_bulk_elem,
 		},
 	},
+	{
+		.desc = "MP_RTS/MC_RTS V2 sync mode",
+		.api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF,
+		.create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ,
+		.enq = {
+			.flegacy = rte_ring_enqueue_bulk,
+			.felem = rte_ring_enqueue_bulk_elem,
+		},
+		.deq = {
+			.flegacy = rte_ring_dequeue_bulk,
+			.felem = rte_ring_dequeue_bulk_elem,
+		},
+	},
 	{
 		.desc = "MP_HTS/MC_HTS sync mode",
 		.api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF,
@@ -349,6 +362,19 @@ static const struct {
 			.felem = rte_ring_dequeue_burst_elem,
 		},
 	},
+	{
+		.desc = "MP_RTS/MC_RTS V2 sync mode",
+		.api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF,
+		.create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ,
+		.enq = {
+			.flegacy = rte_ring_enqueue_burst,
+			.felem = rte_ring_enqueue_burst_elem,
+		},
+		.deq = {
+			.flegacy = rte_ring_dequeue_burst,
+			.felem = rte_ring_dequeue_burst_elem,
+		},
+	},
 	{
 		.desc = "MP_HTS/MC_HTS sync mode",
 		.api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF,
diff --git a/app/test/test_ring_rts_v2_stress.c b/app/test/test_ring_rts_v2_stress.c
new file mode 100644
index 0000000000..6079366a7d
--- /dev/null
+++ b/app/test/test_ring_rts_v2_stress.c
@@ -0,0 +1,32 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress_impl.h"
+
+static inline uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail)
+{
+	return rte_ring_mc_rts_v2_dequeue_bulk(r, obj, n, avail);
+}
+
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free)
+{
+	return rte_ring_mp_rts_v2_enqueue_bulk(r, obj, n, free);
+}
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
+{
+	return rte_ring_init(r, name, num,
+		RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ);
+}
+
+const struct test test_ring_rts_v2_stress = {
+	.name = "MT_RTS_V2",
+	.nb_case = RTE_DIM(tests),
+	.cases = tests,
+};
diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c
index 1af45e0fc8..94085acd5e 100644
--- a/app/test/test_ring_stress.c
+++ b/app/test/test_ring_stress.c
@@ -43,6 +43,9 @@ test_ring_stress(void)
 	n += test_ring_rts_stress.nb_case;
 	k += run_test(&test_ring_rts_stress);
 
+	n += test_ring_rts_v2_stress.nb_case;
+	k += run_test(&test_ring_rts_v2_stress);
+
 	n += test_ring_hts_stress.nb_case;
 	k += run_test(&test_ring_hts_stress);
 
diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
index 416d68c9a0..505957f6fb 100644
--- a/app/test/test_ring_stress.h
+++ b/app/test/test_ring_stress.h
@@ -34,6 +34,7 @@ struct test {
 
 extern const struct test test_ring_mpmc_stress;
 extern const struct test test_ring_rts_stress;
+extern const struct test test_ring_rts_v2_stress;
 extern const struct test test_ring_hts_stress;
 extern const struct test test_ring_mt_peek_stress;
 extern const struct test test_ring_mt_peek_stress_zc;
diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore
index 21b8cd6113..0a0f305acb 100644
--- a/devtools/libabigail.abignore
+++ b/devtools/libabigail.abignore
@@ -33,3 +33,6 @@
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ; Temporary exceptions till next major ABI version ;
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+[suppress_type]
+       name = rte_ring_rts_headtail
+       has_data_member_inserted_between = {offset_of(head), end}
diff --git a/doc/guides/rel_notes/release_25_03.rst b/doc/guides/rel_notes/release_25_03.rst
index 426dfcd982..f73bc9e397 100644
--- a/doc/guides/rel_notes/release_25_03.rst
+++ b/doc/guides/rel_notes/release_25_03.rst
@@ -102,6 +102,8 @@ ABI Changes
 
 * No ABI change that would break compatibility with 24.11.
 
+* ring: Added ``rte_ring_rts_cache`` structure and ``rts_cache`` field to the
+  ``rte_ring_rts_headtail`` structure.
 
 Known Issues
 ------------
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..df84592300 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -43,7 +43,8 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 /* mask of all valid flag values to ring_create() */
 #define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \
 		     RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ |	       \
-		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ)
+		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ |	       \
+		     RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ)
 
 /* true if x is a power of 2 */
 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
@@ -106,6 +107,7 @@ reset_headtail(void *p)
 		ht->tail = 0;
 		break;
 	case RTE_RING_SYNC_MT_RTS:
+	case RTE_RING_SYNC_MT_RTS_V2:
 		ht_rts->head.raw = 0;
 		ht_rts->tail.raw = 0;
 		break;
@@ -135,9 +137,11 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	enum rte_ring_sync_type *cons_st)
 {
 	static const uint32_t prod_st_flags =
-		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
+		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ |
+		RING_F_MP_RTS_V2_ENQ);
 	static const uint32_t cons_st_flags =
-		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
+		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ |
+		RING_F_MC_RTS_V2_DEQ);
 
 	switch (flags & prod_st_flags) {
 	case 0:
@@ -152,6 +156,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	case RING_F_MP_HTS_ENQ:
 		*prod_st = RTE_RING_SYNC_MT_HTS;
 		break;
+	case RING_F_MP_RTS_V2_ENQ:
+		*prod_st = RTE_RING_SYNC_MT_RTS_V2;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -169,6 +176,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	case RING_F_MC_HTS_DEQ:
 		*cons_st = RTE_RING_SYNC_MT_HTS;
 		break;
+	case RING_F_MC_RTS_V2_DEQ:
+		*cons_st = RTE_RING_SYNC_MT_RTS_V2;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -239,6 +249,28 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
 	if (flags & RING_F_MC_RTS_DEQ)
 		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
 
+	/* set default values for head-tail distance and allocate the caches */
+	if (flags & RING_F_MP_RTS_V2_ENQ) {
+		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
+		r->rts_prod.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
+			"RTS_PROD_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
+		if (r->rts_prod.rts_cache == NULL) {
+			RING_LOG(ERR, "Cannot reserve memory for rts prod cache");
+			return -ENOMEM;
+		}
+	}
+	if (flags & RING_F_MC_RTS_V2_DEQ) {
+		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
+		r->rts_cons.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
+			"RTS_CONS_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
+		if (r->rts_cons.rts_cache == NULL) {
+			if (flags & RING_F_MP_RTS_V2_ENQ)
+				rte_free(r->rts_prod.rts_cache);
+			RING_LOG(ERR, "Cannot reserve memory for rts cons cache");
+			return -ENOMEM;
+		}
+	}
+
 	return 0;
 }
 
@@ -293,9 +325,13 @@ rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
 					 mz_flags, alignof(typeof(*r)));
 	if (mz != NULL) {
 		r = mz->addr;
-		/* no need to check return value here, we already checked the
-		 * arguments above */
-		rte_ring_init(r, name, requested_count, flags);
+
+		if (rte_ring_init(r, name, requested_count, flags)) {
+			rte_free(te);
+			if (rte_memzone_free(mz) != 0)
+				RING_LOG(ERR, "Cannot free memory for ring");
+			return NULL;
+		}
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -358,6 +394,11 @@ rte_ring_free(struct rte_ring *r)
 
 	rte_mcfg_tailq_write_unlock();
 
+	if (r->flags & RING_F_MP_RTS_V2_ENQ)
+		rte_free(r->rts_prod.rts_cache);
+	if (r->flags & RING_F_MC_RTS_V2_DEQ)
+		rte_free(r->rts_cons.rts_cache);
+
 	if (rte_memzone_free(r->memzone) != 0)
 		RING_LOG(ERR, "Cannot free memory");
 
diff --git a/lib/ring/rte_ring.h b/lib/ring/rte_ring.h
index 11ca69c73d..2b35ce038e 100644
--- a/lib/ring/rte_ring.h
+++ b/lib/ring/rte_ring.h
@@ -89,6 +89,9 @@ ssize_t rte_ring_get_memsize(unsigned int count);
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -101,6 +104,9 @@ ssize_t rte_ring_get_memsize(unsigned int count);
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
@@ -149,6 +155,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -161,6 +170,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 6cd6ce9884..9e627d26c1 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -55,6 +55,7 @@ enum rte_ring_sync_type {
 	RTE_RING_SYNC_ST,     /**< single thread only */
 	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
 	RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
+	RTE_RING_SYNC_MT_RTS_V2, /**< multi-thread relaxed tail sync v2 */
 };
 
 /**
@@ -82,11 +83,16 @@ union __rte_ring_rts_poscnt {
 	} val;
 };
 
+struct rte_ring_rts_cache {
+	volatile RTE_ATOMIC(uint32_t) num;      /**< Number of objs. */
+};
+
 struct rte_ring_rts_headtail {
 	volatile union __rte_ring_rts_poscnt tail;
 	enum rte_ring_sync_type sync_type;  /**< sync type of prod/cons */
 	uint32_t htd_max;   /**< max allowed distance between head/tail */
 	volatile union __rte_ring_rts_poscnt head;
+	struct rte_ring_rts_cache *rts_cache; /**< Cache of prod/cons */
 };
 
 union __rte_ring_hts_pos {
@@ -163,4 +169,7 @@ struct rte_ring {
 #define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */
 #define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */
 
+#define RING_F_MP_RTS_V2_ENQ 0x0080 /**< The default enqueue is "MP RTS V2". */
+#define RING_F_MC_RTS_V2_DEQ 0x0100 /**< The default dequeue is "MC RTS V2". */
+
 #endif /* _RTE_RING_CORE_H_ */
diff --git a/lib/ring/rte_ring_elem.h b/lib/ring/rte_ring_elem.h
index b96bfc003f..1352709f94 100644
--- a/lib/ring/rte_ring_elem.h
+++ b/lib/ring/rte_ring_elem.h
@@ -71,6 +71,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -83,6 +86,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
@@ -203,6 +209,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n,
 			free_space);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table, esize, n,
+			free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -385,6 +394,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize,
 			n, available);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table, esize,
+			n, available);
 	}
 
 	/* valid ring should never reach this point */
@@ -571,6 +583,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize,
 			n, free_space);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table, esize,
+			n, free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -681,6 +696,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize,
 			n, available);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table, esize,
+			n, available);
 	}
 
 	/* valid ring should never reach this point */
diff --git a/lib/ring/rte_ring_rts.h b/lib/ring/rte_ring_rts.h
index d7a3863c83..b47e400452 100644
--- a/lib/ring/rte_ring_rts.h
+++ b/lib/ring/rte_ring_rts.h
@@ -84,6 +84,33 @@ rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 			RTE_RING_QUEUE_FIXED, free_space);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, free_space);
+}
+
 /**
  * Dequeue several objects from an RTS ring (multi-consumers safe).
  *
@@ -111,6 +138,33 @@ rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 			RTE_RING_QUEUE_FIXED, available);
 }
 
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -138,6 +192,33 @@ rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
+}
+
 /**
  * Dequeue several objects from an RTS  ring (multi-consumers safe).
  * When the requested objects are more than the available objects,
@@ -167,6 +248,35 @@ rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 			RTE_RING_QUEUE_VARIABLE, available);
 }
 
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ * When the requested objects are more than the available objects,
+ * only dequeue the actual number of objects.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -213,6 +323,52 @@ rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
 			sizeof(uintptr_t), n, available);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table,
+			sizeof(uintptr_t), n, free_space);
+}
+
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_bulk(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table,
+			sizeof(uintptr_t), n, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -261,6 +417,54 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 			sizeof(uintptr_t), n, available);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table,
+			sizeof(uintptr_t), n, free_space);
+}
+
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ * When the requested objects are more than the available objects,
+ * only dequeue the actual number of objects.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table,
+			sizeof(uintptr_t), n, available);
+}
+
 /**
  * Return producer max Head-Tail-Distance (HTD).
  *
@@ -273,7 +477,8 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 static inline uint32_t
 rte_ring_get_prod_htd_max(const struct rte_ring *r)
 {
-	if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
+	if ((r->prod.sync_type == RTE_RING_SYNC_MT_RTS) ||
+			(r->prod.sync_type == RTE_RING_SYNC_MT_RTS_V2))
 		return r->rts_prod.htd_max;
 	return UINT32_MAX;
 }
@@ -292,7 +497,8 @@ rte_ring_get_prod_htd_max(const struct rte_ring *r)
 static inline int
 rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 {
-	if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
+	if ((r->prod.sync_type != RTE_RING_SYNC_MT_RTS) &&
+			(r->prod.sync_type != RTE_RING_SYNC_MT_RTS_V2))
 		return -ENOTSUP;
 
 	r->rts_prod.htd_max = v;
@@ -311,7 +517,8 @@ rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 static inline uint32_t
 rte_ring_get_cons_htd_max(const struct rte_ring *r)
 {
-	if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
+	if ((r->cons.sync_type == RTE_RING_SYNC_MT_RTS) ||
+			(r->cons.sync_type == RTE_RING_SYNC_MT_RTS_V2))
 		return r->rts_cons.htd_max;
 	return UINT32_MAX;
 }
@@ -330,7 +537,8 @@ rte_ring_get_cons_htd_max(const struct rte_ring *r)
 static inline int
 rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
 {
-	if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
+	if ((r->cons.sync_type != RTE_RING_SYNC_MT_RTS) &&
+			(r->cons.sync_type != RTE_RING_SYNC_MT_RTS_V2))
 		return -ENOTSUP;
 
 	r->rts_cons.htd_max = v;
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 122650346b..4ce22a93ed 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -46,6 +46,92 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 			rte_memory_order_release, rte_memory_order_acquire) == 0);
 }
 
+/**
+ * @file rte_ring_rts_elem_pvt.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
+ * For more information please refer to <rte_ring_rts.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
+static __rte_always_inline void
+__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht,
+	uint32_t old_tail, uint32_t num, uint32_t mask)
+{
+	union __rte_ring_rts_poscnt ot, nt;
+
+	ot.val.cnt = nt.val.cnt = 0;
+	ot.val.pos = old_tail;
+	nt.val.pos = old_tail + num;
+
+	/*
+	 * If the tail still equals the start of the current enqueue/dequeue,
+	 * update the tail with the new value and then keep trying to update
+	 * the tail until the num of the next cache entry is 0. Otherwise,
+	 * write the num of the current enqueue/dequeue to the cache.
+	 */
+
+	if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+				(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+				rte_memory_order_release, rte_memory_order_acquire) == 0) {
+		ot.val.pos = old_tail;
+
+		/*
+		 * Write the num of the current enqueues/dequeues to the
+		 * corresponding cache.
+		 */
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			num, rte_memory_order_release);
+
+		/*
+		 * This may race with another enqueue/dequeue that is also
+		 * trying to update the tail. The winner continues to try to
+		 * update the tail, and the loser exits.
+		 */
+		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+					rte_memory_order_release, rte_memory_order_acquire) == 0)
+			return;
+
+		/*
+		 * Set the corresponding cache to 0 for next use.
+		 */
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			0, rte_memory_order_release);
+	}
+
+	/*
+	 * Keep updating the tail until the num of the corresponding cache is 0.
+	 * Getting here means that the current enqueue/dequeue is now updating
+	 * the tail on behalf of other enqueues/dequeues.
+	 */
+	while (1) {
+		num = rte_atomic_load_explicit(&ht->rts_cache[nt.val.pos & mask].num,
+			rte_memory_order_acquire);
+		if (num == 0)
+			break;
+
+		ot.val.pos = nt.val.pos;
+		nt.val.pos += num;
+
+		/*
+		 * This may race with another enqueue/dequeue that is also
+		 * trying to update the tail. The winner continues to try to
+		 * update the tail, and the loser exits.
+		 */
+		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+					rte_memory_order_release, rte_memory_order_acquire) == 0)
+			return;
+
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			0, rte_memory_order_release);
+	};
+}
+
 /**
  * @internal This function waits till head/tail distance wouldn't
  * exceed pre-defined max value.
@@ -218,6 +304,47 @@ __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
 	return n;
 }
 
+/**
+ * @internal Enqueue several objects on the RTS V2 ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_rts_v2_enqueue_elem(struct rte_ring *r, const void *obj_table,
+	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
+	uint32_t *free_space)
+{
+	uint32_t free, head;
+
+	n =  __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
+
+	if (n != 0) {
+		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
+		__rte_ring_rts_v2_update_tail(&r->rts_prod, head, n, r->mask);
+	}
+
+	if (free_space != NULL)
+		*free_space = free - n;
+	return n;
+}
+
 /**
  * @internal Dequeue several objects from the RTS ring.
  *
@@ -259,4 +386,45 @@ __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
 	return n;
 }
 
+/**
+ * @internal Dequeue several objects from the RTS V2 ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_rts_v2_dequeue_elem(struct rte_ring *r, void *obj_table,
+	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
+	uint32_t *available)
+{
+	uint32_t entries, head;
+
+	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
+
+	if (n != 0) {
+		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
+		__rte_ring_rts_v2_update_tail(&r->rts_cons, head, n, r->mask);
+	}
+
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
-- 
2.27.0


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v2] ring: add the second version of the RTS interface
  2025-01-05  9:57 [PATCH] ring: add the second version of the RTS interface Huichao Cai
@ 2025-01-05 15:13 ` Huichao Cai
  2025-01-08  1:41   ` Huichao Cai
                     ` (2 more replies)
  0 siblings, 3 replies; 7+ messages in thread
From: Huichao Cai @ 2025-01-05 15:13 UTC (permalink / raw)
  To: honnappa.nagarahalli, konstantin.v.ananyev, thomas; +Cc: dev

In the existing RTS mode, the enqueue/dequeue tail is updated only by
the last enqueue/dequeue to finish, which reduces concurrency.
Add a V2 version of the RTS interface in which the tail update is not
limited to the last enqueue/dequeue, so the tail can be updated in a
timely manner and concurrency is increased.

Add some corresponding test cases.

Signed-off-by: Huichao Cai <chcchc88@163.com>
---
 app/test/meson.build                   |   1 +
 app/test/test_ring.c                   |  26 +++
 app/test/test_ring_rts_v2_stress.c     |  32 ++++
 app/test/test_ring_stress.c            |   3 +
 app/test/test_ring_stress.h            |   1 +
 devtools/libabigail.abignore           |   6 +
 doc/guides/rel_notes/release_25_03.rst |   2 +
 lib/ring/rte_ring.c                    |  54 ++++++-
 lib/ring/rte_ring.h                    |  12 ++
 lib/ring/rte_ring_core.h               |   9 ++
 lib/ring/rte_ring_elem.h               |  18 +++
 lib/ring/rte_ring_rts.h                | 216 ++++++++++++++++++++++++-
 lib/ring/rte_ring_rts_elem_pvt.h       | 168 +++++++++++++++++++
 13 files changed, 538 insertions(+), 10 deletions(-)
 create mode 100644 app/test/test_ring_rts_v2_stress.c

diff --git a/app/test/meson.build b/app/test/meson.build
index d5cb6a7f7a..e3d8cef3fa 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -166,6 +166,7 @@ source_file_deps = {
     'test_ring_mt_peek_stress_zc.c': ['ptr_compress'],
     'test_ring_perf.c': ['ptr_compress'],
     'test_ring_rts_stress.c': ['ptr_compress'],
+    'test_ring_rts_v2_stress.c': ['ptr_compress'],
     'test_ring_st_peek_stress.c': ['ptr_compress'],
     'test_ring_st_peek_stress_zc.c': ['ptr_compress'],
     'test_ring_stress.c': ['ptr_compress'],
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index ba1fec1de3..094f14b859 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -284,6 +284,19 @@ static const struct {
 			.felem = rte_ring_dequeue_bulk_elem,
 		},
 	},
+	{
+		.desc = "MP_RTS/MC_RTS V2 sync mode",
+		.api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF,
+		.create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ,
+		.enq = {
+			.flegacy = rte_ring_enqueue_bulk,
+			.felem = rte_ring_enqueue_bulk_elem,
+		},
+		.deq = {
+			.flegacy = rte_ring_dequeue_bulk,
+			.felem = rte_ring_dequeue_bulk_elem,
+		},
+	},
 	{
 		.desc = "MP_HTS/MC_HTS sync mode",
 		.api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF,
@@ -349,6 +362,19 @@ static const struct {
 			.felem = rte_ring_dequeue_burst_elem,
 		},
 	},
+	{
+		.desc = "MP_RTS/MC_RTS V2 sync mode",
+		.api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF,
+		.create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ,
+		.enq = {
+			.flegacy = rte_ring_enqueue_burst,
+			.felem = rte_ring_enqueue_burst_elem,
+		},
+		.deq = {
+			.flegacy = rte_ring_dequeue_burst,
+			.felem = rte_ring_dequeue_burst_elem,
+		},
+	},
 	{
 		.desc = "MP_HTS/MC_HTS sync mode",
 		.api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF,
diff --git a/app/test/test_ring_rts_v2_stress.c b/app/test/test_ring_rts_v2_stress.c
new file mode 100644
index 0000000000..6079366a7d
--- /dev/null
+++ b/app/test/test_ring_rts_v2_stress.c
@@ -0,0 +1,32 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress_impl.h"
+
+static inline uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail)
+{
+	return rte_ring_mc_rts_v2_dequeue_bulk(r, obj, n, avail);
+}
+
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free)
+{
+	return rte_ring_mp_rts_v2_enqueue_bulk(r, obj, n, free);
+}
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
+{
+	return rte_ring_init(r, name, num,
+		RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ);
+}
+
+const struct test test_ring_rts_v2_stress = {
+	.name = "MT_RTS_V2",
+	.nb_case = RTE_DIM(tests),
+	.cases = tests,
+};
diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c
index 1af45e0fc8..94085acd5e 100644
--- a/app/test/test_ring_stress.c
+++ b/app/test/test_ring_stress.c
@@ -43,6 +43,9 @@ test_ring_stress(void)
 	n += test_ring_rts_stress.nb_case;
 	k += run_test(&test_ring_rts_stress);
 
+	n += test_ring_rts_v2_stress.nb_case;
+	k += run_test(&test_ring_rts_v2_stress);
+
 	n += test_ring_hts_stress.nb_case;
 	k += run_test(&test_ring_hts_stress);
 
diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
index 416d68c9a0..505957f6fb 100644
--- a/app/test/test_ring_stress.h
+++ b/app/test/test_ring_stress.h
@@ -34,6 +34,7 @@ struct test {
 
 extern const struct test test_ring_mpmc_stress;
 extern const struct test test_ring_rts_stress;
+extern const struct test test_ring_rts_v2_stress;
 extern const struct test test_ring_hts_stress;
 extern const struct test test_ring_mt_peek_stress;
 extern const struct test test_ring_mt_peek_stress_zc;
diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore
index 21b8cd6113..d4dd99a99e 100644
--- a/devtools/libabigail.abignore
+++ b/devtools/libabigail.abignore
@@ -33,3 +33,9 @@
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ; Temporary exceptions till next major ABI version ;
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+[suppress_type]
+       type_kind = struct
+       name = rte_ring_rts_cache
+[suppress_type]
+       name = rte_ring_rts_headtail
+       has_data_member_inserted_between = {offset_of(head), end}
diff --git a/doc/guides/rel_notes/release_25_03.rst b/doc/guides/rel_notes/release_25_03.rst
index 426dfcd982..f73bc9e397 100644
--- a/doc/guides/rel_notes/release_25_03.rst
+++ b/doc/guides/rel_notes/release_25_03.rst
@@ -102,6 +102,8 @@ ABI Changes
 
 * No ABI change that would break compatibility with 24.11.
 
+* ring: Added ``rte_ring_rts_cache`` structure and ``rts_cache`` field to the
+  ``rte_ring_rts_headtail`` structure.
 
 Known Issues
 ------------
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..ada1ae88fa 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -43,7 +43,8 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 /* mask of all valid flag values to ring_create() */
 #define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \
 		     RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ |	       \
-		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ)
+		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ |	       \
+		     RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ)
 
 /* true if x is a power of 2 */
 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
@@ -106,6 +107,7 @@ reset_headtail(void *p)
 		ht->tail = 0;
 		break;
 	case RTE_RING_SYNC_MT_RTS:
+	case RTE_RING_SYNC_MT_RTS_V2:
 		ht_rts->head.raw = 0;
 		ht_rts->tail.raw = 0;
 		break;
@@ -135,9 +137,11 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	enum rte_ring_sync_type *cons_st)
 {
 	static const uint32_t prod_st_flags =
-		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
+		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ |
+		RING_F_MP_RTS_V2_ENQ);
 	static const uint32_t cons_st_flags =
-		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
+		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ |
+		RING_F_MC_RTS_V2_DEQ);
 
 	switch (flags & prod_st_flags) {
 	case 0:
@@ -152,6 +156,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	case RING_F_MP_HTS_ENQ:
 		*prod_st = RTE_RING_SYNC_MT_HTS;
 		break;
+	case RING_F_MP_RTS_V2_ENQ:
+		*prod_st = RTE_RING_SYNC_MT_RTS_V2;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -169,6 +176,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	case RING_F_MC_HTS_DEQ:
 		*cons_st = RTE_RING_SYNC_MT_HTS;
 		break;
+	case RING_F_MC_RTS_V2_DEQ:
+		*cons_st = RTE_RING_SYNC_MT_RTS_V2;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -239,6 +249,28 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
 	if (flags & RING_F_MC_RTS_DEQ)
 		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
 
+	/* set default head-tail distance and allocate memory for the cache */
+	if (flags & RING_F_MP_RTS_V2_ENQ) {
+		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
+		r->rts_prod.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
+			"RTS_PROD_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
+		if (r->rts_prod.rts_cache == NULL) {
+			RING_LOG(ERR, "Cannot reserve memory for rts prod cache");
+			return -ENOMEM;
+		}
+	}
+	if (flags & RING_F_MC_RTS_V2_DEQ) {
+		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
+		r->rts_cons.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
+			"RTS_CONS_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
+		if (r->rts_cons.rts_cache == NULL) {
+			if (flags & RING_F_MP_RTS_V2_ENQ)
+				rte_free(r->rts_prod.rts_cache);
+			RING_LOG(ERR, "Cannot reserve memory for rts cons cache");
+			return -ENOMEM;
+		}
+	}
+
 	return 0;
 }
 
@@ -293,9 +325,14 @@ rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
 					 mz_flags, alignof(typeof(*r)));
 	if (mz != NULL) {
 		r = mz->addr;
-		/* no need to check return value here, we already checked the
-		 * arguments above */
-		rte_ring_init(r, name, requested_count, flags);
+
+		if (rte_ring_init(r, name, requested_count, flags)) {
+			rte_free(te);
+			if (rte_memzone_free(mz) != 0)
+				RING_LOG(ERR, "Cannot free memory for ring");
+			rte_mcfg_tailq_write_unlock();
+			return NULL;
+		}
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -358,6 +395,11 @@ rte_ring_free(struct rte_ring *r)
 
 	rte_mcfg_tailq_write_unlock();
 
+	if (r->flags & RING_F_MP_RTS_V2_ENQ)
+		rte_free(r->rts_prod.rts_cache);
+	if (r->flags & RING_F_MC_RTS_V2_DEQ)
+		rte_free(r->rts_cons.rts_cache);
+
 	if (rte_memzone_free(r->memzone) != 0)
 		RING_LOG(ERR, "Cannot free memory");
 
diff --git a/lib/ring/rte_ring.h b/lib/ring/rte_ring.h
index 11ca69c73d..2b35ce038e 100644
--- a/lib/ring/rte_ring.h
+++ b/lib/ring/rte_ring.h
@@ -89,6 +89,9 @@ ssize_t rte_ring_get_memsize(unsigned int count);
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -101,6 +104,9 @@ ssize_t rte_ring_get_memsize(unsigned int count);
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
@@ -149,6 +155,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -161,6 +170,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 6cd6ce9884..9e627d26c1 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -55,6 +55,7 @@ enum rte_ring_sync_type {
 	RTE_RING_SYNC_ST,     /**< single thread only */
 	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
 	RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
+	RTE_RING_SYNC_MT_RTS_V2, /**< multi-thread relaxed tail sync v2 */
 };
 
 /**
@@ -82,11 +83,16 @@ union __rte_ring_rts_poscnt {
 	} val;
 };
 
+struct rte_ring_rts_cache {
+	volatile RTE_ATOMIC(uint32_t) num;      /**< Number of objs. */
+};
+
 struct rte_ring_rts_headtail {
 	volatile union __rte_ring_rts_poscnt tail;
 	enum rte_ring_sync_type sync_type;  /**< sync type of prod/cons */
 	uint32_t htd_max;   /**< max allowed distance between head/tail */
 	volatile union __rte_ring_rts_poscnt head;
+	struct rte_ring_rts_cache *rts_cache; /**< Cache of prod/cons */
 };
 
 union __rte_ring_hts_pos {
@@ -163,4 +169,7 @@ struct rte_ring {
 #define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */
 #define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */
 
+#define RING_F_MP_RTS_V2_ENQ 0x0080 /**< The default enqueue is "MP RTS V2". */
+#define RING_F_MC_RTS_V2_DEQ 0x0100 /**< The default dequeue is "MC RTS V2". */
+
 #endif /* _RTE_RING_CORE_H_ */
diff --git a/lib/ring/rte_ring_elem.h b/lib/ring/rte_ring_elem.h
index b96bfc003f..1352709f94 100644
--- a/lib/ring/rte_ring_elem.h
+++ b/lib/ring/rte_ring_elem.h
@@ -71,6 +71,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -83,6 +86,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
@@ -203,6 +209,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n,
 			free_space);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table, esize, n,
+			free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -385,6 +394,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize,
 			n, available);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table, esize,
+			n, available);
 	}
 
 	/* valid ring should never reach this point */
@@ -571,6 +583,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize,
 			n, free_space);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table, esize,
+			n, free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -681,6 +696,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize,
 			n, available);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table, esize,
+			n, available);
 	}
 
 	/* valid ring should never reach this point */
diff --git a/lib/ring/rte_ring_rts.h b/lib/ring/rte_ring_rts.h
index d7a3863c83..b47e400452 100644
--- a/lib/ring/rte_ring_rts.h
+++ b/lib/ring/rte_ring_rts.h
@@ -84,6 +84,33 @@ rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 			RTE_RING_QUEUE_FIXED, free_space);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, free_space);
+}
+
 /**
  * Dequeue several objects from an RTS ring (multi-consumers safe).
  *
@@ -111,6 +138,33 @@ rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 			RTE_RING_QUEUE_FIXED, available);
 }
 
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -138,6 +192,33 @@ rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
+}
+
 /**
  * Dequeue several objects from an RTS  ring (multi-consumers safe).
  * When the requested objects are more than the available objects,
@@ -167,6 +248,35 @@ rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 			RTE_RING_QUEUE_VARIABLE, available);
 }
 
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ * When the requested objects are more than the available objects,
+ * only dequeue the actual number of objects.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -213,6 +323,52 @@ rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
 			sizeof(uintptr_t), n, available);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table,
+			sizeof(uintptr_t), n, free_space);
+}
+
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_bulk(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table,
+			sizeof(uintptr_t), n, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -261,6 +417,54 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 			sizeof(uintptr_t), n, available);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table,
+			sizeof(uintptr_t), n, free_space);
+}
+
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ * When the requested objects are more than the available objects,
+ * only dequeue the actual number of objects.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table,
+			sizeof(uintptr_t), n, available);
+}
+
 /**
  * Return producer max Head-Tail-Distance (HTD).
  *
@@ -273,7 +477,8 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 static inline uint32_t
 rte_ring_get_prod_htd_max(const struct rte_ring *r)
 {
-	if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
+	if ((r->prod.sync_type == RTE_RING_SYNC_MT_RTS) ||
+			(r->prod.sync_type == RTE_RING_SYNC_MT_RTS_V2))
 		return r->rts_prod.htd_max;
 	return UINT32_MAX;
 }
@@ -292,7 +497,8 @@ rte_ring_get_prod_htd_max(const struct rte_ring *r)
 static inline int
 rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 {
-	if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
+	if ((r->prod.sync_type != RTE_RING_SYNC_MT_RTS) &&
+			(r->prod.sync_type != RTE_RING_SYNC_MT_RTS_V2))
 		return -ENOTSUP;
 
 	r->rts_prod.htd_max = v;
@@ -311,7 +517,8 @@ rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 static inline uint32_t
 rte_ring_get_cons_htd_max(const struct rte_ring *r)
 {
-	if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
+	if ((r->cons.sync_type == RTE_RING_SYNC_MT_RTS) ||
+			(r->cons.sync_type == RTE_RING_SYNC_MT_RTS_V2))
 		return r->rts_cons.htd_max;
 	return UINT32_MAX;
 }
@@ -330,7 +537,8 @@ rte_ring_get_cons_htd_max(const struct rte_ring *r)
 static inline int
 rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
 {
-	if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
+	if ((r->cons.sync_type != RTE_RING_SYNC_MT_RTS) &&
+			(r->cons.sync_type != RTE_RING_SYNC_MT_RTS_V2))
 		return -ENOTSUP;
 
 	r->rts_cons.htd_max = v;
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 122650346b..4ce22a93ed 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -46,6 +46,92 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 			rte_memory_order_release, rte_memory_order_acquire) == 0);
 }
 
+/**
+ * @file rte_ring_rts_elem_pvt.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
+ * For more information please refer to <rte_ring_rts.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
+static __rte_always_inline void
+__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht,
+	uint32_t old_tail, uint32_t num, uint32_t mask)
+{
+	union __rte_ring_rts_poscnt ot, nt;
+
+	ot.val.cnt = nt.val.cnt = 0;
+	ot.val.pos = old_tail;
+	nt.val.pos = old_tail + num;
+
+	/*
+	 * If the tail equals the start of this enqueue/dequeue, advance the
+	 * tail past it, then keep advancing it over completed operations
+	 * recorded in the cache until an entry of 0 is found. Otherwise,
+	 * record the count of this enqueue/dequeue in the cache.
+	 */
+
+	if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+				(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+				rte_memory_order_release, rte_memory_order_acquire) == 0) {
+		ot.val.pos = old_tail;
+
+		/*
+		 * Write the num of the current enqueues/dequeues to the
+		 * corresponding cache.
+		 */
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			num, rte_memory_order_release);
+
+		/*
+		 * This may race with another enqueue/dequeue that is trying
+		 * to update the tail. The winner continues to try to update
+		 * the tail, and the loser exits.
+		 */
+		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+					rte_memory_order_release, rte_memory_order_acquire) == 0)
+			return;
+
+		/*
+		 * Set the corresponding cache to 0 for next use.
+		 */
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			0, rte_memory_order_release);
+	}
+
+	/*
+	 * Keep advancing the tail until the corresponding cache entry is 0.
+	 * From here on, the current enqueue/dequeue is updating the tail on
+	 * behalf of other enqueues/dequeues that have already completed.
+	 */
+	while (1) {
+		num = rte_atomic_load_explicit(&ht->rts_cache[nt.val.pos & mask].num,
+			rte_memory_order_acquire);
+		if (num == 0)
+			break;
+
+		ot.val.pos = nt.val.pos;
+		nt.val.pos += num;
+
+		/*
+		 * This may race with another enqueue/dequeue that is trying
+		 * to update the tail. The winner continues to try to update
+		 * the tail, and the loser exits.
+		 */
+		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+					rte_memory_order_release, rte_memory_order_acquire) == 0)
+			return;
+
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			0, rte_memory_order_release);
+	};
+}
+
 /**
  * @internal This function waits till head/tail distance wouldn't
  * exceed pre-defined max value.
@@ -218,6 +304,47 @@ __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
 	return n;
 }
 
+/**
+ * @internal Enqueue several objects on the RTS ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_rts_v2_enqueue_elem(struct rte_ring *r, const void *obj_table,
+	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
+	uint32_t *free_space)
+{
+	uint32_t free, head;
+
+	n =  __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
+
+	if (n != 0) {
+		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
+		__rte_ring_rts_v2_update_tail(&r->rts_prod, head, n, r->mask);
+	}
+
+	if (free_space != NULL)
+		*free_space = free - n;
+	return n;
+}
+
 /**
  * @internal Dequeue several objects from the RTS ring.
  *
@@ -259,4 +386,45 @@ __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
 	return n;
 }
 
+/**
+ * @internal Dequeue several objects from the RTS ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_rts_v2_dequeue_elem(struct rte_ring *r, void *obj_table,
+	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
+	uint32_t *available)
+{
+	uint32_t entries, head;
+
+	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
+
+	if (n != 0) {
+		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
+		__rte_ring_rts_v2_update_tail(&r->rts_cons, head, n, r->mask);
+	}
+
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
-- 
2.27.0


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v2] ring: add the second version of the RTS interface
  2025-01-05 15:13 ` [PATCH v2] " Huichao Cai
@ 2025-01-08  1:41   ` Huichao Cai
  2025-01-14 15:04     ` Thomas Monjalon
  2025-01-08 16:49   ` Konstantin Ananyev
  2025-01-14 12:55   ` Huichao Cai
  2 siblings, 1 reply; 7+ messages in thread
From: Huichao Cai @ 2025-01-08  1:41 UTC (permalink / raw)
  To: thomas; +Cc: dev, honnappa.nagarahalli, konstantin.v.ananyev

Hi Thomas,
    This patch adds a field to the ABI structure. I have added the suppress_type
field in the file libabigail.abignore, but "ci/github-robot: Build" still reports
an error. Could you please advise on how to fill in the suppress_type field?


^ permalink raw reply	[flat|nested] 7+ messages in thread

* RE: [PATCH v2] ring: add the second version of the RTS interface
  2025-01-05 15:13 ` [PATCH v2] " Huichao Cai
  2025-01-08  1:41   ` Huichao Cai
@ 2025-01-08 16:49   ` Konstantin Ananyev
  2025-01-14 12:55   ` Huichao Cai
  2 siblings, 0 replies; 7+ messages in thread
From: Konstantin Ananyev @ 2025-01-08 16:49 UTC (permalink / raw)
  To: Huichao Cai, honnappa.nagarahalli, konstantin.v.ananyev, thomas; +Cc: dev


Hi,

> The timing of the update of the RTS enqueues/dequeues tail is
> limited to the last enqueues/dequeues, which reduces concurrency,
> so the RTS interface of the V2 version is added, which makes the tail
> of the enqueues/dequeues not limited to the last enqueues/dequeues
> and thus enables timely updates to increase concurrency.

That description is way too cryptic to me and really just creates more confusion
instead of explaining things: I have to go and read through the code to understand
what you are up to.
In fact, I don't think the approach you used will work properly due to race
conditions (see below for more details).
But for future reference, when you are introducing a new sync mechanism
for the ring please do:
1. explain clearly what particular problem(s) with the existing one(s) you are trying to address.
2. clearly explain the new sync mechanism you are going to introduce and why/when you believe
    it would behave better than the existing ones.
3. In case of performance improvement claims - provide some reproducible numbers
    either with ring_stress_test or some dpdk packet processing sample app or both.
 
> Add some corresponding test cases.
> 
> Signed-off-by: Huichao Cai <chcchc88@163.com>
> ---
...

> diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
> index aebb6d6728..ada1ae88fa 100644
> --- a/lib/ring/rte_ring.c
> +++ b/lib/ring/rte_ring.c
> @@ -43,7 +43,8 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  /* mask of all valid flag values to ring_create() */
>  #define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \
>  		     RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ |	       \
> -		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ)
> +		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ |	       \
> +		     RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ)
> 
>  /* true if x is a power of 2 */
>  #define POWEROF2(x) ((((x)-1) & (x)) == 0)
> @@ -106,6 +107,7 @@ reset_headtail(void *p)
>  		ht->tail = 0;
>  		break;
>  	case RTE_RING_SYNC_MT_RTS:
> +	case RTE_RING_SYNC_MT_RTS_V2:
>  		ht_rts->head.raw = 0;
>  		ht_rts->tail.raw = 0;
>  		break;
> @@ -135,9 +137,11 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
>  	enum rte_ring_sync_type *cons_st)
>  {
>  	static const uint32_t prod_st_flags =
> -		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
> +		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ |
> +		RING_F_MP_RTS_V2_ENQ);
>  	static const uint32_t cons_st_flags =
> -		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
> +		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ |
> +		RING_F_MC_RTS_V2_DEQ);
> 
>  	switch (flags & prod_st_flags) {
>  	case 0:
> @@ -152,6 +156,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
>  	case RING_F_MP_HTS_ENQ:
>  		*prod_st = RTE_RING_SYNC_MT_HTS;
>  		break;
> +	case RING_F_MP_RTS_V2_ENQ:
> +		*prod_st = RTE_RING_SYNC_MT_RTS_V2;
> +		break;
>  	default:
>  		return -EINVAL;
>  	}
> @@ -169,6 +176,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
>  	case RING_F_MC_HTS_DEQ:
>  		*cons_st = RTE_RING_SYNC_MT_HTS;
>  		break;
> +	case RING_F_MC_RTS_V2_DEQ:
> +		*cons_st = RTE_RING_SYNC_MT_RTS_V2;
> +		break;
>  	default:
>  		return -EINVAL;
>  	}
> @@ -239,6 +249,28 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
>  	if (flags & RING_F_MC_RTS_DEQ)
>  		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
> 
> +	/* set default values for head-tail distance and allocate memory to cache */
> +	if (flags & RING_F_MP_RTS_V2_ENQ) {
> +		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
> +		r->rts_prod.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
> +			"RTS_PROD_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);

That doesn't look right at all - rte_ring_init() is not supposed to allocate extra memory.
It is the caller's responsibility to provide a buffer large enough to hold the whole ring
(including actual data and meta-data).
Ideally, if your sync mechanism needs extra space, it should be reported by rte_ring_get_memsize_elem().
Though right now it takes only elem size and count...
One approach might be to introduce a new function for that, another is to introduce a new high-level struct
instead of rte_ring.
Though I suppose the first thing that needs to be done is to fix the race conditions and prove that such
an addition is really worth it from a performance perspective.
 
> +		if (r->rts_prod.rts_cache == NULL) {
> +			RING_LOG(ERR, "Cannot reserve memory for rts prod cache");
> +			return -ENOMEM;
> +		}
> +	}
> +	if (flags & RING_F_MC_RTS_V2_DEQ) {
> +		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
> +		r->rts_cons.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
> +			"RTS_CONS_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
> +		if (r->rts_cons.rts_cache == NULL) {
> +			if (flags & RING_F_MP_RTS_V2_ENQ)
> +				rte_free(r->rts_prod.rts_cache);
> +			RING_LOG(ERR, "Cannot reserve memory for rts cons cache");
> +			return -ENOMEM;
> +		}
> +	}
> +
>  	return 0;
>  }
> 

....
> diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
> index 122650346b..4ce22a93ed 100644
> --- a/lib/ring/rte_ring_rts_elem_pvt.h
> +++ b/lib/ring/rte_ring_rts_elem_pvt.h
> @@ -46,6 +46,92 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
>  			rte_memory_order_release, rte_memory_order_acquire) == 0);
>  }
> 
> +/**
> + * @file rte_ring_rts_elem_pvt.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
> + * For more information please refer to <rte_ring_rts.h>.
> + */
> +
> +/**
> + * @internal This function updates tail values.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht,
> +	uint32_t old_tail, uint32_t num, uint32_t mask)
> +{
> +	union __rte_ring_rts_poscnt ot, nt;
> +
> +	ot.val.cnt = nt.val.cnt = 0;
> +	ot.val.pos = old_tail;
> +	nt.val.pos = old_tail + num;
> +
> +	/*
> +	 * If the tail is equal to the current enqueues/dequeues, update
> +	 * the tail with new value and then continue to try to update the
> +	 * tail until the num of the cache is 0, otherwise write the num of
> +	 * the current enqueues/dequeues to the cache.
> +	 */
> +
> +	if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
> +				(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
> +				rte_memory_order_release, rte_memory_order_acquire) == 0) {
> +		ot.val.pos = old_tail;
> +
> +		/*
> +		 * Write the num of the current enqueues/dequeues to the
> +		 * corresponding cache.
> +		 */
> +		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
> +			num, rte_memory_order_release);
> +
> +		/*
> +		 * There may be competition with another enqueues/dequeues
> +		 * for the update tail. The winner continues to try to update
> +		 * the tail, and the loser exits.
> +		 */
> +		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
> +					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
> +					rte_memory_order_release, rte_memory_order_acquire) == 0)
> +			return;
> +
> +		/*
> +		 * Set the corresponding cache to 0 for next use.
> +		 */

I think there is a race condition between the CAS above and the store below:
After you have updated the tail, other threads are free to re-use these ring elements.
So if some thread gets stalled/preempted here while other threads continue to do
enqueue/dequeue to the ring, this rts_cache[] element can already have been re-used
by another thread when the given thread wakes up and overwrites it with zero.
In fact such a race condition can be easily reproduced on my box with:

echo ring_stress_autotest  | dpdk-test  --lcores=(2-16)@(3-5) -n 8 --no-pci --no-huge   

all workers end up in an infinite loop:

(gdb) bt
#0  _mm_pause ()
    at /usr/lib64/gcc/x86_64-suse-linux/12/include/xmmintrin.h:1335
#1  rte_pause () at ../lib/eal/x86/include/rte_pause.h:18
#2  0x00000000010d008b in __rte_ring_rts_head_wait (h=0x7ff90c3ffca0,
    ht=0x103ed2d40) at ../lib/ring/rte_ring_rts_elem_pvt.h:148
#3  __rte_ring_rts_move_prod_head (free_entries=0x7ff90c3ffcc8,
    old_head=0x7ff90c3ffcc4, behavior=RTE_RING_QUEUE_FIXED, num=33,
    r=0x103ed2cc0) at ../lib/ring/rte_ring_rts_elem_pvt.h:177
#4  __rte_ring_do_rts_v2_enqueue_elem (free_space=0x0,
    behavior=RTE_RING_QUEUE_FIXED, n=33, esize=8, obj_table=0x7ff90c3ffec0,
    r=0x103ed2cc0) at ../lib/ring/rte_ring_rts_elem_pvt.h:336
#5  rte_ring_mp_rts_v2_enqueue_bulk_elem (free_space=0x0, n=33, esize=8,
    obj_table=0x7ff90c3ffec0, r=0x103ed2cc0) at ../lib/ring/rte_ring_rts.h:110
#6  rte_ring_mp_rts_v2_enqueue_bulk (free_space=0x0, n=33,
    obj_table=0x7ff90c3ffec0, r=0x103ed2cc0) at ../lib/ring/rte_ring_rts.h:345
#7  _st_ring_enqueue_bulk (r=0x103ed2cc0, obj=0x7ff90c3ffec0, n=33, free=0x0)
    at ../app/test/test_ring_rts_v2_stress.c:18
....
(gdb) print/x r->rts_prod.rts_cache[r->rts_prod.tail.val.pos & r->mask]
$11 = {num = 0x0}
(gdb) print r->rts_prod
$13 = {tail = {raw = 127384228873633792, val = {cnt = 0, pos = 29658952}},
  sync_type = RTE_RING_SYNC_MT_RTS_V2, htd_max = 2047, head = {
    raw = 127393072212139551, val = {cnt = 843295, pos = 29661011}},
  rts_cache = 0x103ec2c40}
(gdb) print 29661011-29658952
$14 = 2059

All in all - I don't think this approach is going to work.
You need some extra stuff to synchronize between cache[] and tail.
If interested, in SORING we solved a similar thing by updating state[] before updating the tail,
plus making sure that only one thread at a time will update the tail value:
https://patchwork.dpdk.org/project/dpdk/patch/20241206183600.34758-6-konstantin.ananyev@huawei.com/

Another minor thing - why do you re-use the RTS head/tail struct?
From what I can read you don't use the .cnt part at all.

> +		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
> +			0, rte_memory_order_release);
> +	}
> +
> +	/*
> +	 * Try to update the tail until the num of the corresponding cache is 0.
> +	 * Getting here means that the current enqueues/dequeues is trying to update
> +	 * the tail of another enqueues/dequeues.
> +	 */
> +	while (1) {
> +		num = rte_atomic_load_explicit(&ht->rts_cache[nt.val.pos & mask].num,
> +			rte_memory_order_acquire);
> +		if (num == 0)
> +			break;
> +
> +		ot.val.pos = nt.val.pos;
> +		nt.val.pos += num;
> +
> +		/*
> +		 * There may be competition with another enqueues/dequeues
> +		 * for the update tail. The winner continues to try to update
> +		 * the tail, and the loser exits.
> +		 */
> +		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
> +					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
> +					rte_memory_order_release, rte_memory_order_acquire) == 0)
> +			return;
> +
> +		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
> +			0, rte_memory_order_release);
> +	};
> +}
> +

^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v2] ring: add the second version of the RTS interface
  2025-01-05 15:13 ` [PATCH v2] " Huichao Cai
  2025-01-08  1:41   ` Huichao Cai
  2025-01-08 16:49   ` Konstantin Ananyev
@ 2025-01-14 12:55   ` Huichao Cai
  2 siblings, 0 replies; 7+ messages in thread
From: Huichao Cai @ 2025-01-14 12:55 UTC (permalink / raw)
  To: konstantin.v.ananyev; +Cc: dev, honnappa.nagarahalli, thomas

Hi Konstantin, thank you very much for your question!

I have modified the __rte_ring_rts_v2_update_tail function (see at the bottom) and it works properly when
using your test command in my local environment (KVM). The local environment parameters are as follows:
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         46 bits physical, 57 bits virtual
  Byte Order:            Little Endian
CPU(s):                  8
  On-line CPU(s) list:   0-7
NUMA:                    
  NUMA node(s):          1
  NUMA node0 CPU(s):     0-7

I have roughly looked at the code of the SORING patch, and my patch's update logic
for the tail is similar to the __rte_soring_stage_finalize function: update the tail as soon as possible.

Tail update logic explanation:
Assume three enqueues/dequeues are in progress simultaneously, and that they
complete in the order first, second, third.
RTS: The tail is only updated after the third enqueue/dequeue completes.
RTS_V2: The tail is updated after each enqueue/dequeue completes.

I have tested it multiple times and found that the performance comparison between RTS
and RTS_V2 test results is not fixed, each with its own strengths and weaknesses, as shown
in the following two test results. So I'm not sure if this patch can truly improve performance,
maybe useful for certain scenarios?

Here are two stress tests comparing the results of RTS and RTS_V2 tests:
=================test 1=================
[root@localhost ~]# echo ring_stress_autotest  | /opt/build-dpdk-release/app/dpdk-test --lcores "(2-7)@(3-5)" -n 8 --no-pci --no-huge
EAL: Detected CPU lcores: 8
EAL: Detected NUMA nodes: 1
EAL: Static memory layout is selected, amount of reserved memory can be adjusted with -m or --socket-mem
EAL: Detected static linkage of DPDK
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'VA'
APP: HPET is not enabled, using TSC as default timer
RTE>>ring_stress_autotest
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-PRCS START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168020391844(60007282.80 usec),
	DEQ+ENQ={
		nb_call=87964158,
		nb_obj=3122632083,
		nb_cycle=357901041584,
		obj/call(avg): 35.50
		cycles/obj(avg): 114.62
		cycles/call(avg): 4068.71
		max cycles/call=226802256(81000.81 usec),
		min cycles/call=288(0.10 usec),
	},
};
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-PRCS OK
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-AVG START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168039915096(60014255.39 usec),
	DEQ+ENQ={
		nb_call=92846537,
		nb_obj=3296030996,
		nb_cycle=840090079114,
		obj/call(avg): 35.50
		cycles/obj(avg): 254.88
		cycles/call(avg): 9048.16
	},
};
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-AVG OK
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-PRCS START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168006214342(60002219.41 usec),
	DEQ+ENQ={
		nb_call=83543881,
		nb_obj=2965835220,
		nb_cycle=389465266530,
		obj/call(avg): 35.50
		cycles/obj(avg): 131.32
		cycles/call(avg): 4661.80
		max cycles/call=123210780(44003.85 usec),
		min cycles/call=298(0.11 usec),
	},
};
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-PRCS OK
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-AVG START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168000036710(60000013.11 usec),
	DEQ+ENQ={
		nb_call=89759571,
		nb_obj=3186412623,
		nb_cycle=839986422120,
		obj/call(avg): 35.50
		cycles/obj(avg): 263.62
		cycles/call(avg): 9358.18
	},
};
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-AVG OK
Number of tests:	4
Success:	4
Failed:	0
Test OK

=================test 2=================
[root@localhost ~]# echo ring_stress_autotest  | /opt/build-dpdk-release/app/dpdk-test --lcores "(2-7)@(3-5)" -n 8 --no-pci --no-huge
EAL: Detected CPU lcores: 8
EAL: Detected NUMA nodes: 1
EAL: Static memory layout is selected, amount of reserved memory can be adjusted with -m or --socket-mem
EAL: Detected static linkage of DPDK
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'VA'
APP: HPET is not enabled, using TSC as default timer
RTE>>ring_stress_autotest
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-PRCS START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168011911986(60004254.28 usec),
	DEQ+ENQ={
		nb_call=47315418,
		nb_obj=1679700058,
		nb_cycle=361351406016,
		obj/call(avg): 35.50
		cycles/obj(avg): 215.13
		cycles/call(avg): 7637.08
		max cycles/call=114663660(40951.31 usec),
		min cycles/call=286(0.10 usec),
	},
};
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-PRCS OK
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-AVG START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168039811194(60014218.28 usec),
	DEQ+ENQ={
		nb_call=70103600,
		nb_obj=2488627393,
		nb_cycle=840101179096,
		obj/call(avg): 35.50
		cycles/obj(avg): 337.58
		cycles/call(avg): 11983.71
	},
};
TEST-CASE MT_RTS MT-WRK_ENQ_DEQ-MST_NONE-AVG OK
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-PRCS START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168000022894(60000008.18 usec),
	DEQ+ENQ={
		nb_call=72380924,
		nb_obj=2569422396,
		nb_cycle=386306567792,
		obj/call(avg): 35.50
		cycles/obj(avg): 150.35
		cycles/call(avg): 5337.13
		max cycles/call=226802852(81001.02 usec),
		min cycles/call=328(0.12 usec),
	},
};
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-PRCS OK
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-AVG START
lcore_stat_dump(AGGREGATE)={
	nb_cycle=168000052432(60000018.73 usec),
	DEQ+ENQ={
		nb_call=77585568,
		nb_obj=2754266203,
		nb_cycle=839935549688,
		obj/call(avg): 35.50
		cycles/obj(avg): 304.96
		cycles/call(avg): 10825.93
	},
};
TEST-CASE MT_RTS_V2 MT-WRK_ENQ_DEQ-MST_NONE-AVG OK
Number of tests:	4
Success:	4
Failed:	0
Test OK

==========The modified function is as follows:=========
 /**
  * @internal Revised tail update for the RTS V2 sync mode (version posted in
  * reply to the review of the original patch).
  *
  * @param ht
  *   Head/tail structure whose tail and rts_cache[] entries are updated.
  * @param old_tail
  *   Position at which the caller's enqueue/dequeue started.
  * @param num
  *   Number of elements the caller has just enqueued/dequeued. The variable
  *   is re-used inside the loop to hold values read from rts_cache[].
  * @param mask
  *   Applied to a position to select its rts_cache[] slot. It is also the
  *   non-zero marker value written into a slot by the loop below.
  *
  * Summary of the code as written:
  *  - rts_cache[pos & mask].num == 0 is the "nothing recorded" state.
  *  - If the tail position is not the caller's start position, the caller
  *    tries to record its count in its own slot and returns if that succeeds.
  *  - Otherwise the caller acts as the updater: it walks forward, moving the
  *    tail across each following slot that already holds a recorded count,
  *    and stops at the first slot that is still 0, where it leaves the marker.
  */
 static __rte_always_inline void
__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht,
	uint32_t old_tail, uint32_t num, uint32_t mask)
{
	union __rte_ring_rts_poscnt ot, nt;
	uint32_t expect_num = 0;

	/* ot describes the caller's start position; its cnt part is always 0. */
	ot.val.cnt = 0;
	ot.val.pos = old_tail;

	/*
	 * Read the current tail. If its position differs from the caller's
	 * start position, try to record the caller's count in the cache;
	 * otherwise fall through and act as the tail updater.
	 */

	nt.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
	if (ot.val.pos != nt.val.pos) {
		/*
		 * Record num in the caller's own slot, but only if the slot is
		 * still 0 (expect_num). On success there is nothing more to do
		 * here: the count is left in the cache for the updater loop.
		 */
		if (rte_atomic_compare_exchange_strong_explicit(
				&ht->rts_cache[ot.val.pos & mask].num, &expect_num, num,
				rte_memory_order_release, rte_memory_order_acquire))
			return;

		/*
		 * The slot was already non-zero - presumably the marker left by
		 * an updater that stopped at this position (see loop below).
		 * Overwrite the tail with the caller's start position and
		 * continue the update from here.
		 * NOTE(review): this is a plain store of ot.raw (cnt == 0), not
		 * a CAS - confirm the tail cannot have changed in between.
		 */
		rte_atomic_store_explicit(&ht->tail.raw, ot.raw, rte_memory_order_release);
	}

	/*
	 * Reset the caller's own slot to 0 so it can be used again.
	 */
	rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
		0, rte_memory_order_release);

	/*
	 * nt.val.pos now points just past the caller's elements. Only the pos
	 * part is rewritten; nt.val.cnt keeps the value loaded from the tail.
	 */
	nt.val.pos = ot.val.pos + num;

	/*
	 * Updater loop: ot is the tail position being left, nt the position
	 * being moved to. Keep advancing while the slot at nt already holds a
	 * recorded count; stop at the first slot that is still 0.
	 */
	while (1) {
		/* num doubles as the CAS "expected" value: expect an empty slot. */
		num = 0;
		if (rte_atomic_compare_exchange_strong_explicit(
				&ht->rts_cache[nt.val.pos & mask].num, &num , mask,
				rte_memory_order_release, rte_memory_order_acquire)) {
			/*
			 * The slot at nt was empty and now holds the marker
			 * (mask). Move the tail from ot to nt and stop; the
			 * result of this CAS is not checked.
			 * NOTE(review): the marker value equals mask, so it
			 * cannot be told apart from a recorded count of the
			 * same value - confirm counts can never equal mask.
			 */
			rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
				(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
				rte_memory_order_release, rte_memory_order_acquire);
			return;
		}

		/*
		 * CAS failed: num now holds the non-zero value found in the
		 * slot, which is used below as the count to advance by.
		 * Reset the slot to 0 for its next use.
		 */
		rte_atomic_store_explicit(&ht->rts_cache[nt.val.pos & mask].num,
			0, rte_memory_order_release);

		 /* Publish nt as the new tail with a plain store. */
		rte_atomic_store_explicit(&ht->tail.raw, nt.raw, rte_memory_order_release);

		/* Step forward by the count that was read from the slot. */
		ot.val.pos = nt.val.pos;
		nt.val.pos += num;
	};
}


^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH v2] ring: add the second version of the RTS interface
  2025-01-08  1:41   ` Huichao Cai
@ 2025-01-14 15:04     ` Thomas Monjalon
  0 siblings, 0 replies; 7+ messages in thread
From: Thomas Monjalon @ 2025-01-14 15:04 UTC (permalink / raw)
  To: Huichao Cai; +Cc: dev, honnappa.nagarahalli, konstantin.v.ananyev

08/01/2025 02:41, Huichao Cai:
> Hi,Thomas
>     This patch adds a field to the ABI structure.I have added the suppress_type
> field in the file libabigail.abignore, but "ci/github-robot: Build" still reported
> an error, could you please advise on how to fill in the suppress_type field?

You must check locally and see what happens when you add some suppressions.

You will find documentation here:
https://sourceware.org/libabigail/manual/libabigail-concepts.html#suppression-specifications



^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v2] ring: add the second version of the RTS interface
@ 2025-01-05 15:09 Huichao Cai
  0 siblings, 0 replies; 7+ messages in thread
From: Huichao Cai @ 2025-01-05 15:09 UTC (permalink / raw)
  To: honnappa.nagarahalli, konstantin.v.ananyev, thomas; +Cc: dev

In the existing RTS mode, when several enqueues/dequeues are in progress
at the same time, the tail is only updated once the last of them has
completed, which reduces concurrency. Add a V2 version of the RTS
interface in which the tail update is not limited to the last
enqueue/dequeue, so the tail is updated in a timely manner and
concurrency increases.

Add some corresponding test cases.

Signed-off-by: Huichao Cai <chcchc88@163.com>
---
 app/test/meson.build                   |   1 +
 app/test/test_ring.c                   |  26 +++
 app/test/test_ring_rts_v2_stress.c     |  32 ++++
 app/test/test_ring_stress.c            |   3 +
 app/test/test_ring_stress.h            |   1 +
 devtools/libabigail.abignore           |   6 +
 doc/guides/rel_notes/release_25_03.rst |   2 +
 lib/ring/rte_ring.c                    |  54 ++++++-
 lib/ring/rte_ring.h                    |  12 ++
 lib/ring/rte_ring_core.h               |   9 ++
 lib/ring/rte_ring_elem.h               |  18 +++
 lib/ring/rte_ring_rts.h                | 216 ++++++++++++++++++++++++-
 lib/ring/rte_ring_rts_elem_pvt.h       | 168 +++++++++++++++++++
 13 files changed, 538 insertions(+), 10 deletions(-)
 create mode 100644 app/test/test_ring_rts_v2_stress.c

diff --git a/app/test/meson.build b/app/test/meson.build
index d5cb6a7f7a..e3d8cef3fa 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -166,6 +166,7 @@ source_file_deps = {
     'test_ring_mt_peek_stress_zc.c': ['ptr_compress'],
     'test_ring_perf.c': ['ptr_compress'],
     'test_ring_rts_stress.c': ['ptr_compress'],
+    'test_ring_rts_v2_stress.c': ['ptr_compress'],
     'test_ring_st_peek_stress.c': ['ptr_compress'],
     'test_ring_st_peek_stress_zc.c': ['ptr_compress'],
     'test_ring_stress.c': ['ptr_compress'],
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index ba1fec1de3..094f14b859 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -284,6 +284,19 @@ static const struct {
 			.felem = rte_ring_dequeue_bulk_elem,
 		},
 	},
+	{
+		.desc = "MP_RTS/MC_RTS V2 sync mode",
+		.api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF,
+		.create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ,
+		.enq = {
+			.flegacy = rte_ring_enqueue_bulk,
+			.felem = rte_ring_enqueue_bulk_elem,
+		},
+		.deq = {
+			.flegacy = rte_ring_dequeue_bulk,
+			.felem = rte_ring_dequeue_bulk_elem,
+		},
+	},
 	{
 		.desc = "MP_HTS/MC_HTS sync mode",
 		.api_type = TEST_RING_ELEM_BULK | TEST_RING_THREAD_DEF,
@@ -349,6 +362,19 @@ static const struct {
 			.felem = rte_ring_dequeue_burst_elem,
 		},
 	},
+	{
+		.desc = "MP_RTS/MC_RTS V2 sync mode",
+		.api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF,
+		.create_flags = RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ,
+		.enq = {
+			.flegacy = rte_ring_enqueue_burst,
+			.felem = rte_ring_enqueue_burst_elem,
+		},
+		.deq = {
+			.flegacy = rte_ring_dequeue_burst,
+			.felem = rte_ring_dequeue_burst_elem,
+		},
+	},
 	{
 		.desc = "MP_HTS/MC_HTS sync mode",
 		.api_type = TEST_RING_ELEM_BURST | TEST_RING_THREAD_DEF,
diff --git a/app/test/test_ring_rts_v2_stress.c b/app/test/test_ring_rts_v2_stress.c
new file mode 100644
index 0000000000..6079366a7d
--- /dev/null
+++ b/app/test/test_ring_rts_v2_stress.c
@@ -0,0 +1,32 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress_impl.h"
+
+static inline uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail)
+{
+	return rte_ring_mc_rts_v2_dequeue_bulk(r, obj, n, avail);
+}
+
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free)
+{
+	return rte_ring_mp_rts_v2_enqueue_bulk(r, obj, n, free);
+}
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
+{
+	return rte_ring_init(r, name, num,
+		RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ);
+}
+
+const struct test test_ring_rts_v2_stress = {
+	.name = "MT_RTS_V2",
+	.nb_case = RTE_DIM(tests),
+	.cases = tests,
+};
diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c
index 1af45e0fc8..94085acd5e 100644
--- a/app/test/test_ring_stress.c
+++ b/app/test/test_ring_stress.c
@@ -43,6 +43,9 @@ test_ring_stress(void)
 	n += test_ring_rts_stress.nb_case;
 	k += run_test(&test_ring_rts_stress);
 
+	n += test_ring_rts_v2_stress.nb_case;
+	k += run_test(&test_ring_rts_v2_stress);
+
 	n += test_ring_hts_stress.nb_case;
 	k += run_test(&test_ring_hts_stress);
 
diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
index 416d68c9a0..505957f6fb 100644
--- a/app/test/test_ring_stress.h
+++ b/app/test/test_ring_stress.h
@@ -34,6 +34,7 @@ struct test {
 
 extern const struct test test_ring_mpmc_stress;
 extern const struct test test_ring_rts_stress;
+extern const struct test test_ring_rts_v2_stress;
 extern const struct test test_ring_hts_stress;
 extern const struct test test_ring_mt_peek_stress;
 extern const struct test test_ring_mt_peek_stress_zc;
diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore
index 21b8cd6113..d4dd99a99e 100644
--- a/devtools/libabigail.abignore
+++ b/devtools/libabigail.abignore
@@ -33,3 +33,9 @@
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ; Temporary exceptions till next major ABI version ;
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+[suppress_type]
+       type_kind = struct
+       name = rte_ring_rts_cache
+[suppress_type]
+       name = rte_ring_rts_headtail
+       has_data_member_inserted_between = {offset_of(head), end}
diff --git a/doc/guides/rel_notes/release_25_03.rst b/doc/guides/rel_notes/release_25_03.rst
index 426dfcd982..f73bc9e397 100644
--- a/doc/guides/rel_notes/release_25_03.rst
+++ b/doc/guides/rel_notes/release_25_03.rst
@@ -102,6 +102,8 @@ ABI Changes
 
 * No ABI change that would break compatibility with 24.11.
 
+* ring: Added ``rte_ring_rts_cache`` structure and ``rts_cache`` field to the
+  ``rte_ring_rts_headtail`` structure.
 
 Known Issues
 ------------
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..ada1ae88fa 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -43,7 +43,8 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 /* mask of all valid flag values to ring_create() */
 #define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \
 		     RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ |	       \
-		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ)
+		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ |	       \
+		     RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ)
 
 /* true if x is a power of 2 */
 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
@@ -106,6 +107,7 @@ reset_headtail(void *p)
 		ht->tail = 0;
 		break;
 	case RTE_RING_SYNC_MT_RTS:
+	case RTE_RING_SYNC_MT_RTS_V2:
 		ht_rts->head.raw = 0;
 		ht_rts->tail.raw = 0;
 		break;
@@ -135,9 +137,11 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	enum rte_ring_sync_type *cons_st)
 {
 	static const uint32_t prod_st_flags =
-		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
+		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ |
+		RING_F_MP_RTS_V2_ENQ);
 	static const uint32_t cons_st_flags =
-		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
+		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ |
+		RING_F_MC_RTS_V2_DEQ);
 
 	switch (flags & prod_st_flags) {
 	case 0:
@@ -152,6 +156,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	case RING_F_MP_HTS_ENQ:
 		*prod_st = RTE_RING_SYNC_MT_HTS;
 		break;
+	case RING_F_MP_RTS_V2_ENQ:
+		*prod_st = RTE_RING_SYNC_MT_RTS_V2;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -169,6 +176,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
 	case RING_F_MC_HTS_DEQ:
 		*cons_st = RTE_RING_SYNC_MT_HTS;
 		break;
+	case RING_F_MC_RTS_V2_DEQ:
+		*cons_st = RTE_RING_SYNC_MT_RTS_V2;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -239,6 +249,28 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
 	if (flags & RING_F_MC_RTS_DEQ)
 		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
 
+	/* set default head-tail distance and allocate memory for the RTS V2 caches */
+	if (flags & RING_F_MP_RTS_V2_ENQ) {
+		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
+		r->rts_prod.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
+			"RTS_PROD_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
+		if (r->rts_prod.rts_cache == NULL) {
+			RING_LOG(ERR, "Cannot reserve memory for rts prod cache");
+			return -ENOMEM;
+		}
+	}
+	if (flags & RING_F_MC_RTS_V2_DEQ) {
+		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
+		r->rts_cons.rts_cache = (struct rte_ring_rts_cache *)rte_zmalloc(
+			"RTS_CONS_CACHE", sizeof(struct rte_ring_rts_cache) * r->size, 0);
+		if (r->rts_cons.rts_cache == NULL) {
+			if (flags & RING_F_MP_RTS_V2_ENQ)
+				rte_free(r->rts_prod.rts_cache);
+			RING_LOG(ERR, "Cannot reserve memory for rts cons cache");
+			return -ENOMEM;
+		}
+	}
+
 	return 0;
 }
 
@@ -293,9 +325,14 @@ rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
 					 mz_flags, alignof(typeof(*r)));
 	if (mz != NULL) {
 		r = mz->addr;
-		/* no need to check return value here, we already checked the
-		 * arguments above */
-		rte_ring_init(r, name, requested_count, flags);
+
+		if (rte_ring_init(r, name, requested_count, flags)) {
+			rte_free(te);
+			if (rte_memzone_free(mz) != 0)
+				RING_LOG(ERR, "Cannot free memory for ring");
+			rte_mcfg_tailq_write_unlock();
+			return NULL;
+		}
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -358,6 +395,11 @@ rte_ring_free(struct rte_ring *r)
 
 	rte_mcfg_tailq_write_unlock();
 
+	if (r->flags & RING_F_MP_RTS_V2_ENQ)
+		rte_free(r->rts_prod.rts_cache);
+	if (r->flags & RING_F_MC_RTS_V2_DEQ)
+		rte_free(r->rts_cons.rts_cache);
+
 	if (rte_memzone_free(r->memzone) != 0)
 		RING_LOG(ERR, "Cannot free memory");
 
diff --git a/lib/ring/rte_ring.h b/lib/ring/rte_ring.h
index 11ca69c73d..2b35ce038e 100644
--- a/lib/ring/rte_ring.h
+++ b/lib/ring/rte_ring.h
@@ -89,6 +89,9 @@ ssize_t rte_ring_get_memsize(unsigned int count);
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -101,6 +104,9 @@ ssize_t rte_ring_get_memsize(unsigned int count);
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
@@ -149,6 +155,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -161,6 +170,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 6cd6ce9884..9e627d26c1 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -55,6 +55,7 @@ enum rte_ring_sync_type {
 	RTE_RING_SYNC_ST,     /**< single thread only */
 	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
 	RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
+	RTE_RING_SYNC_MT_RTS_V2, /**< multi-thread relaxed tail sync v2 */
 };
 
 /**
@@ -82,11 +83,16 @@ union __rte_ring_rts_poscnt {
 	} val;
 };
 
+struct rte_ring_rts_cache {
+	volatile RTE_ATOMIC(uint32_t) num;      /**< Number of objs recorded, 0 if none. */
+};
+
 struct rte_ring_rts_headtail {
 	volatile union __rte_ring_rts_poscnt tail;
 	enum rte_ring_sync_type sync_type;  /**< sync type of prod/cons */
 	uint32_t htd_max;   /**< max allowed distance between head/tail */
 	volatile union __rte_ring_rts_poscnt head;
+	struct rte_ring_rts_cache *rts_cache; /**< RTS V2 per-slot cache of prod/cons */
 };
 
 union __rte_ring_hts_pos {
@@ -163,4 +169,7 @@ struct rte_ring {
 #define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */
 #define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */
 
+#define RING_F_MP_RTS_V2_ENQ 0x0080 /**< The default enqueue is "MP RTS V2". */
+#define RING_F_MC_RTS_V2_DEQ 0x0100 /**< The default dequeue is "MC RTS V2". */
+
 #endif /* _RTE_RING_CORE_H_ */
diff --git a/lib/ring/rte_ring_elem.h b/lib/ring/rte_ring_elem.h
index b96bfc003f..1352709f94 100644
--- a/lib/ring/rte_ring_elem.h
+++ b/lib/ring/rte_ring_elem.h
@@ -71,6 +71,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer RTS mode".
+ *      - RING_F_MP_RTS_V2_ENQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *        is "multi-producer RTS V2 mode".
  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
  *        is "multi-producer HTS mode".
@@ -83,6 +86,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer RTS mode".
+ *      - RING_F_MC_RTS_V2_DEQ: If this flag is set, the default behavior when
+ *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *        is "multi-consumer RTS V2 mode".
  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *        is "multi-consumer HTS mode".
@@ -203,6 +209,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n,
 			free_space);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table, esize, n,
+			free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -385,6 +394,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize,
 			n, available);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table, esize,
+			n, available);
 	}
 
 	/* valid ring should never reach this point */
@@ -571,6 +583,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize,
 			n, free_space);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table, esize,
+			n, free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -681,6 +696,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 	case RTE_RING_SYNC_MT_HTS:
 		return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize,
 			n, available);
+	case RTE_RING_SYNC_MT_RTS_V2:
+		return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table, esize,
+			n, available);
 	}
 
 	/* valid ring should never reach this point */
diff --git a/lib/ring/rte_ring_rts.h b/lib/ring/rte_ring_rts.h
index d7a3863c83..b47e400452 100644
--- a/lib/ring/rte_ring_rts.h
+++ b/lib/ring/rte_ring_rts.h
@@ -84,6 +84,33 @@ rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 			RTE_RING_QUEUE_FIXED, free_space);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, free_space);
+}
+
 /**
  * Dequeue several objects from an RTS ring (multi-consumers safe).
  *
@@ -111,6 +138,33 @@ rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 			RTE_RING_QUEUE_FIXED, available);
 }
 
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -138,6 +192,33 @@ rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
+/**
+ * Enqueue several objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_rts_v2_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
+}
+
 /**
  * Dequeue several objects from an RTS  ring (multi-consumers safe).
  * When the requested objects are more than the available objects,
@@ -167,6 +248,35 @@ rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 			RTE_RING_QUEUE_VARIABLE, available);
 }
 
+/**
+ * Dequeue several objects from an RTS V2 ring (multi-consumers safe).
+ * When the requested objects are more than the available objects,
+ * only dequeue the actual number of objects.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+	unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_rts_v2_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -213,6 +323,52 @@ rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
 			sizeof(uintptr_t), n, available);
 }
 
+/**
+ * Enqueue several pointer-sized objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return rte_ring_mp_rts_v2_enqueue_bulk_elem(r, obj_table,
+			sizeof(uintptr_t), n, free_space);
+}
+
+/**
+ * Dequeue several pointer-sized objects from an RTS V2 ring (multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_bulk(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_ring_mc_rts_v2_dequeue_bulk_elem(r, obj_table,
+			sizeof(uintptr_t), n, available);
+}
+
 /**
  * Enqueue several objects on the RTS ring (multi-producers safe).
  *
@@ -261,6 +417,54 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 			sizeof(uintptr_t), n, available);
 }
 
+/**
+ * Enqueue several pointer-sized objects on the RTS V2 ring (multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_rts_v2_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return rte_ring_mp_rts_v2_enqueue_burst_elem(r, obj_table,
+			sizeof(uintptr_t), n, free_space);
+}
+
+/**
+ * Dequeue several pointer-sized objects from an RTS V2 ring (multi-consumers safe).
+ * When the requested objects are more than the available objects,
+ * only dequeue the actual number of objects.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_rts_v2_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_ring_mc_rts_v2_dequeue_burst_elem(r, obj_table,
+			sizeof(uintptr_t), n, available);
+}
+
 /**
  * Return producer max Head-Tail-Distance (HTD).
  *
@@ -273,7 +477,8 @@ rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 static inline uint32_t
 rte_ring_get_prod_htd_max(const struct rte_ring *r)
 {
-	if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
+	if ((r->prod.sync_type == RTE_RING_SYNC_MT_RTS) ||
+			(r->prod.sync_type == RTE_RING_SYNC_MT_RTS_V2))
 		return r->rts_prod.htd_max;
 	return UINT32_MAX;
 }
@@ -292,7 +497,8 @@ rte_ring_get_prod_htd_max(const struct rte_ring *r)
 static inline int
 rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 {
-	if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
+	if ((r->prod.sync_type != RTE_RING_SYNC_MT_RTS) &&
+			(r->prod.sync_type != RTE_RING_SYNC_MT_RTS_V2))
 		return -ENOTSUP;
 
 	r->rts_prod.htd_max = v;
@@ -311,7 +517,8 @@ rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 static inline uint32_t
 rte_ring_get_cons_htd_max(const struct rte_ring *r)
 {
-	if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
+	if ((r->cons.sync_type == RTE_RING_SYNC_MT_RTS) ||
+			(r->cons.sync_type == RTE_RING_SYNC_MT_RTS_V2))
 		return r->rts_cons.htd_max;
 	return UINT32_MAX;
 }
@@ -330,7 +537,8 @@ rte_ring_get_cons_htd_max(const struct rte_ring *r)
 static inline int
 rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
 {
-	if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
+	if ((r->cons.sync_type != RTE_RING_SYNC_MT_RTS) &&
+			(r->cons.sync_type != RTE_RING_SYNC_MT_RTS_V2))
 		return -ENOTSUP;
 
 	r->rts_cons.htd_max = v;
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 122650346b..4ce22a93ed 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -46,6 +46,92 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 			rte_memory_order_release, rte_memory_order_acquire) == 0);
 }
 
+/*
+ * RTS V2 helpers.
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * The functions below are internal helpers for the V2 flavour of the
+ * Relaxed Tail Sync (RTS) ring mode; see <rte_ring_rts.h> for details.
+ */
+
+/**
+ * @internal Advance the tail over [old_tail, old_tail + num), or defer via rts_cache.
+ */
+static __rte_always_inline void
+__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht,
+	uint32_t old_tail, uint32_t num, uint32_t mask)
+{
+	union __rte_ring_rts_poscnt ot, nt;
+
+	ot.val.cnt = nt.val.cnt = 0;
+	ot.val.pos = old_tail;
+	nt.val.pos = old_tail + num;
+
+	/*
+	 * If the tail still equals old_tail, i.e. this enqueue/dequeue starts
+	 * exactly at the current tail, move the tail to old_tail + num and then
+	 * keep advancing it over the operations recorded in the cache (below).
+	 * Otherwise record the num of this enqueue/dequeue in the cache.
+	 */
+
+	if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+				(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+				rte_memory_order_release, rte_memory_order_acquire) == 0) {
+		ot.val.pos = old_tail; /* CAS failure overwrote ot with the observed tail */
+
+		/*
+		 * Record num for this enqueue/dequeue in the cache slot
+		 * indexed by its start position (old_tail & mask).
+		 */
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			num, rte_memory_order_release);
+
+		/*
+		 * Retry once: the tail may have reached old_tail in the meantime,
+		 * and another thread may be racing to publish this same update.
+		 * The CAS winner keeps advancing the tail; the loser returns.
+		 */
+		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+					rte_memory_order_release, rte_memory_order_acquire) == 0)
+			return;
+
+		/*
+		 * The CAS succeeded: clear the cache slot so it can be reused.
+		 */
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			0, rte_memory_order_release);
+	}
+
+	/*
+	 * This thread has moved the tail to nt.val.pos. Keep advancing it on
+	 * behalf of later enqueues/dequeues that already recorded their num in
+	 * the cache, stopping at the first slot whose num is 0.
+	 */
+	while (1) {
+		num = rte_atomic_load_explicit(&ht->rts_cache[nt.val.pos & mask].num,
+			rte_memory_order_acquire);
+		if (num == 0)
+			break;
+
+		ot.val.pos = nt.val.pos;
+		nt.val.pos += num;
+
+		/*
+		 * Another thread may be publishing the same step concurrently.
+		 * The CAS winner clears the slot and continues; the loser
+		 * returns.
+		 */
+		if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
+					(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
+					rte_memory_order_release, rte_memory_order_acquire) == 0)
+			return;
+
+		rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num,
+			0, rte_memory_order_release);
+	};
+}
+
 /**
  * @internal This function waits till head/tail distance wouldn't
  * exceed pre-defined max value.
@@ -218,6 +304,47 @@ __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
 	return n;
 }
 
+/**
+ * @internal Enqueue several objects on the RTS V2 ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   If non-NULL, returns the amount of space after the enqueue has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_rts_v2_enqueue_elem(struct rte_ring *r, const void *obj_table,
+	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
+	uint32_t *free_space)
+{
+	uint32_t free, head;
+
+	n =  __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
+
+	if (n != 0) {
+		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
+		__rte_ring_rts_v2_update_tail(&r->rts_prod, head, n, r->mask);
+	}
+
+	if (free_space != NULL)
+		*free_space = free - n;
+	return n;
+}
+
 /**
  * @internal Dequeue several objects from the RTS ring.
  *
@@ -259,4 +386,45 @@ __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
 	return n;
 }
 
+/**
+ * @internal Dequeue several objects from the RTS V2 ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the dequeue
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_rts_v2_dequeue_elem(struct rte_ring *r, void *obj_table,
+	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
+	uint32_t *available)
+{
+	uint32_t entries, head;
+
+	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
+
+	if (n != 0) {
+		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
+		__rte_ring_rts_v2_update_tail(&r->rts_cons, head, n, r->mask);
+	}
+
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
-- 
2.27.0


^ permalink raw reply	[flat|nested] 7+ messages in thread

end of thread, other threads:[~2025-01-14 15:04 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-01-05  9:57 [PATCH] ring: add the second version of the RTS interface Huichao Cai
2025-01-05 15:13 ` [PATCH v2] " Huichao Cai
2025-01-08  1:41   ` Huichao Cai
2025-01-14 15:04     ` Thomas Monjalon
2025-01-08 16:49   ` Konstantin Ananyev
2025-01-14 12:55   ` Huichao Cai
2025-01-05 15:09 Huichao Cai

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).