From: Konstantin Ananyev <konstantin.v.ananyev@yandex.ru>
To: dev@dpdk.org
Cc: honnappa.nagarahalli@arm.com, jerinj@marvell.com,
hemant.agrawal@nxp.com, bruce.richardson@intel.com,
drc@linux.vnet.ibm.com, ruifeng.wang@arm.com,
mb@smartsharesystems.com,
Konstantin Ananyev <konstantin.ananyev@huawei.com>
Subject: [RFC 6/6] ring: minimize reads of the counterpart cache-line
Date: Thu, 15 Aug 2024 09:53:39 +0100 [thread overview]
Message-ID: <20240815085339.1434-7-konstantin.v.ananyev@yandex.ru> (raw)
In-Reply-To: <20240815085339.1434-1-konstantin.v.ananyev@yandex.ru>
From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Note upfront: this change shouldn't affect rte_ring public API.
Though as the layout of public structures has changed - it is an ABI
breakage.
This is an attempt to implement rte_ring optimization
that was suggested by Morten and discussed on this mailing
list a while ago.
The idea is to optimize MP/SP & MC/SC ring enqueue/dequeue ops
by storing along with the head its Cached Foreign Tail (CFT) value.
I.E.: for the producer we cache the consumer tail value and vice versa.
To avoid races head and CFT values are read/written using atomic
64-bit ops.
In theory that might help by reducing the number of times the producer
needs to access the consumer's cache-line and vice versa.
In practice, I didn't see any impressive boost so far:
- ring_per_autotest micro-bench - results are a mixed bag,
Some are a bit better, some are worse.
- [so]ring_stress_autotest micro-benchmarks: ~10-15% improvement
- l3fwd in wqorder/wqundorder (see previous patch for details):
no real difference.
Though so far my testing scope was quite limited, I tried it only
on x86 machines. So can I ask all interested parties:
different platform vendors (ARM, PPC, etc.)
and people who do use rte_ring extensively to give it a try and come up
with the feedback.
If there are no real performance improvements on
any platform we support, or some problems are encountered -
I am ok to drop that patch.
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
drivers/net/mlx5/mlx5_hws_cnt.h | 5 ++--
drivers/net/ring/rte_eth_ring.c | 2 +-
lib/ring/rte_ring.c | 6 ++--
lib/ring/rte_ring_core.h | 12 +++++++-
lib/ring/rte_ring_generic_pvt.h | 46 +++++++++++++++++++++----------
lib/ring/rte_ring_peek_elem_pvt.h | 4 +--
lib/ring/soring.c | 31 +++++++++++++++------
lib/ring/soring.h | 4 +--
8 files changed, 77 insertions(+), 33 deletions(-)
diff --git a/drivers/net/mlx5/mlx5_hws_cnt.h b/drivers/net/mlx5/mlx5_hws_cnt.h
index 996ac8dd9a..663146563c 100644
--- a/drivers/net/mlx5/mlx5_hws_cnt.h
+++ b/drivers/net/mlx5/mlx5_hws_cnt.h
@@ -388,11 +388,12 @@ __mlx5_hws_cnt_pool_enqueue_revert(struct rte_ring *r, unsigned int n,
MLX5_ASSERT(r->prod.sync_type == RTE_RING_SYNC_ST);
MLX5_ASSERT(r->cons.sync_type == RTE_RING_SYNC_ST);
- current_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+ current_head = rte_atomic_load_explicit(&r->prod.head.val.pos,
+ rte_memory_order_relaxed);
MLX5_ASSERT(n <= r->capacity);
MLX5_ASSERT(n <= rte_ring_count(r));
revert2head = current_head - n;
- r->prod.head = revert2head; /* This ring should be SP. */
+ r->prod.head.val.pos = revert2head; /* This ring should be SP. */
__rte_ring_get_elem_addr(r, revert2head, sizeof(cnt_id_t), n,
&zcd->ptr1, &zcd->n1, &zcd->ptr2);
/* Update tail */
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 1346a0dba3..31009e90d2 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -325,7 +325,7 @@ eth_get_monitor_addr(void *rx_queue, struct rte_power_monitor_cond *pmc)
*/
pmc->addr = &rng->prod.head;
pmc->size = sizeof(rng->prod.head);
- pmc->opaque[0] = rng->prod.head;
+ pmc->opaque[0] = rng->prod.head.val.pos;
pmc->fn = ring_monitor_callback;
return 0;
}
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..cb2c39c7ad 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -102,7 +102,7 @@ reset_headtail(void *p)
switch (ht->sync_type) {
case RTE_RING_SYNC_MT:
case RTE_RING_SYNC_ST:
- ht->head = 0;
+ ht->head.raw = 0;
ht->tail = 0;
break;
case RTE_RING_SYNC_MT_RTS:
@@ -373,9 +373,9 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
fprintf(f, " size=%"PRIu32"\n", r->size);
fprintf(f, " capacity=%"PRIu32"\n", r->capacity);
fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
- fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
+ fprintf(f, " ch=%"PRIu32"\n", r->cons.head.val.pos);
fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
- fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
+ fprintf(f, " ph=%"PRIu32"\n", r->prod.head.val.pos);
fprintf(f, " used=%u\n", rte_ring_count(r));
fprintf(f, " avail=%u\n", rte_ring_free_count(r));
}
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 270869d214..b88a1bc352 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -66,8 +66,17 @@ enum rte_ring_sync_type {
* Depending on sync_type format of that structure might be different,
* but offset for *sync_type* and *tail* values should remain the same.
*/
+union __rte_ring_head_cft {
+ /** raw 8B value to read/write *cnt* and *pos* as one atomic op */
+ alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+ struct {
+ uint32_t pos; /**< head position */
+ uint32_t cft; /**< cached foreign tail value*/
+ } val;
+};
+
struct rte_ring_headtail {
- volatile RTE_ATOMIC(uint32_t) head; /**< prod/consumer head. */
+ uint32_t __unused;
volatile RTE_ATOMIC(uint32_t) tail; /**< prod/consumer tail. */
union {
/** sync type of prod/cons */
@@ -75,6 +84,7 @@ struct rte_ring_headtail {
/** deprecated - True if single prod/cons */
uint32_t single;
};
+ union __rte_ring_head_cft head;
};
union __rte_ring_rts_poscnt {
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 12f3595926..e70f4ff32c 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -38,17 +38,18 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
{
unsigned int max = n;
int success;
+ uint32_t tail;
+ union __rte_ring_head_cft nh, oh;
+
+ oh.raw = rte_atomic_load_explicit(&d->head.raw,
+ rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
n = max;
- *old_head = d->head;
-
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
+ *old_head = oh.val.pos;
+ tail = oh.val.cft;
/*
* The subtraction is done between two unsigned 32bits value
@@ -56,24 +57,41 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
* *old_head > s->tail). So 'free_entries' is always between 0
* and capacity (which is < size).
*/
- *entries = (capacity + s->tail - *old_head);
+ *entries = (capacity + tail - *old_head);
- /* check that we have enough room in ring */
- if (unlikely(n > *entries))
- n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ /* attempt #1: check that we have enough room with
+ * cached-foreign-tail value.
+ * Note that actual tail value can go forward till we cached
+ * it, in that case we might have to update our cached value.
+ */
+ if (unlikely(n > *entries)) {
+
+ tail = rte_atomic_load_explicit(&s->tail,
+ rte_memory_order_relaxed);
+ *entries = (capacity + tail - *old_head);
+
+ /* attempt #2: check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
0 : *entries;
+ }
if (n == 0)
return 0;
*new_head = *old_head + n;
+ nh.val.pos = *new_head;
+ nh.val.cft = tail;
+
if (is_st) {
- d->head = *new_head;
+ d->head.raw = nh.raw;
success = 1;
} else
- success = rte_atomic32_cmpset(
- (uint32_t *)(uintptr_t)&d->head,
- *old_head, *new_head);
+ success = rte_atomic_compare_exchange_strong_explicit(
+ &d->head.raw, (uint64_t *)(uintptr_t)&oh.raw,
+ nh.raw, rte_memory_order_acquire,
+ rte_memory_order_acquire);
+
} while (unlikely(success == 0));
return n;
}
diff --git a/lib/ring/rte_ring_peek_elem_pvt.h b/lib/ring/rte_ring_peek_elem_pvt.h
index b5f0822b7e..e4dd0ae094 100644
--- a/lib/ring/rte_ring_peek_elem_pvt.h
+++ b/lib/ring/rte_ring_peek_elem_pvt.h
@@ -33,7 +33,7 @@ __rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail,
{
uint32_t h, n, t;
- h = ht->head;
+ h = ht->head.val.pos;
t = ht->tail;
n = h - t;
@@ -58,7 +58,7 @@ __rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail,
RTE_SET_USED(enqueue);
pos = tail + num;
- ht->head = pos;
+ ht->head.val.pos = pos;
rte_atomic_store_explicit(&ht->tail, pos, rte_memory_order_release);
}
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 929bde9697..baa449c872 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -90,7 +90,8 @@ __rte_soring_stage_finalize(struct soring_stage_headtail *sht,
* already finished.
*/
- head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+ head = rte_atomic_load_explicit(&sht->head.val.pos,
+ rte_memory_order_relaxed);
n = RTE_MIN(head - ot.pos, maxn);
for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
@@ -213,22 +214,36 @@ __rte_soring_stage_move_head(struct soring_stage_headtail *d,
uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
{
uint32_t n, tail;
+ union __rte_ring_head_cft nh, oh;
- *old_head = rte_atomic_load_explicit(&d->head,
+ oh.raw = rte_atomic_load_explicit(&d->head.raw,
rte_memory_order_acquire);
do {
n = num;
- tail = rte_atomic_load_explicit(&s->tail,
- rte_memory_order_relaxed);
+ *old_head = oh.val.pos;
+ tail = oh.val.cft;
*avail = capacity + tail - *old_head;
- if (n > *avail)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+
+ if (n > *avail) {
+ tail = rte_atomic_load_explicit(&s->tail,
+ rte_memory_order_relaxed);
+ *avail = capacity + tail - *old_head;
+
+ if (n > *avail)
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : *avail;
+ }
+
if (n == 0)
return 0;
+
*new_head = *old_head + n;
- } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
- old_head, *new_head, rte_memory_order_acquire,
+ nh.val.pos = *new_head;
+ nh.val.cft = tail;
+
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
+ &oh.raw, nh.raw, rte_memory_order_acquire,
rte_memory_order_acquire) == 0);
return n;
diff --git a/lib/ring/soring.h b/lib/ring/soring.h
index 3a3f6efa76..0fb333aa71 100644
--- a/lib/ring/soring.h
+++ b/lib/ring/soring.h
@@ -60,8 +60,8 @@ union soring_stage_tail {
struct soring_stage_headtail {
volatile union soring_stage_tail tail;
- enum rte_ring_sync_type unused; /**< unused */
- volatile RTE_ATOMIC(uint32_t) head;
+ enum rte_ring_sync_type __unused; /**< unused */
+ union __rte_ring_head_cft head;
};
/**
--
2.35.3
next prev parent reply other threads:[~2024-08-15 8:55 UTC|newest]
Thread overview: 31+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-08-15 8:53 [RFC 0/6] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-08-15 8:53 ` [RFC 1/6] ring: common functions for 'move head' ops Konstantin Ananyev
2024-08-15 8:53 ` [RFC 2/6] ring: make copying functions generic Konstantin Ananyev
2024-08-15 8:53 ` [RFC 3/6] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-08-15 11:11 ` Morten Brørup
2024-08-15 12:41 ` Konstantin Ananyev
2024-08-15 13:22 ` Morten Brørup
2024-08-26 19:04 ` Mattias Rönnblom
2024-09-03 13:55 ` Konstantin Ananyev
2024-08-15 8:53 ` [RFC 4/6] app/test: add unit tests for soring API Konstantin Ananyev
2024-08-15 8:53 ` [RFC 5/6] examples/l3fwd: make ACL work in pipeline and eventdev modes Konstantin Ananyev
2024-08-15 8:53 ` Konstantin Ananyev [this message]
2024-09-06 13:13 ` [RFCv2 0/6] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 1/6] ring: common functions for 'move head' ops Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 2/6] ring: make copying functions generic Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 3/6] ring: make dump function more verbose Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 4/6] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 5/6] app/test: add unit tests for soring API Konstantin Ananyev
2024-09-06 13:13 ` [RFCv2 6/6] examples/l3fwd: make ACL work in pipeline and eventdev modes Konstantin Ananyev
2024-09-16 12:37 ` [PATCH v3 0/5] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-09-16 12:37 ` [PATCH v3 1/5] ring: common functions for 'move head' ops Konstantin Ananyev
2024-09-16 12:37 ` [PATCH v3 2/5] ring: make copying functions generic Konstantin Ananyev
2024-09-16 12:37 ` [PATCH v3 3/5] ring: make dump function more verbose Konstantin Ananyev
2024-09-16 12:37 ` [PATCH v3 4/5] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-09-16 12:37 ` [PATCH v3 5/5] app/test: add unit tests for soring API Konstantin Ananyev
2024-09-17 12:09 ` [PATCH v4 0/5] Stage-Ordered API and other extensions for ring library Konstantin Ananyev
2024-09-17 12:09 ` [PATCH v4 1/5] ring: common functions for 'move head' ops Konstantin Ananyev
2024-09-17 12:09 ` [PATCH v4 2/5] ring: make copying functions generic Konstantin Ananyev
2024-09-17 12:09 ` [PATCH v4 3/5] ring: make dump function more verbose Konstantin Ananyev
2024-09-17 12:09 ` [PATCH v4 4/5] ring/soring: introduce Staged Ordered Ring Konstantin Ananyev
2024-09-17 12:09 ` [PATCH v4 5/5] app/test: add unit tests for soring API Konstantin Ananyev
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240815085339.1434-7-konstantin.v.ananyev@yandex.ru \
--to=konstantin.v.ananyev@yandex.ru \
--cc=bruce.richardson@intel.com \
--cc=dev@dpdk.org \
--cc=drc@linux.vnet.ibm.com \
--cc=hemant.agrawal@nxp.com \
--cc=honnappa.nagarahalli@arm.com \
--cc=jerinj@marvell.com \
--cc=konstantin.ananyev@huawei.com \
--cc=mb@smartsharesystems.com \
--cc=ruifeng.wang@arm.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).