* [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
@ 2017-11-02 8:43 Jia He
2017-11-02 13:26 ` Ananyev, Konstantin
2017-11-02 17:23 ` Jerin Jacob
0 siblings, 2 replies; 13+ messages in thread
From: Jia He @ 2017-11-02 8:43 UTC (permalink / raw)
To: jerin.jacob, dev, olivier.matz
Cc: konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal,
Jia He, jie2.liu, bing.zhao, jia.he
We observed an rte panic from mbuf_autotest on our Qualcomm arm64 server.
For the possible race condition, please refer to [1].
Furthermore, there are 2 options as suggested by Jerin:
1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2]).
CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided; by
default it is n.
Two options are provided because the performance benchmark results
differ across arm machines; please refer to [3].
Already functionally tested on the following machines:
on x86 (passed compilation)
on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n
[1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
[2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
[3] http://dpdk.org/ml/archives/dev/2017-October/080861.html
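For illustration, the two options amount to the following at the head-load
site (a sketch only, using the names from the patch):

/* Option 1: explicit read barrier between the two loads */
*old_head = r->prod.head;
rte_smp_rmb();          /* keep the head load before the tail load */
const uint32_t cons_tail = r->cons.tail;

/* Option 2: one-direction barrier via a C11 acquire load */
*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
/* the tail load below cannot be hoisted above the acquire */
const uint32_t cons_tail = r->cons.tail;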
---
Changelog:
V2: let users choose whether using load_acquire/store_release
V1: rte_smp_rmb() between 2 loads
Signed-off-by: Jia He <hejianet@gmail.com>
Signed-off-by: jie2.liu@hxt-semitech.com
Signed-off-by: bing.zhao@hxt-semitech.com
Signed-off-by: jia.he@hxt-semitech.com
Suggested-by: jerin.jacob@caviumnetworks.com
---
lib/librte_ring/Makefile | 4 +++-
lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------
lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++
lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
4 files changed, 127 insertions(+), 8 deletions(-)
create mode 100644 lib/librte_ring/rte_ring_arm64.h
create mode 100644 lib/librte_ring/rte_ring_generic.h
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index e34d9d9..fa57a86 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -45,6 +45,8 @@ LIBABIVER := 1
SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
# install includes
-SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+ rte_ring_arm64.h \
+ rte_ring_generic.h
include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7..943b1f9 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -103,6 +103,18 @@ extern "C" {
#include <rte_memzone.h>
#include <rte_pause.h>
+/* On strong memory models (e.g. x86), there is no need to add an rmb()
+ * between two loads.
+ * On weak models (powerpc/arm), there are 2 choices for the users:
+ * 1. use an rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers
+ * The choice depends on performance test results. */
+#ifdef RTE_ARCH_ARM64
+#include "rte_ring_arm64.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
#define RTE_TAILQ_RING_NAME "RTE_RING"
enum rte_ring_queue_behavior {
@@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
while (unlikely(ht->tail != old_val))
rte_pause();
- ht->tail = new_val;
+ arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
}
/**
@@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
/* Reset n to the initial burst count */
n = max;
- *old_head = r->prod.head;
+ *old_head = arch_rte_atomic_load(&r->prod.head,
+ __ATOMIC_ACQUIRE);
const uint32_t cons_tail = r->cons.tail;
/*
* The subtraction is done between two unsigned 32bits value
@@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
if (is_sp)
r->prod.head = *new_head, success = 1;
else
- success = rte_atomic32_cmpset(&r->prod.head,
- *old_head, *new_head);
+ success = arch_rte_atomic32_cmpset(&r->prod.head,
+ old_head, *new_head,
+ 0, __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
} while (unlikely(success == 0));
return n;
}
@@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
goto end;
ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
rte_smp_wmb();
+#endif
update_tail(&r->prod, prod_head, prod_next, is_sp);
end:
@@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
/* Restore n as it may change every loop */
n = max;
- *old_head = r->cons.head;
+ *old_head = arch_rte_atomic_load(&r->cons.head,
+ __ATOMIC_ACQUIRE);
const uint32_t prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
@@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
if (is_sc)
r->cons.head = *new_head, success = 1;
else
- success = rte_atomic32_cmpset(&r->cons.head, *old_head,
- *new_head);
+ success = arch_rte_atomic32_cmpset(&r->cons.head,
+ old_head, *new_head,
+ 0, __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
} while (unlikely(success == 0));
return n;
}
@@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
goto end;
DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
rte_smp_rmb();
+#endif
update_tail(&r->cons, cons_head, cons_next, is_sc);
diff --git a/lib/librte_ring/rte_ring_arm64.h b/lib/librte_ring/rte_ring_arm64.h
new file mode 100644
index 0000000..09438e5
--- /dev/null
+++ b/lib/librte_ring/rte_ring_arm64.h
@@ -0,0 +1,48 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of hxt-semitech nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_ARM64_H_
+#define _RTE_RING_ARM64_H_
+
+#define arch_rte_atomic_store __atomic_store_n
+#define arch_rte_atomic32_cmpset __atomic_compare_exchange_n
+#ifdef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
+#define arch_rte_atomic_load __atomic_load_n
+#else
+#define arch_rte_atomic_load(a, b) \
+ *(a); \
+ rte_smp_rmb()
+
+#endif
+
+#endif /* _RTE_RING_ARM64_H_ */
+
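Note: with RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER unset, the fallback
arch_rte_atomic_load ignores its memory-order argument and only works as a
full statement. For example, the call site

    *old_head = arch_rte_atomic_load(&r->prod.head, __ATOMIC_ACQUIRE);

expands to a plain load followed by a read barrier:

    *old_head = *(&r->prod.head);
    rte_smp_rmb();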
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
new file mode 100644
index 0000000..4fb777c
--- /dev/null
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -0,0 +1,45 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of hxt-semitech nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_GENERIC_H_
+#define _RTE_RING_GENERIC_H_
+
+#define arch_rte_atomic_load(a, b) *(a)
+#define arch_rte_atomic_store(a, b, c) \
+do { \
+ *(a) = b; \
+} while(0)
+#define arch_rte_atomic32_cmpset(a, b, c, d, e, f) \
+ rte_atomic32_cmpset(a, *(b), c)
+
+#endif /* _RTE_RING_GENERIC_H_ */
+
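Note: the generic mapping simply drops the extra C11 arguments. For example,

    success = arch_rte_atomic32_cmpset(&r->prod.head, old_head, *new_head,
                                       0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);

expands back to the existing

    success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head);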
--
2.7.4
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-02 8:43 [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
@ 2017-11-02 13:26 ` Ananyev, Konstantin
2017-11-02 15:42 ` Jia He
2017-11-02 17:23 ` Jerin Jacob
1 sibling, 1 reply; 13+ messages in thread
From: Ananyev, Konstantin @ 2017-11-02 13:26 UTC (permalink / raw)
To: Jia He, jerin.jacob, dev, olivier.matz
Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu,
bing.zhao, jia.he
Hi Jia,
> -----Original Message-----
> From: Jia He [mailto:hejianet@gmail.com]
> Sent: Thursday, November 2, 2017 8:44 AM
> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com;
> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt-
> semitech.com
> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
>
> [...]
> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> while (unlikely(ht->tail != old_val))
> rte_pause();
>
> - ht->tail = new_val;
> + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> }
>
> /**
> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> /* Reset n to the initial burst count */
> n = max;
>
> - *old_head = r->prod.head;
> + *old_head = arch_rte_atomic_load(&r->prod.head,
> + __ATOMIC_ACQUIRE);
> const uint32_t cons_tail = r->cons.tail;
The code starts to look a bit messy with all these arch-specific macros...
So I wonder wouldn't it be cleaner to:
1. move the existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
into rte_ring_generic.h
2. add rte_smp_rmb into the generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
(as was in v1 of your patch)
3. introduce ARM-specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
in rte_ring_arm64.h
That way we keep the generic code simple and clean, while still allowing arch-specific optimizations.
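A rough sketch of that split (illustrative only, reusing the bodies from the
current code):

/* rte_ring_generic.h: barrier-based version */
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single)
{
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();
	ht->tail = new_val;
}

/* rte_ring_arm64.h: C11 version; the release store pairs with the
 * acquire loads in the arm64 __rte_ring_move_{prod,cons}_head */
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single)
{
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();
	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}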
> /*
> * The subtraction is done between two unsigned 32bits value
> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> if (is_sp)
> r->prod.head = *new_head, success = 1;
> else
> - success = rte_atomic32_cmpset(&r->prod.head,
> - *old_head, *new_head);
> + success = arch_rte_atomic32_cmpset(&r->prod.head,
> + old_head, *new_head,
> + 0, __ATOMIC_ACQUIRE,
> + __ATOMIC_RELAXED);
> } while (unlikely(success == 0));
> return n;
> }
> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> goto end;
>
> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> +
> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
I wonder why we need that macro?
Would there be situations where smp_wmb() is not needed here?
Konstantin
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-02 13:26 ` Ananyev, Konstantin
@ 2017-11-02 15:42 ` Jia He
2017-11-02 16:16 ` Ananyev, Konstantin
0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-02 15:42 UTC (permalink / raw)
To: Ananyev, Konstantin, jerin.jacob, dev, olivier.matz
Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu,
bing.zhao, jia.he
Hi Ananyev
On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote:
> Hi Jia,
>
>> [...]
>> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>> /* Reset n to the initial burst count */
>> n = max;
>>
>> - *old_head = r->prod.head;
>> + *old_head = arch_rte_atomic_load(&r->prod.head,
>> + __ATOMIC_ACQUIRE);
>> const uint32_t cons_tail = r->cons.tail;
> The code starts to look a bit messy with all these arch-specific macros...
> So I wonder wouldn't it be cleaner to:
>
> 1. move the existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> into rte_ring_generic.h
> 2. add rte_smp_rmb into the generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
> (as was in v1 of your patch)
> 3. introduce ARM-specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> in rte_ring_arm64.h
>
> That way we keep the generic code simple and clean, while still allowing arch-specific optimizations.
Thanks for your review.
But as per your suggestion, there will be at least 2 copies of
__rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail.
Thus, if there are any bugs in the future, both copies have to be
changed, right?
>
>> [...]
>> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>> goto end;
>>
>> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>> +
>> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> I wonder why we need that macro?
> Would there be situations where smp_wmb() is not needed here?
If the dpdk user chooses the acquire/release config, the store_release
barrier in update_tail, together with
its load_acquire barrier pair in __rte_ring_move_{prod,cons}_head,
guarantees the ordering, so smp_wmb() is not required here. Please refer
to the freebsd ring implementation and Jerin's debug patch:
https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
https://github.com/jerinjacobk/mytests/blob/master/ring/0001-ring-using-c11-memory-model.patch
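Roughly, the pairing works like this (a sketch, not the exact patch code):

/* producer: ENQUEUE_PTRS() writes the ring entries, then update_tail()
 * publishes them; the release store keeps the entry writes from sinking
 * below it, so no separate rte_smp_wmb() is needed */
__atomic_store_n(&r->prod.tail, prod_next, __ATOMIC_RELEASE);

/* consumer: the acquire load keeps the later reads (r->prod.tail and the
 * ring entries) from being hoisted above it, replacing the explicit rmb */
*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);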
--
Cheers,
Jia
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-02 15:42 ` Jia He
@ 2017-11-02 16:16 ` Ananyev, Konstantin
2017-11-02 17:00 ` Jerin Jacob
0 siblings, 1 reply; 13+ messages in thread
From: Ananyev, Konstantin @ 2017-11-02 16:16 UTC (permalink / raw)
To: Jia He, jerin.jacob, dev, olivier.matz
Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu,
bing.zhao, jia.he
> -----Original Message-----
> From: Jia He [mailto:hejianet@gmail.com]
> Sent: Thursday, November 2, 2017 3:43 PM
> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> Cc: Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; hemant.agrawal@nxp.com; jie2.liu@hxt-semitech.com;
> bing.zhao@hxt-semitech.com; jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
>
> Hi Ananyev
>
>
> On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote:
> > Hi Jia,
> >
> > [...]
> > The code starts to look a bit messy with all these arch-specific macros...
> > So I wonder wouldn't it be cleaner to:
> >
> > 1. move the existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > into rte_ring_generic.h
> > 2. add rte_smp_rmb into the generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
> > (as was in v1 of your patch)
> > 3. introduce ARM-specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > in rte_ring_arm64.h
> >
> > That way we keep the generic code simple and clean, while still allowing arch-specific optimizations.
> Thanks for your review.
> But as per your suggestion, there will be at least 2 copies of
> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail.
> Thus, if there are any bugs in the future, both copies have to be
> changed, right?
Yes, there would be some code duplication.
Though the generic code would be much cleaner and simpler to read in that case,
which is worth some duplication, I think.
> >
> >> [...]
> >> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> >> goto end;
> >>
> >> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> >> +
> >> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > I wonder why we need that macro?
> > Would there be situations where smp_wmb() is not needed here?
> If the dpdk user chooses the acquire/release config, the store_release
> barrier in update_tail, together with
> its load_acquire barrier pair in __rte_ring_move_{prod,cons}_head,
> guarantees the ordering, so smp_wmb() is not required here. Please refer
> to the freebsd ring implementation and Jerin's debug patch:
> https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
> https://github.com/jerinjacobk/mytests/blob/master/ring/0001-ring-using-c11-memory-model.patch
Ok, so because you are doing
arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
inside update_tail(), you don't need the mb() here anymore...
In that case, can we perhaps hide all that logic inside update_tail()?
I.e. move smp_rmb/smp_wmb into update_tail(), and then for arm64 you'll
have your special one with atomic_store().
Something like this for the generic version:
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
- uint32_t single)
+ uint32_t single, uint32_t enqueue)
{
+ if (enqueue)
+ rte_smp_wmb();
+ else
+ rte_smp_rmb();
/*
* If there are other enqueues/dequeues in progress that preceded us,
* we need to wait for them to complete
*/
if (!single)
while (unlikely(ht->tail != old_val))
rte_pause();
ht->tail = new_val;
}
....
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned int n, enum rte_ring_queue_behavior behavior,
int is_sp, unsigned int *free_space)
{
.....
ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
- rte_smp_wmb();
- update_tail(&r->prod, prod_head, prod_next, is_sp);
+ update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
....
static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned int n, enum rte_ring_queue_behavior behavior,
int is_sc, unsigned int *available)
{
....
DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
- rte_smp_rmb();
- update_tail(&r->cons, cons_head, cons_next, is_sc);
+ update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
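The arm64 counterpart with atomic_store() could then look roughly like this
(a sketch, assuming the same new signature):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single, uint32_t enqueue)
{
	RTE_SET_USED(enqueue); /* the ordering comes from the release store */

	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}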
Konstantin
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-02 16:16 ` Ananyev, Konstantin
@ 2017-11-02 17:00 ` Jerin Jacob
0 siblings, 0 replies; 13+ messages in thread
From: Jerin Jacob @ 2017-11-02 17:00 UTC (permalink / raw)
To: Ananyev, Konstantin
Cc: Jia He, dev, olivier.matz, Richardson, Bruce, jianbo.liu,
hemant.agrawal, jie2.liu, bing.zhao, jia.he
-----Original Message-----
> Date: Thu, 2 Nov 2017 16:16:33 +0000
> From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>
> To: Jia He <hejianet@gmail.com>, "jerin.jacob@caviumnetworks.com"
> <jerin.jacob@caviumnetworks.com>, "dev@dpdk.org" <dev@dpdk.org>,
> "olivier.matz@6wind.com" <olivier.matz@6wind.com>
> CC: "Richardson, Bruce" <bruce.richardson@intel.com>, "jianbo.liu@arm.com"
> <jianbo.liu@arm.com>, "hemant.agrawal@nxp.com" <hemant.agrawal@nxp.com>,
> "jie2.liu@hxt-semitech.com" <jie2.liu@hxt-semitech.com>,
> "bing.zhao@hxt-semitech.com" <bing.zhao@hxt-semitech.com>,
> "jia.he@hxt-semitech.com" <jia.he@hxt-semitech.com>
> Subject: RE: [PATCH v2] ring: guarantee ordering of cons/prod loading when
> doing
>
>
>
> > > [...]
> > > The code starts to look a bit messy with all these arch-specific macros...
> > > So I wonder wouldn't it be cleaner to:
> > >
> > > 1. move the existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > > into rte_ring_generic.h
> > > 2. add rte_smp_rmb into the generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
> > > (as was in v1 of your patch)
> > > 3. introduce ARM-specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > > in rte_ring_arm64.h
> > >
> > > That way we keep the generic code simple and clean, while still allowing arch-specific optimizations.
> > Thanks for your review.
> > But as per your suggestion, there will be at least 2 copies of
> > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail.
> > Thus, if there are any bugs in the future, both copies have to be
> > changed, right?
>
> Yes, there would be some code duplication.
> Though the generic code would be much cleaner and simpler to read in that case,
> which is worth some duplication, I think.
I agree with Konstantin here.
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-02 8:43 [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
2017-11-02 13:26 ` Ananyev, Konstantin
@ 2017-11-02 17:23 ` Jerin Jacob
2017-11-03 1:46 ` Jia He
1 sibling, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-02 17:23 UTC (permalink / raw)
To: Jia He
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
-----Original Message-----
> Date: Thu, 2 Nov 2017 08:43:30 +0000
> From: Jia He <hejianet@gmail.com>
> To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com
> Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com,
> jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>,
> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
> jia.he@hxt-semitech.com
> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> X-Mailer: git-send-email 2.7.4
>
> We observed an rte panic from mbuf_autotest on our Qualcomm arm64 server.
> For the possible race condition, please refer to [1].
Hi Jia,
In addition to Konstantin's comments, please find below some review
comments.
>
> Furthermore, there are 2 options as suggested by Jerin:
> 1. use rte_smp_rmb
> 2. use load_acquire/store_release(refer to [2]).
> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
I think a better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
or something like that.
> default it is n;
>
> [...]
> # install includes
> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> + rte_ring_arm64.h \
It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or
something like that, to reflect that the implementation is based on the C11
memory model.
> + rte_ring_generic.h
>
> include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 5e9b3b7..943b1f9 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -103,6 +103,18 @@ extern "C" {
> #include <rte_memzone.h>
> #include <rte_pause.h>
>
> +/* On strong memory models (e.g. x86), there is no need to add an rmb()
> + * between two loads.
> + * On weak models (powerpc/arm), there are 2 choices for the users:
> + * 1. use an rmb() memory barrier
> + * 2. use one-direction load_acquire/store_release barriers
> + * The choice depends on performance test results. */
> +#ifdef RTE_ARCH_ARM64
s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL/ and update the generic arm64 config.
That way it can be used by other architectures like ppc if they choose to do so.
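Combined with the rename above, the include selection would then read
(a sketch):

#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif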
> +#include "rte_ring_arm64.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> #define RTE_TAILQ_RING_NAME "RTE_RING"
>
> enum rte_ring_queue_behavior {
> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> while (unlikely(ht->tail != old_val))
> rte_pause();
>
> - ht->tail = new_val;
> + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> }
>
> /**
> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> /* Reset n to the initial burst count */
> n = max;
>
> - *old_head = r->prod.head;
> + *old_head = arch_rte_atomic_load(&r->prod.head,
> + __ATOMIC_ACQUIRE);
Same as Konstantin's comment, i.e. move this function to the c11 memory model
header file.
> const uint32_t cons_tail = r->cons.tail;
> /*
> * The subtraction is done between two unsigned 32bits value
> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> if (is_sp)
> r->prod.head = *new_head, success = 1;
> else
> - success = rte_atomic32_cmpset(&r->prod.head,
> - *old_head, *new_head);
> + success = arch_rte_atomic32_cmpset(&r->prod.head,
> + old_head, *new_head,
> + 0, __ATOMIC_ACQUIRE,
> + __ATOMIC_RELAXED);
> } while (unlikely(success == 0));
> return n;
> }
> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> goto end;
>
> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> +
> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
This #define will be removed if the function moves.
> rte_smp_wmb();
> +#endif
>
> update_tail(&r->prod, prod_head, prod_next, is_sp);
> end:
> @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> /* Restore n as it may change every loop */
> n = max;
>
> - *old_head = r->cons.head;
> + *old_head = arch_rte_atomic_load(&r->cons.head,
> + __ATOMIC_ACQUIRE);
> const uint32_t prod_tail = r->prod.tail;
> /* The subtraction is done between two unsigned 32bits value
> * (the result is always modulo 32 bits even if we have
> @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> if (is_sc)
> r->cons.head = *new_head, success = 1;
> else
> - success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> - *new_head);
> + success = arch_rte_atomic32_cmpset(&r->cons.head,
> + old_head, *new_head,
> + 0, __ATOMIC_ACQUIRE,
> + __ATOMIC_RELAXED);
> } while (unlikely(success == 0));
> return n;
> }
> @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> goto end;
>
> DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> +
> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
This #define will be removed if the function moves.
> rte_smp_rmb();
> +#endif
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-02 17:23 ` Jerin Jacob
@ 2017-11-03 1:46 ` Jia He
2017-11-03 12:56 ` Jerin Jacob
0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-03 1:46 UTC (permalink / raw)
To: Jerin Jacob
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
Hi Jerin
On 11/3/2017 1:23 AM, Jerin Jacob Wrote:
> -----Original Message-----
>> Date: Thu, 2 Nov 2017 08:43:30 +0000
>> From: Jia He <hejianet@gmail.com>
>> To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com
>> Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com,
>> jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>,
>> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>> jia.he@hxt-semitech.com
>> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
>> X-Mailer: git-send-email 2.7.4
>>
>> We observed an rte panic from mbuf_autotest on our Qualcomm arm64 server.
>> For the possible race condition, please refer to [1].
> Hi Jia,
>
> In addition to Konstantin's comments, please find below some review
> comments.
>> Furthermore, there are 2 options as suggested by Jerin:
>> 1. use rte_smp_rmb
>> 2. use load_acquire/store_release(refer to [2]).
>> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> I think a better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
> or something like that.
Ok, but how do we distinguish between the following 2 options?
CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough:
1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2])
>> [...]
--
Cheers,
Jia
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-03 1:46 ` Jia He
@ 2017-11-03 12:56 ` Jerin Jacob
2017-11-06 7:25 ` Jia He
0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-03 12:56 UTC (permalink / raw)
To: Jia He
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
-----Original Message-----
> Date: Fri, 3 Nov 2017 09:46:40 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
> bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
> jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
> doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
> Thunderbird/52.4.0
>
> Hi Jerin
>
>
> On 11/3/2017 1:23 AM, Jerin Jacob Wrote:
> > [...]
> > > Furthermore, there are 2 options as suggested by Jerin:
> > > 1. use rte_smp_rmb
> > > 2. use load_acquire/store_release(refer to [2]).
> > > CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> > I think a better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
> > or something like that.
> Ok, but how do we distinguish between the following 2 options?
I have not clearly understood this question. For the arm64 case, you can add
CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
>
> CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough
>
> 1. use rte_smp_rmb
> 2. use load_acquire/store_release(refer to [2]).
>
> > > default it is n;
> > >
> > > ---
> > > Changelog:
> > > V2: let users choose whether using load_acquire/store_release
> > > V1: rte_smp_rmb() between 2 loads
> > >
> > > Signed-off-by: Jia He <hejianet@gmail.com>
> > > Signed-off-by: jie2.liu@hxt-semitech.com
> > > Signed-off-by: bing.zhao@hxt-semitech.com
> > > Signed-off-by: jia.he@hxt-semitech.com
> > > Suggested-by: jerin.jacob@caviumnetworks.com
> > > ---
> > > lib/librte_ring/Makefile | 4 +++-
> > > lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------
> > > lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++
> > > lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
> > > 4 files changed, 127 insertions(+), 8 deletions(-)
> > > create mode 100644 lib/librte_ring/rte_ring_arm64.h
> > > create mode 100644 lib/librte_ring/rte_ring_generic.h
> > >
> > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > index e34d9d9..fa57a86 100644
> > > --- a/lib/librte_ring/Makefile
> > > +++ b/lib/librte_ring/Makefile
> > > @@ -45,6 +45,8 @@ LIBABIVER := 1
> > > SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > > # install includes
> > > -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> > > +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > > + rte_ring_arm64.h \
> > It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or
> > something like that to reflect the implementation based on c11 memory
> > model.
> >
> >
> > > + rte_ring_generic.h
> > > include $(RTE_SDK)/mk/rte.lib.mk
> > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > > index 5e9b3b7..943b1f9 100644
> > > --- a/lib/librte_ring/rte_ring.h
> > > +++ b/lib/librte_ring/rte_ring.h
> > > @@ -103,6 +103,18 @@ extern "C" {
> > > #include <rte_memzone.h>
> > > #include <rte_pause.h>
> > > +/* In strong memory models (e.g. x86), there is no need to add an rmb()
> > > + * between two loads.
> > > + * In weak models (powerpc/arm), there are 2 choices for the users:
> > > + * 1. use an rmb() memory barrier
> > > + * 2. use a one-direction load_acquire/store_release barrier
> > > + * Which is faster depends on performance test results. */
> > > +#ifdef RTE_ARCH_ARM64
> > s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config.
> > That way it can be used by another architecture like ppc if they choose to do so.
> >
> >
> > > +#include "rte_ring_arm64.h"
> > > +#else
> > > +#include "rte_ring_generic.h"
> > > +#endif
> > > +
> > > #define RTE_TAILQ_RING_NAME "RTE_RING"
> > > enum rte_ring_queue_behavior {
> > > @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> > > while (unlikely(ht->tail != old_val))
> > > rte_pause();
> > > - ht->tail = new_val;
> > > + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> > > }
> > > /**
> > > @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > > /* Reset n to the initial burst count */
> > > n = max;
> > > - *old_head = r->prod.head;
> > > + *old_head = arch_rte_atomic_load(&r->prod.head,
> > > + __ATOMIC_ACQUIRE);
> > Same as Konstantin's comments, i.e. move this function to the c11 memory model
> > header file
> >
> > > const uint32_t cons_tail = r->cons.tail;
> > > /*
> > > * The subtraction is done between two unsigned 32bits value
> > > @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > > if (is_sp)
> > > r->prod.head = *new_head, success = 1;
> > > else
> > > - success = rte_atomic32_cmpset(&r->prod.head,
> > > - *old_head, *new_head);
> > > + success = arch_rte_atomic32_cmpset(&r->prod.head,
> > > + old_head, *new_head,
> > > + 0, __ATOMIC_ACQUIRE,
> > > + __ATOMIC_RELAXED);
> > > } while (unlikely(success == 0));
> > > return n;
> > > }
> > > @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> > > goto end;
> > > ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> > > +
> > > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > This #define will be removed if the function moves.
> >
> > > rte_smp_wmb();
> > > +#endif
> > > update_tail(&r->prod, prod_head, prod_next, is_sp);
> > > end:
> > > @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > > /* Restore n as it may change every loop */
> > > n = max;
> > > - *old_head = r->cons.head;
> > > + *old_head = arch_rte_atomic_load(&r->cons.head,
> > > + __ATOMIC_ACQUIRE);
> > > const uint32_t prod_tail = r->prod.tail;
> > > /* The subtraction is done between two unsigned 32bits value
> > > * (the result is always modulo 32 bits even if we have
> > > @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > > if (is_sc)
> > > r->cons.head = *new_head, success = 1;
> > > else
> > > - success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> > > - *new_head);
> > > + success = arch_rte_atomic32_cmpset(&r->cons.head,
> > > + old_head, *new_head,
> > > + 0, __ATOMIC_ACQUIRE,
> > > + __ATOMIC_RELAXED);
> > > } while (unlikely(success == 0));
> > > return n;
> > > }
> > > @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> > > goto end;
> > > DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> > > +
> > > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > This #define will be removed if the function moves.
> >
> > > rte_smp_rmb();
> > > +#endif
>
> --
> Cheers,
> Jia
>
^ permalink raw reply [flat|nested] 13+ messages in thread
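For readers weighing the two options above, here is a minimal standalone sketch (not part of any patch in this thread) of how the consumer-side loads differ under each scheme. The GCC builtins stand in for DPDK's macros, and the ring is reduced to the two fields whose load order matters:

#include <stdint.h>

/* Toy ring: only the two fields involved in the load-load ordering. */
struct toy_ring {
    uint32_t cons_head;
    uint32_t prod_tail;
};

/* Option 1: plain loads separated by an explicit read barrier. The fence
 * stands in for rte_smp_rmb(); on arm64 both compile to 'dmb ishld',
 * on x86 the fence is only a compiler barrier. */
static inline uint32_t
entries_option1(struct toy_ring *r, uint32_t *old_head)
{
    *old_head = r->cons_head;
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
    return r->prod_tail - *old_head;
}

/* Option 2: a one-direction acquire load (an ldar-class instruction on
 * arm64). Later loads cannot be hoisted above it, so no separate fence
 * is needed; earlier accesses may still sink below it, hence
 * "one-direction". */
static inline uint32_t
entries_option2(struct toy_ring *r, uint32_t *old_head)
{
    *old_head = __atomic_load_n(&r->cons_head, __ATOMIC_ACQUIRE);
    return r->prod_tail - *old_head;
}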
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-03 12:56 ` Jerin Jacob
@ 2017-11-06 7:25 ` Jia He
2017-11-07 4:36 ` Jerin Jacob
0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-06 7:25 UTC (permalink / raw)
To: Jerin Jacob
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
Hi Jerin
On 11/3/2017 8:56 PM, Jerin Jacob Wrote:
> -----Original Message-----
>>
[...]
>> or something like that.
>> Ok, but how do we distinguish between the following 2 options?
> I have not clearly understood this question. For the arm64 case, you can add
> CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
Sorry for my unclear expression.
I mean, shouldn't there be one additional config macro besides
CONFIG_RTE_RING_USE_C11_MEM_MODEL
for users to choose from?
i.e.
- On x86: CONFIG_RTE_RING_USE_C11_MEM_MODEL=n
include rte_ring_generic.h, no changes
- On arm64: CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
include rte_ring_c11_mem.h by default.
In rte_ring_c11_mem.h, implement new version of
__rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
Then, how do we distinguish between the option of using rte_smp_rmb() and
that of using __atomic_load/store_n()?
Thanks for the clarification.
--
Cheers,
Jia
^ permalink raw reply [flat|nested] 13+ messages in thread
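To visualize the build-time switch being discussed: a sketch of the include logic in rte_ring.h under the names suggested in this thread (the macro and file names are proposals here, not merged code):

/* Weakly ordered targets such as arm64 would set
 * CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in their defconfig. */
#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"   /* __atomic_load_n()/__atomic_store_n() based */
#else
#include "rte_ring_generic.h"   /* rte_smp_rmb()/rte_smp_wmb() based */
#endif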
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-06 7:25 ` Jia He
@ 2017-11-07 4:36 ` Jerin Jacob
2017-11-07 8:34 ` Jia He
0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-07 4:36 UTC (permalink / raw)
To: Jia He
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
-----Original Message-----
> Date: Mon, 6 Nov 2017 15:25:12 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
> bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
> jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
> doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
> Thunderbird/52.4.0
>
> Hi Jerin
>
>
> On 11/3/2017 8:56 PM, Jerin Jacob Wrote:
> > -----Original Message-----
> > >
> [...]
> > > or something like that.
> > > Ok, but how do we distinguish between the following 2 options?
> > I have not clearly understood this question. For the arm64 case, you can add
> > CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
> Sorry for my unclear expression.
> I mean, shouldn't there be one additional config macro besides
> CONFIG_RTE_RING_USE_C11_MEM_MODEL
> for users to choose from?
>
> i.e.
> - On x86: CONFIG_RTE_RING_USE_C11_MEM_MODEL=n
> include rte_ring_generic.h, no changes
> - On arm64: CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
> include rte_ring_c11_mem.h by default.
> In rte_ring_c11_mem.h, implement new version of
> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
>
> Then, how do we distinguish between the option of using rte_smp_rmb() and
> that of using __atomic_load/store_n()?
One option could be to change the prototype of update_tail() and let the
compiler accommodate it at zero cost for arm64 (which I think is the
case, but you can check the generated instructions).
If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
__rte_ring_move_prod_head()/__rte_ring_move_cons_head()/update_tail()
➜ [master][dpdk.org] $ git diff
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7b4..b32648825 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
- uint32_t single)
+ uint32_t single, const uint32_t enqueue)
{
+ if (enqueue)
+ rte_smp_wmb();
+ else
+ rte_smp_rmb();
/*
* If there are other enqueues/dequeues in progress that
* preceded us,
* we need to wait for them to complete
@@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
goto end;
ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
- rte_smp_wmb();
- update_tail(&r->prod, prod_head, prod_next, is_sp);
+ update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
if (free_space != NULL)
*free_space = free_entries - n;
@@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
goto end;
DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
- rte_smp_rmb();
- update_tail(&r->cons, cons_head, cons_next, is_sc);
+ update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
end:
if (available != NULL)
>
> Thanks for the clarification.
>
> --
> Cheers,
> Jia
>
^ permalink raw reply [flat|nested] 13+ messages in thread
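A note on why the extra 'enqueue' argument proposed above should cost nothing: every call site passes a compile-time constant and update_tail() is always inlined, so the branch folds away. A self-contained illustration (the fences are only stand-ins for the rte_smp_wmb()/rte_smp_rmb() macros):

#include <stdint.h>

static inline __attribute__((always_inline)) void
toy_update_tail(uint32_t *tail, uint32_t new_val, const uint32_t enqueue)
{
    if (enqueue)
        __atomic_thread_fence(__ATOMIC_RELEASE); /* ~rte_smp_wmb() */
    else
        __atomic_thread_fence(__ATOMIC_ACQUIRE); /* ~rte_smp_rmb() */
    *tail = new_val;
}

void toy_enqueue_path(uint32_t *tail, uint32_t v)
{
    /* 'enqueue' is the constant 1: after inlining, the branch folds
     * and only the write-side fence remains. */
    toy_update_tail(tail, v, 1);
}

void toy_dequeue_path(uint32_t *tail, uint32_t v)
{
    /* Constant 0: only the read-side fence survives. */
    toy_update_tail(tail, v, 0);
}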
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-07 4:36 ` Jerin Jacob
@ 2017-11-07 8:34 ` Jia He
2017-11-07 9:57 ` Jerin Jacob
0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-07 8:34 UTC (permalink / raw)
To: Jerin Jacob
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
> -----Original Message-----
>
> One option could be to change the prototype of update_tail() and let the
> compiler accommodate it at zero cost for arm64 (which I think is the
> case, but you can check the generated instructions).
> If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
> __rte_ring_move_prod_head()/__rte_ring_move_cons_head()/update_tail()
>
>
> ➜ [master][dpdk.org] $ git diff
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 5e9b3b7b4..b32648825 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> *r);
>
> static __rte_always_inline void
> update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> - uint32_t single)
> + uint32_t single, const uint32_t enqueue)
> {
> + if (enqueue)
> + rte_smp_wmb();
> + else
> + rte_smp_rmb();
> /*
> * If there are other enqueues/dequeues in progress that
> * preceded us,
> * we need to wait for them to complete
> @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
> const *obj_table,
> goto end;
>
> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> - rte_smp_wmb();
>
> - update_tail(&r->prod, prod_head, prod_next, is_sp);
> + update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> end:
> if (free_space != NULL)
> *free_space = free_entries - n;
> @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
> **obj_table,
> goto end;
>
> DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> - rte_smp_rmb();
>
> - update_tail(&r->cons, cons_head, cons_next, is_sc);
> + update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>
> end:
> if (available != NULL)
>
>
>
Hi Jerin, yes, I knew about this suggestion for update_tail.
But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and
__rte_ring_move_prod_head:
[option 1]
+ *old_head = r->cons.head;
+ rte_smp_rmb();
+ const uint32_t prod_tail = r->prod.tail;
[option 2]
+ *old_head = __atomic_load_n(&r->cons.head,
+ __ATOMIC_ACQUIRE);
i.e. I wonder what the suitable new config name is to distinguish the
above 2 options?
Thanks for the patience :-)
see my draft patch below, note the macro "PREFER":
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
new file mode 100644
index 0000000..22fe887
--- /dev/null
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -0,0 +1,305 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of hxt-semitech nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_C11_MEM_H_
+#define _RTE_RING_C11_MEM_H_
+
+static __rte_always_inline void
+update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
+ uint32_t single, uint32_t enqueue)
+{
+ /* No wmb/rmb needed when we prefer to use the load_acquire/
+ * store_release barrier */
+#ifndef PREFER
+ if (enqueue)
+ rte_smp_wmb();
+ else
+ rte_smp_rmb();
+#endif
+
+ /*
+ * If there are other enqueues/dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ if (!single)
+ while (unlikely(ht->tail != old_val))
+ rte_pause();
+
+#ifdef PREFER
+ __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
+#else
+ ht->tail = new_val;
+#endif
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sp
+ * Indicates whether multi-producer path is needed or not
+ * @param n
+ * The number of elements we will want to enqueue, i.e. how far should
+ * the head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ * Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *free_entries)
+{
+ const uint32_t capacity = r->capacity;
+ unsigned int max = n;
+ int success;
+
+ do {
+ /* Reset n to the initial burst count */
+ n = max;
+
+#ifdef PREFER
+ *old_head = __atomic_load_n(&r->prod.head,
+ __ATOMIC_ACQUIRE);
+#else
+ *old_head = r->prod.head;
+ /* prevent reorder of load/load */
+ rte_smp_rmb();
+#endif
+ const uint32_t cons_tail = r->cons.tail;
+ /*
+ * The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * *old_head > cons_tail). So 'free_entries' is always between 0
+ * and capacity (which is < size).
+ */
+ *free_entries = (capacity + cons_tail - *old_head);
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *free_entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : *free_entries;
+
+ if (n == 0)
+ return 0;
+
+ *new_head = *old_head + n;
+ if (is_sp)
+ r->prod.head = *new_head, success = 1;
+ else
+#ifdef PREFER
+ success = arch_rte_atomic32_cmpset(&r->prod.head,
+ old_head, *new_head,
+ 0, __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
+#else
+ success = rte_atomic32_cmpset(&r->prod.head,
+ *old_head, *new_head);
+#endif
+ } while (unlikely(success == 0));
+ return n;
+}
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ int is_sp, unsigned int *free_space)
+{
+ uint32_t prod_head, prod_next;
+ uint32_t free_entries;
+
+ n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+ &prod_head, &prod_next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+ update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sc
+ * Indicates whether multi-consumer path is needed or not
+ * @param n
+ * The number of elements we will want to dequeue, i.e. how far should
+ * the head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ * Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *entries)
+{
+ unsigned int max = n;
+ int success;
+
+ /* move cons.head atomically */
+ do {
+ /* Restore n as it may change every loop */
+ n = max;
+#ifdef PREFER
+ *old_head = __atomic_load_n(&r->cons.head,
+ __ATOMIC_ACQUIRE);
+#else
+ *old_head = r->cons.head;
+ /* prevent reorder of load/load */
+ rte_smp_rmb();
+#endif
+
+ const uint32_t prod_tail = r->prod.tail;
+ /* The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * cons_head > prod_tail). So 'entries' is always between 0
+ * and size(ring)-1. */
+ *entries = (prod_tail - *old_head);
+
+ /* Set the actual entries for dequeue */
+ if (n > *entries)
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (unlikely(n == 0))
+ return 0;
+
+ *new_head = *old_head + n;
+ if (is_sc)
+ r->cons.head = *new_head, success = 1;
+ else
+#ifdef PREFER
+ success = arch_rte_atomic32_cmpset(&r->cons.head,
+ old_head, *new_head,
+ 0, __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
+#else
+ success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+ *new_head);
+#endif
+ } while (unlikely(success == 0));
+ return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ * Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ int is_sc, unsigned int *available)
+{
+ uint32_t cons_head, cons_next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
+ &cons_head, &cons_next, &entries);
+ if (n == 0)
+ goto end;
+
+ DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+ update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+#endif /* _RTE_RING_C11_MEM_H_ */
+
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
new file mode 100644
index 0000000..0ce6d57
--- /dev/null
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -0,0 +1,268 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of hxt-semitech nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_GENERIC_H_
+#define _RTE_RING_GENERIC_H_
+
+static __rte_always_inline void
+update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
+ uint32_t single, uint32_t enqueue)
+{
+ if (enqueue)
+ rte_smp_wmb();
+ else
+ rte_smp_rmb();
+ /*
+ * If there are other enqueues/dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ if (!single)
+ while (unlikely(ht->tail != old_val))
+ rte_pause();
+
+ ht->tail = new_val;
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sp
+ * Indicates whether multi-producer path is needed or not
+ * @param n
+ * The number of elements we will want to enqueue, i.e. how far should
+ * the head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ * Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *free_entries)
+{
+ const uint32_t capacity = r->capacity;
+ unsigned int max = n;
+ int success;
+
+ do {
+ /* Reset n to the initial burst count */
+ n = max;
+
+ *old_head = r->prod.head;
+ const uint32_t cons_tail = r->cons.tail;
+ /*
+ * The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * *old_head > cons_tail). So 'free_entries' is always between 0
+ * and capacity (which is < size).
+ */
+ *free_entries = (capacity + cons_tail - *old_head);
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *free_entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : *free_entries;
+
+ if (n == 0)
+ return 0;
+
+ *new_head = *old_head + n;
+ if (is_sp)
+ r->prod.head = *new_head, success = 1;
+ else
+ success = rte_atomic32_cmpset(&r->prod.head,
+ *old_head, *new_head);
+ } while (unlikely(success == 0));
+ return n;
+}
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ int is_sp, unsigned int *free_space)
+{
+ uint32_t prod_head, prod_next;
+ uint32_t free_entries;
+
+ n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+ &prod_head, &prod_next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+ update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sc
+ * Indicates whether multi-consumer path is needed or not
+ * @param n
+ * The number of elements we will want to dequeue, i.e. how far should
+ * the head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ * Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *entries)
+{
+ unsigned int max = n;
+ int success;
+
+ /* move cons.head atomically */
+ do {
+ /* Restore n as it may change every loop */
+ n = max;
+
+ *old_head = r->cons.head;
+ const uint32_t prod_tail = r->prod.tail;
+ /* The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * cons_head > prod_tail). So 'entries' is always between 0
+ * and size(ring)-1. */
+ *entries = (prod_tail - *old_head);
+
+ /* Set the actual entries for dequeue */
+ if (n > *entries)
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (unlikely(n == 0))
+ return 0;
+
+ *new_head = *old_head + n;
+ if (is_sc)
+ r->cons.head = *new_head, success = 1;
+ else
+ success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+ *new_head);
+ } while (unlikely(success == 0));
+ return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ * Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ int is_sc, unsigned int *available)
+{
+ uint32_t cons_head, cons_next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
+ &cons_head, &cons_next, &entries);
+ if (n == 0)
+ goto end;
+
+ DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+ update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+#endif /* _RTE_RING_GENERIC_H_ */
+
--
2.7.4
--
Cheers,
Jia
^ permalink raw reply [flat|nested] 13+ messages in thread
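As background for the draft above: the failure mode both variants guard against can be shown with plain arithmetic. If the load of prod.tail is reordered before the load of cons.head in __rte_ring_move_cons_head(), a stale tail can be paired with a fresh head and the unsigned subtraction wraps. A runnable sketch with hypothetical values:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    /* Snapshot taken when the two loads are observed out of order:
     * prod.tail is stale, cons.head was advanced by another consumer
     * in between. */
    uint32_t prod_tail = 1000;  /* effectively loaded first (stale) */
    uint32_t cons_head = 1003;  /* loaded second (fresh) */
    uint32_t entries = prod_tail - cons_head;

    /* Prints 4294967293: the size check passes and the consumer walks
     * into slots the producer never filled. */
    printf("entries = %" PRIu32 "\n", entries);
    return 0;
}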
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-07 8:34 ` Jia He
@ 2017-11-07 9:57 ` Jerin Jacob
2017-11-08 2:31 ` Jia He
0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-07 9:57 UTC (permalink / raw)
To: Jia He
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
-----Original Message-----
> Date: Tue, 7 Nov 2017 16:34:30 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
> bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
> jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
> doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
> Thunderbird/52.4.0
>
>
>
> On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
> > -----Original Message-----
> >
> > One option could be to change the prototype of update_tail() and let the
> > compiler accommodate it at zero cost for arm64 (which I think is the
> > case, but you can check the generated instructions).
> > If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
> > __rte_ring_move_prod_head()/__rte_ring_move_cons_head()/update_tail()
> >
> >
> > ➜ [master][dpdk.org] $ git diff
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 5e9b3b7b4..b32648825 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> > *r);
> > static __rte_always_inline void
> > update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> > new_val,
> > - uint32_t single)
> > + uint32_t single, const uint32_t enqueue)
> > {
> > + if (enqueue)
> > + rte_smp_wmb();
> > + else
> > + rte_smp_rmb();
> > /*
> > * If there are other enqueues/dequeues in progress that
> > * preceded us,
> > * we need to wait for them to complete
> > @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
> > const *obj_table,
> > goto end;
> > ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> > - rte_smp_wmb();
> > - update_tail(&r->prod, prod_head, prod_next, is_sp);
> > + update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > end:
> > if (free_space != NULL)
> > *free_space = free_entries - n;
> > @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
> > **obj_table,
> > goto end;
> > DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> > - rte_smp_rmb();
> > - update_tail(&r->cons, cons_head, cons_next, is_sc);
> > + update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> > end:
> > if (available != NULL)
> >
> >
> >
> Hi Jerin, yes, I knew about this suggestion for update_tail.
> But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and
> __rte_ring_move_prod_head:
> [option 1]
> + *old_head = r->cons.head;
> + rte_smp_rmb();
> + const uint32_t prod_tail = r->prod.tail;
>
> [option 2]
> + *old_head = __atomic_load_n(&r->cons.head,
> + __ATOMIC_ACQUIRE);
>
> i.e. I wonder what the suitable new config name is to distinguish the above 2
> options?
Why?
If you fix the generic version with rte_smp_rmb() then we need only
one config to differentiate between c11 and generic. See comments below.
> Thanks for the patience :-)
>
> see my draft patch below, note the macro "PREFER":
> + */
> +
> +#ifndef _RTE_RING_C11_MEM_H_
> +#define _RTE_RING_C11_MEM_H_
> +
> +static __rte_always_inline void
> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> + uint32_t single, uint32_t enqueue)
> +{
> + /* No wmb/rmb needed when we prefer to use the load_acquire/
> + * store_release barrier */
> +#ifndef PREFER
> + if (enqueue)
> + rte_smp_wmb();
> + else
> + rte_smp_rmb();
> +#endif
You can remove PREFER and let the "generic" version have this. For x86,
rte_smp_?mb() will be a NOOP, so there is no issue.
> +
> + /*
> + * If there are other enqueues/dequeues in progress that preceded us,
> + * we need to wait for them to complete
> + */
> + if (!single)
> + while (unlikely(ht->tail != old_val))
> + rte_pause();
> +
> +#ifdef PREFER
> + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
The c11 mem model version needs only the __atomic_store_n variant.
> +#else
> + ht->tail = new_val;
> +#endif
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + * A pointer to the ring structure
> + * @param is_sp
> + * Indicates whether multi-producer path is needed or not
> + * @param n
> + * The number of elements we will want to enqueue, i.e. how far should
> the
> + * head be moved
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param old_head
> + * Returns head value as it was before the move, i.e. where enqueue
> starts
> + * @param new_head
> + * Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + * Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + * Actual number of objects enqueued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head,
> + uint32_t *free_entries)
> +{
> + const uint32_t capacity = r->capacity;
> + unsigned int max = n;
> + int success;
> +
> + do {
> + /* Reset n to the initial burst count */
> + n = max;
> +
> +#ifdef PREFER
> + *old_head = __atomic_load_n(&r->prod.head,
> + __ATOMIC_ACQUIRE);
> +#else
> + *old_head = r->prod.head;
> + /* prevent reorder of load/load */
> + rte_smp_rmb();
> +#endif
Same as above comment.
> + const uint32_t cons_tail = r->cons.tail;
> + /*
> + * The subtraction is done between two unsigned 32bits value
> + * (the result is always modulo 32 bits even if we have
> + * *old_head > cons_tail). So 'free_entries' is always between 0
> + * and capacity (which is < size).
> + */
> + *free_entries = (capacity + cons_tail - *old_head);
> +
> + /* check that we have enough room in ring */
> + if (unlikely(n > *free_entries))
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + int is_sp, unsigned int *free_space)
> +{
Duplicate function; no need to replicate it in both versions.
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head,
> + uint32_t *entries)
> +{
> + unsigned int max = n;
> + int success;
> +
> + /* move cons.head atomically */
> + do {
> + /* Restore n as it may change every loop */
> + n = max;
> +#ifdef PREFER
> + *old_head = __atomic_load_n(&r->cons.head,
> + __ATOMIC_ACQUIRE);
> +#else
> + *old_head = r->cons.head;
> + /* prevent reorder of load/load */
> + rte_smp_rmb();
> +#endif
Same as above comment
> +
> + const uint32_t prod_tail = r->prod.tail;
> + /* The subtraction is done between two unsigned 32bits value
> + * (the result is always modulo 32 bits even if we have
> + * cons_head > prod_tail). So 'entries' is always between 0
> + * and size(ring)-1. */
> + *entries = (prod_tail - *old_head);
> +
> + /* Set the actual entries for dequeue */
> + if (n > *entries)
> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> + if (unlikely(n == 0))
> + return 0;
> +
> + *new_head = *old_head + n;
> + if (is_sc)
> + r->cons.head = *new_head, success = 1;
> + else
> +#ifdef PREFER
> + success = arch_rte_atomic32_cmpset(&r->cons.head,
> + old_head, *new_head,
> + 0, __ATOMIC_ACQUIRE,
> + __ATOMIC_RELAXED);
> +#else
> + success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> + *new_head);
> +#endif
Same as above comment
> + } while (unlikely(success == 0));
> + return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to pull from the ring.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param is_sc
> + * Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + * returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + * - Actual number of objects dequeued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + int is_sc, unsigned int *available)
> +{
Duplicate function; no need to replicate it in both versions.
> + uint32_t cons_head, cons_next;
> + uint32_t entries;
> +
> + n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
> + &cons_head, &cons_next, &entries);
> + if (n == 0)
> + goto end;
> +
> + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> +
> + update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> +
> +end:
> + if (available != NULL)
> + *available = entries - n;
> + return n;
> +}
> +
> +#endif /* _RTE_RING_C11_MEM_H_ */
> +
> diff --git a/lib/librte_ring/rte_ring_generic.h
> b/lib/librte_ring/rte_ring_generic.h
> new file mode 100644
> index 0000000..0ce6d57
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -0,0 +1,268 @@
> +/*-
> + * BSD LICENSE
> + *
> + * Copyright(c) 2017 hxt-semitech. All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + * * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + * * Redistributions in binary form must reproduce the above copyright
> + * notice, this list of conditions and the following disclaimer in
> + * the documentation and/or other materials provided with the
> + * distribution.
> + * * Neither the name of hxt-semitech nor the names of its
> + * contributors may be used to endorse or promote products derived
> + * from this software without specific prior written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#ifndef _RTE_RING_GENERIC_H_
> +#define _RTE_RING_GENERIC_H_
> +
> +static __rte_always_inline void
> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> + uint32_t single, uint32_t enqueue)
> +{
> + if (enqueue)
> + rte_smp_wmb();
> + else
> + rte_smp_rmb();
> + /*
> + * If there are other enqueues/dequeues in progress that preceded us,
> + * we need to wait for them to complete
> + */
> + if (!single)
> + while (unlikely(ht->tail != old_val))
> + rte_pause();
> +
> + ht->tail = new_val;
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + * A pointer to the ring structure
> + * @param is_sp
> + * Indicates whether multi-producer path is needed or not
> + * @param n
> + * The number of elements we will want to enqueue, i.e. how far should
> the
> + * head be moved
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param old_head
> + * Returns head value as it was before the move, i.e. where enqueue
> starts
> + * @param new_head
> + * Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + * Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + * Actual number of objects enqueued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head,
> + uint32_t *free_entries)
> +{
> + const uint32_t capacity = r->capacity;
> + unsigned int max = n;
> + int success;
> +
> + do {
> + /* Reset n to the initial burst count */
> + n = max;
> +
> + *old_head = r->prod.head;
Adding rte_smp_rmb() does no harm here, as it is a NOOP for x86, and it is
semantically correct too.
> + const uint32_t cons_tail = r->cons.tail;
> + /*
> + * The subtraction is done between two unsigned 32bits value
> + * (the result is always modulo 32 bits even if we have
> + * *old_head > cons_tail). So 'free_entries' is always between 0
> + * and capacity (which is < size).
> + */
> + *free_entries = (capacity + cons_tail - *old_head);
> +
> + /* check that we have enough room in ring */
> + if (unlikely(n > *free_entries))
> + n = (behavior == RTE_RING_QUEUE_FIXED) ?
> + 0 : *free_entries;
> +
> + if (n == 0)
> + return 0;
> +
> + *new_head = *old_head + n;
> + if (is_sp)
> + r->prod.head = *new_head, success = 1;
> + else
> + success = rte_atomic32_cmpset(&r->prod.head,
> + *old_head, *new_head);
> + } while (unlikely(success == 0));
> + return n;
> +}
> +
> +/**
> + * @internal Enqueue several objects on the ring
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param is_sp
> + * Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + * returns the amount of space after the enqueue operation has finished
> + * @return
> + * Actual number of objects enqueued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + int is_sp, unsigned int *free_space)
> +{
Duplicate function; no need to replicate it in both versions.
> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + * A pointer to the ring structure
> + * @param is_sc
> + * Indicates whether multi-consumer path is needed or not
> + * @param n
> + * The number of elements we will want to enqueue, i.e. how far should
> the
> + * head be moved
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + * Returns head value as it was before the move, i.e. where dequeue
> starts
> + * @param new_head
> + * Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + * Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + * - Actual number of objects dequeued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head,
> + uint32_t *entries)
> +{
> + unsigned int max = n;
> + int success;
> +
> + /* move cons.head atomically */
> + do {
> + /* Restore n as it may change every loop */
> + n = max;
> +
> + *old_head = r->cons.head;
> + const uint32_t prod_tail = r->prod.tail;
Same as above comment.
> + /* The subtraction is done between two unsigned 32bits value
> + * (the result is always modulo 32 bits even if we have
> + * cons_head > prod_tail). So 'entries' is always between 0
> + * and size(ring)-1. */
> + *entries = (prod_tail - *old_head);
> +
> + /* Set the actual entries for dequeue */
> + if (n > *entries)
> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> + if (unlikely(n == 0))
> + return 0;
> +
> + *new_head = *old_head + n;
> + if (is_sc)
> + r->cons.head = *new_head, success = 1;
> + else
> + success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> + *new_head);
> + } while (unlikely(success == 0));
> + return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to pull from the ring.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param is_sc
> + * Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + * returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + * - Actual number of objects dequeued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + int is_sc, unsigned int *available)
> +{
Duplicate function; no need to replicate it in both versions.
>
^ permalink raw reply [flat|nested] 13+ messages in thread
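Pulling the review comments above together: the C11 header would keep only the release store in update_tail() and drop the PREFER conditionals entirely. A sketch of that consolidated version, assuming DPDK's existing rte_ring_headtail, rte_pause(), unlikely() and RTE_SET_USED() definitions:

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
        uint32_t single, uint32_t enqueue)
{
    RTE_SET_USED(enqueue); /* only the generic flavour branches on it */

    /*
     * If there are other enqueues/dequeues in progress that preceded
     * us, we need to wait for them to complete.
     */
    if (!single)
        while (unlikely(ht->tail != old_val))
            rte_pause();

    /* The release store alone orders the preceding ENQUEUE_PTRS()/
     * DEQUEUE_PTRS() accesses before the new tail becomes visible. */
    __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}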
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
2017-11-07 9:57 ` Jerin Jacob
@ 2017-11-08 2:31 ` Jia He
0 siblings, 0 replies; 13+ messages in thread
From: Jia He @ 2017-11-08 2:31 UTC (permalink / raw)
To: Jerin Jacob
Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
Hi Jerin
Thank you,
I mistakenly thought x86 doesn't need rte_smp_rmb().
Since rte_smp_rmb() only impacts x86's compiler optimization, I will
simplify the code as you suggest.
Cheers,
Jia
On 11/7/2017 5:57 PM, Jerin Jacob Wrote:
> -----Original Message-----
>> Date: Tue, 7 Nov 2017 16:34:30 +0800
>> From: Jia He <hejianet@gmail.com>
>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
>> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>> bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>> jia.he@hxt-semitech.com
>> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>> doing
>> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>> Thunderbird/52.4.0
>>
>>
>>
>> On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
>>> -----Original Message-----
>>>
>>> One option could be to change the prototype of update_tail() and let the
>>> compiler accommodate it at zero cost for arm64 (which I think is the
>>> case, but you can check the generated instructions).
>>> If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
>>> __rte_ring_move_prod_head()/__rte_ring_move_cons_head()/update_tail()
>>>
>>>
>>> ➜ [master][dpdk.org] $ git diff
>>> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
>>> index 5e9b3b7b4..b32648825 100644
>>> --- a/lib/librte_ring/rte_ring.h
>>> +++ b/lib/librte_ring/rte_ring.h
>>> @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
>>> *r);
>>> static __rte_always_inline void
>>> update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
>>> new_val,
>>> - uint32_t single)
>>> + uint32_t single, const uint32_t enqueue)
>>> {
>>> + if (enqueue)
>>> + rte_smp_wmb();
>>> + else
>>> + rte_smp_rmb();
>>> /*
>>> * If there are other enqueues/dequeues in progress that
>>> * preceded us,
>>> * we need to wait for them to complete
>>> @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
>>> const *obj_table,
>>> goto end;
>>> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>>> - rte_smp_wmb();
>>> - update_tail(&r->prod, prod_head, prod_next, is_sp);
>>> + update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
>>> end:
>>> if (free_space != NULL)
>>> *free_space = free_entries - n;
>>> @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
>>> **obj_table,
>>> goto end;
>>> DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>>> - rte_smp_rmb();
>>> - update_tail(&r->cons, cons_head, cons_next, is_sc);
>>> + update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>>> end:
>>> if (available != NULL)
>>>
>>>
>>>
>> Hi Jerin, yes, I knew about this suggestion for update_tail.
>> But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and
>> __rte_ring_move_prod_head:
>> [option 1]
>> + *old_head = r->cons.head;
>> + rte_smp_rmb();
>> + const uint32_t prod_tail = r->prod.tail;
>>
>> [option 2]
>> + *old_head = __atomic_load_n(&r->cons.head,
>> + __ATOMIC_ACQUIRE);
>>
>> i.e. I wonder what the suitable new config name is to distinguish the above 2
>> options?
> Why?
> If you fix the generic version with rte_smp_rmb() then we need only
> one config to differentiate between c11 and generic. See comments below.
>
>> Thanks for the patience :-)
>>
>> see my draft patch below, note the macro "PREFER":
>> + */
>> +
>> +#ifndef _RTE_RING_C11_MEM_H_
>> +#define _RTE_RING_C11_MEM_H_
>> +
>> +static __rte_always_inline void
>> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
>> new_val,
>> + uint32_t single, uint32_t enqueue)
>> +{
>> + /* No wmb/rmb needed when we prefer to use the load_acquire/
>> + * store_release barrier */
>> +#ifndef PREFER
>> + if (enqueue)
>> + rte_smp_wmb();
>> + else
>> + rte_smp_rmb();
>> +#endif
> You can remove PREFER and let the "generic" version have this. For x86,
> rte_smp_?mb() will be a NOOP, so there is no issue.
>
>> +
>> + /*
>> + * If there are other enqueues/dequeues in progress that preceded us,
>> + * we need to wait for them to complete
>> + */
>> + if (!single)
>> + while (unlikely(ht->tail != old_val))
>> + rte_pause();
>> +
>> +#ifdef PREFER
>> + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> The c11 mem model version needs only the __atomic_store_n variant.
>
>> +#else
>> + ht->tail = new_val;
>> +#endif
>> +}
>> +
>> +/**
>> + * @internal This function updates the producer head for enqueue
>> + *
>> + * @param r
>> + * A pointer to the ring structure
>> + * @param is_sp
>> + * Indicates whether multi-producer path is needed or not
>> + * @param n
>> + * The number of elements we will want to enqueue, i.e. how far should
>> the
>> + * head be moved
>> + * @param behavior
>> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
>> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
>> + * @param old_head
>> + * Returns head value as it was before the move, i.e. where enqueue
>> starts
>> + * @param new_head
>> + * Returns the current/new head value i.e. where enqueue finishes
>> + * @param free_entries
>> + * Returns the amount of free space in the ring BEFORE head was moved
>> + * @return
>> + * Actual number of objects enqueued.
>> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + uint32_t *old_head, uint32_t *new_head,
>> + uint32_t *free_entries)
>> +{
>> + const uint32_t capacity = r->capacity;
>> + unsigned int max = n;
>> + int success;
>> +
>> + do {
>> + /* Reset n to the initial burst count */
>> + n = max;
>> +
>> +#ifdef PREFER
>> + *old_head = __atomic_load_n(&r->prod.head,
>> + __ATOMIC_ACQUIRE);
>> +#else
>> + *old_head = r->prod.head;
>> + /* prevent reorder of load/load */
>> + rte_smp_rmb();
>> +#endif
> Same as above comment.
>
>> + const uint32_t cons_tail = r->cons.tail;
>> + /*
>> + * The subtraction is done between two unsigned 32bits value
>> + * (the result is always modulo 32 bits even if we have
>> + * *old_head > cons_tail). So 'free_entries' is always between 0
>> + * and capacity (which is < size).
>> + */
>> + *free_entries = (capacity + cons_tail - *old_head);
>> +
>> + /* check that we have enough room in ring */
>> + if (unlikely(n > *free_entries))
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + int is_sp, unsigned int *free_space)
>> +{
>
> Duplicate function; no need to replicate it in both versions.
>
>
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + uint32_t *old_head, uint32_t *new_head,
>> + uint32_t *entries)
>> +{
>> + unsigned int max = n;
>> + int success;
>> +
>> + /* move cons.head atomically */
>> + do {
>> + /* Restore n as it may change every loop */
>> + n = max;
>> +#ifdef PREFER
>> + *old_head = __atomic_load_n(&r->cons.head,
>> + __ATOMIC_ACQUIRE);
>> +#else
>> + *old_head = r->cons.head;
>> + /* prevent reorder of load/load */
>> + rte_smp_rmb();
>> +#endif
> Same as above comment
>
>> +
>> + const uint32_t prod_tail = r->prod.tail;
>> + /* The subtraction is done between two unsigned 32bits value
>> + * (the result is always modulo 32 bits even if we have
>> + * cons_head > prod_tail). So 'entries' is always between 0
>> + * and size(ring)-1. */
>> + *entries = (prod_tail - *old_head);
>> +
>> + /* Set the actual entries for dequeue */
>> + if (n > *entries)
>> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
>> +
>> + if (unlikely(n == 0))
>> + return 0;
>> +
>> + *new_head = *old_head + n;
>> + if (is_sc)
>> + r->cons.head = *new_head, success = 1;
>> + else
>> +#ifdef PREFER
>> + success = arch_rte_atomic32_cmpset(&r->cons.head,
>> + old_head, *new_head,
>> + 0, __ATOMIC_ACQUIRE,
>> + __ATOMIC_RELAXED);
>> +#else
>> + success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> + *new_head);
>> +#endif
> Same as above comment
>
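The arm64 header itself is not quoted above, but presumably
arch_rte_atomic32_cmpset() is a thin wrapper over the GCC builtin, roughly
like this (an assumption about its shape, not the actual header contents):

	static __rte_always_inline int
	arch_rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t *exp,
			uint32_t src, int weak, int success, int failure)
	{
		return __atomic_compare_exchange_n(dst, exp, src, weak,
				success, failure);
	}

Unlike rte_atomic32_cmpset(), the builtin takes a pointer to the expected
value and refreshes it on failure, which would explain why old_head is
passed here without being dereferenced.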
>> + } while (unlikely(success == 0));
>> + return n;
>> +}
>> +
>> +/**
>> + * @internal Dequeue several objects from the ring
>> + *
>> + * @param r
>> + * A pointer to the ring structure.
>> + * @param obj_table
>> + * A pointer to a table of void * pointers (objects).
>> + * @param n
>> + * The number of objects to pull from the ring.
>> + * @param behavior
>> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
>> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param is_sc
>> + * Indicates whether to use single consumer or multi-consumer head update
>> + * @param available
>> + * returns the number of remaining ring entries after the dequeue has finished
>> + * @return
>> + * - Actual number of objects dequeued.
>> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + int is_sc, unsigned int *available)
>> +{
> Duplicate function; no need to replicate it in both versions.
>
>
>> + uint32_t cons_head, cons_next;
>> + uint32_t entries;
>> +
>> + n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
>> + &cons_head, &cons_next, &entries);
>> + if (n == 0)
>> + goto end;
>> +
>> + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>> +
>> + update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>> +
>> +end:
>> + if (available != NULL)
>> + *available = entries - n;
>> + return n;
>> +}
>> +
>> +#endif /* _RTE_RING_C11_MEM_H_ */
>> +
>> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
>> new file mode 100644
>> index 0000000..0ce6d57
>> --- /dev/null
>> +++ b/lib/librte_ring/rte_ring_generic.h
>> @@ -0,0 +1,268 @@
>> +/*-
>> + * BSD LICENSE
>> + *
>> + * Copyright(c) 2017 hxt-semitech. All rights reserved.
>> + *
>> + * Redistribution and use in source and binary forms, with or without
>> + * modification, are permitted provided that the following conditions
>> + * are met:
>> + *
>> + * * Redistributions of source code must retain the above copyright
>> + * notice, this list of conditions and the following disclaimer.
>> + * * Redistributions in binary form must reproduce the above copyright
>> + * notice, this list of conditions and the following disclaimer in
>> + * the documentation and/or other materials provided with the
>> + * distribution.
>> + * * Neither the name of hxt-semitech nor the names of its
>> + * contributors may be used to endorse or promote products derived
>> + * from this software without specific prior written permission.
>> + *
>> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#ifndef _RTE_RING_GENERIC_H_
>> +#define _RTE_RING_GENERIC_H_
>> +
>> +static __rte_always_inline void
>> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>> + uint32_t single, uint32_t enqueue)
>> +{
>> + if (enqueue)
>> + rte_smp_wmb();
>> + else
>> + rte_smp_rmb();
>> + /*
>> + * If there are other enqueues/dequeues in progress that preceded us,
>> + * we need to wait for them to complete
>> + */
>> + if (!single)
>> + while (unlikely(ht->tail != old_val))
>> + rte_pause();
>> +
>> + ht->tail = new_val;
>> +}
>> +
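For context, the barrier pairing this generic version relies on, as an
illustrative two-thread outline (not part of the patch):

	/*
	 *   producer                       consumer
	 *   --------                       --------
	 *   ring[head] = obj;              tail = r->prod.tail;
	 *   rte_smp_wmb();                 rte_smp_rmb();
	 *   r->prod.tail = head + n;       obj = ring[cons_head];
	 *
	 * The producer's wmb orders the slot write before the tail
	 * publish; the consumer's rmb orders the tail read before the
	 * slot read, so a consumer that observes the new tail also
	 * observes the enqueued object.
	 */
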
>> +/**
>> + * @internal This function updates the producer head for enqueue
>> + *
>> + * @param r
>> + * A pointer to the ring structure
>> + * @param is_sp
>> + * Indicates whether multi-producer path is needed or not
>> + * @param n
>> + * The number of elements we will want to enqueue, i.e. how far should the
>> + * head be moved
>> + * @param behavior
>> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
>> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
>> + * @param old_head
>> + * Returns head value as it was before the move, i.e. where enqueue starts
>> + * @param new_head
>> + * Returns the current/new head value i.e. where enqueue finishes
>> + * @param free_entries
>> + * Returns the amount of free space in the ring BEFORE head was moved
>> + * @return
>> + * Actual number of objects enqueued.
>> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + uint32_t *old_head, uint32_t *new_head,
>> + uint32_t *free_entries)
>> +{
>> + const uint32_t capacity = r->capacity;
>> + unsigned int max = n;
>> + int success;
>> +
>> + do {
>> + /* Reset n to the initial burst count */
>> + n = max;
>> +
>> + *old_head = r->prod.head;
> Adding rte_smp_rmb() here does no harm, as it is a NOOP on x86, and it is
> semantically correct too.
>
>
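In code form, the suggestion matches what the arm64 header's #else branch
already does (sketch):

	*old_head = r->prod.head;
	/* prevent reorder of the two loads; expands to a compiler
	 * barrier only (no instruction) on x86 */
	rte_smp_rmb();
	const uint32_t cons_tail = r->cons.tail;
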
>> + const uint32_t cons_tail = r->cons.tail;
>> + /*
>> + * The subtraction is done between two unsigned 32-bit values
>> + * (the result is always modulo 32 bits even if we have
>> + * *old_head > cons_tail). So 'free_entries' is always between 0
>> + * and capacity (which is < size).
>> + */
>> + *free_entries = (capacity + cons_tail - *old_head);
>> +
>> + /* check that we have enough room in ring */
>> + if (unlikely(n > *free_entries))
>> + n = (behavior == RTE_RING_QUEUE_FIXED) ?
>> + 0 : *free_entries;
>> +
>> + if (n == 0)
>> + return 0;
>> +
>> + *new_head = *old_head + n;
>> + if (is_sp)
>> + r->prod.head = *new_head, success = 1;
>> + else
>> + success = rte_atomic32_cmpset(&r->prod.head,
>> + *old_head, *new_head);
>> + } while (unlikely(success == 0));
>> + return n;
>> +}
>> +
>> +/**
>> + * @internal Enqueue several objects on the ring
>> + *
>> + * @param r
>> + * A pointer to the ring structure.
>> + * @param obj_table
>> + * A pointer to a table of void * pointers (objects).
>> + * @param n
>> + * The number of objects to add in the ring from the obj_table.
>> + * @param behavior
>> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
>> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
>> + * @param is_sp
>> + * Indicates whether to use single producer or multi-producer head update
>> + * @param free_space
>> + * returns the amount of space after the enqueue operation has finished
>> + * @return
>> + * Actual number of objects enqueued.
>> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + int is_sp, unsigned int *free_space)
>> +{
> Duplicate function; no need to replicate it in both versions.
>
>> +
>> +/**
>> + * @internal This function updates the consumer head for dequeue
>> + *
>> + * @param r
>> + * A pointer to the ring structure
>> + * @param is_sc
>> + * Indicates whether multi-consumer path is needed or not
>> + * @param n
>> + * The number of elements we will want to dequeue, i.e. how far should the
>> + * head be moved
>> + * @param behavior
>> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
>> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param old_head
>> + * Returns head value as it was before the move, i.e. where dequeue starts
>> + * @param new_head
>> + * Returns the current/new head value i.e. where dequeue finishes
>> + * @param entries
>> + * Returns the number of entries in the ring BEFORE head was moved
>> + * @return
>> + * - Actual number of objects dequeued.
>> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + uint32_t *old_head, uint32_t *new_head,
>> + uint32_t *entries)
>> +{
>> + unsigned int max = n;
>> + int success;
>> +
>> + /* move cons.head atomically */
>> + do {
>> + /* Restore n as it may change every loop */
>> + n = max;
>> +
>> + *old_head = r->cons.head;
>> + const uint32_t prod_tail = r->prod.tail;
> Same as above comment.
>
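Concretely, the same fix as on the producer side (sketch):

	*old_head = r->cons.head;
	/* order this load before the prod.tail load below */
	rte_smp_rmb();
	const uint32_t prod_tail = r->prod.tail;
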
>> + /* The subtraction is done between two unsigned 32-bit values
>> + * (the result is always modulo 32 bits even if we have
>> + * cons_head > prod_tail). So 'entries' is always between 0
>> + * and size(ring)-1. */
>> + *entries = (prod_tail - *old_head);
>> +
>> + /* Set the actual entries for dequeue */
>> + if (n > *entries)
>> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
>> +
>> + if (unlikely(n == 0))
>> + return 0;
>> +
>> + *new_head = *old_head + n;
>> + if (is_sc)
>> + r->cons.head = *new_head, success = 1;
>> + else
>> + success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> + *new_head);
>> + } while (unlikely(success == 0));
>> + return n;
>> +}
>> +
>> +/**
>> + * @internal Dequeue several objects from the ring
>> + *
>> + * @param r
>> + * A pointer to the ring structure.
>> + * @param obj_table
>> + * A pointer to a table of void * pointers (objects).
>> + * @param n
>> + * The number of objects to pull from the ring.
>> + * @param behavior
>> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
>> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param is_sc
>> + * Indicates whether to use single consumer or multi-consumer head update
>> + * @param available
>> + * returns the number of remaining ring entries after the dequeue has finished
>> + * @return
>> + * - Actual number of objects dequeued.
>> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>> + unsigned int n, enum rte_ring_queue_behavior behavior,
>> + int is_sc, unsigned int *available)
>> +{
> Duplicate function; no need to replicate it in both versions.
--
Cheers,
Jia