* [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing @ 2017-11-02 8:43 Jia He 2017-11-02 13:26 ` Ananyev, Konstantin 2017-11-02 17:23 ` Jerin Jacob 0 siblings, 2 replies; 13+ messages in thread From: Jia He @ 2017-11-02 8:43 UTC (permalink / raw) To: jerin.jacob, dev, olivier.matz Cc: konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, Jia He, jie2.liu, bing.zhao, jia.he We observed an rte panic of mbuf_autotest on our Qualcomm arm64 server. As for the possible race condition, please refer to [1]. Furthermore, there are 2 options as suggested by Jerin: 1. use rte_smp_rmb 2. use load_acquire/store_release (refer to [2]). CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by default it is n. The reason for providing 2 options is the difference in performance benchmark results across different arm machines; please refer to [3]. Already functionally tested on the following machines: on x86 (passed compilation) on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170 [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html --- Changelog: V2: let users choose whether to use load_acquire/store_release V1: rte_smp_rmb() between 2 loads Signed-off-by: Jia He <hejianet@gmail.com> Signed-off-by: jie2.liu@hxt-semitech.com Signed-off-by: bing.zhao@hxt-semitech.com Signed-off-by: jia.he@hxt-semitech.com Suggested-by: jerin.jacob@caviumnetworks.com --- lib/librte_ring/Makefile | 4 +++- lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 8 deletions(-) create mode 100644 lib/librte_ring/rte_ring_arm64.h create mode 100644 lib/librte_ring/rte_ring_generic.h diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index e34d9d9..fa57a86 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -45,6 +45,8 @@ LIBABIVER := 1 SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c # install includes -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ + rte_ring_arm64.h \ + rte_ring_generic.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 5e9b3b7..943b1f9 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -103,6 +103,18 @@ extern "C" { #include <rte_memzone.h> #include <rte_pause.h> +/* In those strong memory models (e.g. x86), there is no need to add rmb() + * between load and load. + * In those weak models (powerpc/arm), there are 2 choices for the users + * 1. use rmb() memory barrier + * 2. use one-direction load_acquire/store_release barrier + * It depends on performance test results. 
*/ +#ifdef RTE_ARCH_ARM64 +#include "rte_ring_arm64.h" +#else +#include "rte_ring_generic.h" +#endif + #define RTE_TAILQ_RING_NAME "RTE_RING" enum rte_ring_queue_behavior { @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, while (unlikely(ht->tail != old_val)) rte_pause(); - ht->tail = new_val; + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); } /** @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, /* Reset n to the initial burst count */ n = max; - *old_head = r->prod.head; + *old_head = arch_rte_atomic_load(&r->prod.head, + __ATOMIC_ACQUIRE); const uint32_t cons_tail = r->cons.tail; /* * The subtraction is done between two unsigned 32bits value @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, if (is_sp) r->prod.head = *new_head, success = 1; else - success = rte_atomic32_cmpset(&r->prod.head, - *old_head, *new_head); + success = arch_rte_atomic32_cmpset(&r->prod.head, + old_head, *new_head, + 0, __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; } @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, goto end; ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); + +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER rte_smp_wmb(); +#endif update_tail(&r->prod, prod_head, prod_next, is_sp); end: @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, /* Restore n as it may change every loop */ n = max; - *old_head = r->cons.head; + *old_head = arch_rte_atomic_load(&r->cons.head, + __ATOMIC_ACQUIRE); const uint32_t prod_tail = r->prod.tail; /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, if (is_sc) r->cons.head = *new_head, success = 1; else - success = rte_atomic32_cmpset(&r->cons.head, *old_head, - *new_head); + success = arch_rte_atomic32_cmpset(&r->cons.head, + old_head, *new_head, + 0, __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; } @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, goto end; DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); + +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER rte_smp_rmb(); +#endif update_tail(&r->cons, cons_head, cons_next, is_sc); diff --git a/lib/librte_ring/rte_ring_arm64.h b/lib/librte_ring/rte_ring_arm64.h new file mode 100644 index 0000000..09438e5 --- /dev/null +++ b/lib/librte_ring/rte_ring_arm64.h @@ -0,0 +1,48 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2017 hxt-semitech. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of hxt-semitech nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_RING_ARM64_H_ +#define _RTE_RING_ARM64_H_ + +#define arch_rte_atomic_store __atomic_store_n +#define arch_rte_atomic32_cmpset __atomic_compare_exchange_n +#ifdef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER +#define arch_rte_atomic_load __atomic_load_n +#else +#define arch_rte_atomic_load(a, b) \ + *(a); \ + rte_smp_rmb() + +#endif + +#endif /* _RTE_RING_ARM64_H_ */ + diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h new file mode 100644 index 0000000..4fb777c --- /dev/null +++ b/lib/librte_ring/rte_ring_generic.h @@ -0,0 +1,45 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2017 hxt-semitech. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of hxt-semitech nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_RING_GENERIC_H_ +#define _RTE_RING_GENERIC_H_ + +#define arch_rte_atomic_load(a, b) *(a) +#define arch_rte_atomic_store(a, b, c) \ +do { \ + *(a) = b; \ +} while(0) +#define arch_rte_atomic32_cmpset(a, b, c, d, e, f) \ + rte_atomic32_cmpset(a, *(b), c) + +#endif /* _RTE_RING_GENERIC_H_ */ + -- 2.7.4 ^ permalink raw reply [flat|nested] 13+ messages in thread
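Before the review replies, a minimal stand-alone sketch may help orient readers: it models the acquire/release pattern that option 2 adopts, written against GCC's __atomic builtins. The struct, field, and function names are simplified for illustration; this is a model of the idea, not the patch's actual code.

#include <stdint.h>
#include <stdbool.h>

struct headtail {
	volatile uint32_t head;
	volatile uint32_t tail;
};

struct ring {
	struct headtail prod;
	struct headtail cons;
	uint32_t size;
};

/*
 * Producer-side reservation. The ACQUIRE load of prod.head forbids the
 * later read of cons.tail from being hoisted above it -- the load/load
 * reordering behind the observed panic [1].
 */
static inline uint32_t
move_prod_head(struct ring *r, uint32_t n, uint32_t *old_head)
{
	uint32_t free_entries;
	bool ok;

	do {
		*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
		/* modulo-32-bit arithmetic, as in rte_ring */
		free_entries = r->size + r->cons.tail - *old_head;
		if (n > free_entries)
			return 0;
		/* on failure, retry from a fresh ACQUIRE load of the head */
		ok = __atomic_compare_exchange_n(&r->prod.head, old_head,
						 *old_head + n, 0,
						 __ATOMIC_ACQUIRE,
						 __ATOMIC_RELAXED);
	} while (!ok);
	return n;
}

/*
 * Tail publication after the ring entries have been written. The
 * RELEASE store makes the entry writes visible to any consumer that
 * observes the new tail, which is what later allows the explicit
 * rte_smp_wmb()/rte_smp_rmb() to be dropped.
 */
static inline void
publish_tail(struct headtail *ht, uint32_t old_head, uint32_t n)
{
	while (__atomic_load_n(&ht->tail, __ATOMIC_RELAXED) != old_head)
		;	/* earlier producers still publishing; rte_pause() in DPDK */
	__atomic_store_n(&ht->tail, old_head + n, __ATOMIC_RELEASE);
}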
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-02 8:43 [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He @ 2017-11-02 13:26 ` Ananyev, Konstantin 2017-11-02 15:42 ` Jia He 2017-11-02 17:23 ` Jerin Jacob 1 sibling, 1 reply; 13+ messages in thread From: Ananyev, Konstantin @ 2017-11-02 13:26 UTC (permalink / raw) To: Jia He, jerin.jacob, dev, olivier.matz Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he Hi Jia, > -----Original Message----- > From: Jia He [mailto:hejianet@gmail.com] > Sent: Thursday, November 2, 2017 8:44 AM > To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com > Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; > hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt- > semitech.com > Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > > We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. > As for the possible race condition, please refer to [1]. > > Furthermore, there are 2 options as suggested by Jerin: > 1. use rte_smp_rmb > 2. use load_acquire/store_release(refer to [2]). > CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by > default it is n; > > The reason why providing 2 options is due to the performance benchmark > difference in different arm machines, please refer to [3]. > > Already fuctionally tested on the machines as follows: > on X86(passed the compilation) > on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y > on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n > > [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html > [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170 > [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html > > --- > Changelog: > V2: let users choose whether using load_acquire/store_release > V1: rte_smp_rmb() between 2 loads > > Signed-off-by: Jia He <hejianet@gmail.com> > Signed-off-by: jie2.liu@hxt-semitech.com > Signed-off-by: bing.zhao@hxt-semitech.com > Signed-off-by: jia.he@hxt-semitech.com > Suggested-by: jerin.jacob@caviumnetworks.com > --- > lib/librte_ring/Makefile | 4 +++- > lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ > lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ > lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ > 4 files changed, 127 insertions(+), 8 deletions(-) > create mode 100644 lib/librte_ring/rte_ring_arm64.h > create mode 100644 lib/librte_ring/rte_ring_generic.h > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > index e34d9d9..fa57a86 100644 > --- a/lib/librte_ring/Makefile > +++ b/lib/librte_ring/Makefile > @@ -45,6 +45,8 @@ LIBABIVER := 1 > SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > # install includes > -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h > +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > + rte_ring_arm64.h \ > + rte_ring_generic.h > > include $(RTE_SDK)/mk/rte.lib.mk > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > index 5e9b3b7..943b1f9 100644 > --- a/lib/librte_ring/rte_ring.h > +++ b/lib/librte_ring/rte_ring.h > @@ -103,6 +103,18 @@ extern "C" { > #include <rte_memzone.h> > #include <rte_pause.h> > > +/* In those strong memory models (e.g. 
x86), there is no need to add rmb() > + * between load and load. > + * In those weak models (powerpc/arm), there are 2 choices for the users > + * 1. use rmb() memory barrier > + * 2. use one-direction load_acquire/store_release barrier > + * It depends on performance test results. */ > +#ifdef RTE_ARCH_ARM64 > +#include "rte_ring_arm64.h" > +#else > +#include "rte_ring_generic.h" > +#endif > + > #define RTE_TAILQ_RING_NAME "RTE_RING" > > enum rte_ring_queue_behavior { > @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, > while (unlikely(ht->tail != old_val)) > rte_pause(); > > - ht->tail = new_val; > + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); > } > > /** > @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > /* Reset n to the initial burst count */ > n = max; > > - *old_head = r->prod.head; > + *old_head = arch_rte_atomic_load(&r->prod.head, > + __ATOMIC_ACQUIRE); > const uint32_t cons_tail = r->cons.tail; The code starts to look a bit messy with all these arch specific macros... So I wonder, wouldn't it be cleaner to: 1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail into rte_ring_generic.h 2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head (as was in v1 of your patch). 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail in the rte_ring_arm64.h That way we will keep generic code simple and clean, while still allowing arch specific optimizations. > /* > * The subtraction is done between two unsigned 32bits value > @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > if (is_sp) > r->prod.head = *new_head, success = 1; > else > - success = rte_atomic32_cmpset(&r->prod.head, > - *old_head, *new_head); > + success = arch_rte_atomic32_cmpset(&r->prod.head, > + old_head, *new_head, > + 0, __ATOMIC_ACQUIRE, > + __ATOMIC_RELAXED); > } while (unlikely(success == 0)); > return n; > } > @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, > goto end; > > ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); > + > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER I wonder why we need that macro? Would there be situations where smp_wmb() is not needed here? Konstantin ^ permalink raw reply [flat|nested] 13+ messages in thread
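To make step 2 concrete: in the same simplified model as the sketch after the original patch, the v1-style generic head move keeps plain loads plus an explicit read barrier between them. The smp_rmb() stand-in below is an assumption for the sketch (DPDK's rte_smp_rmb() is a compiler barrier on x86 and a dmb on arm):

/* stand-in for rte_smp_rmb(); an acquire fence is a conservative model */
#define smp_rmb()	__atomic_thread_fence(__ATOMIC_ACQUIRE)

/*
 * v1-style producer reservation: the barrier keeps the cons.tail load
 * from being reordered before the prod.head load.
 */
static inline uint32_t
move_prod_head_generic(struct ring *r, uint32_t n, uint32_t *old_head)
{
	uint32_t free_entries;

	do {
		*old_head = r->prod.head;
		smp_rmb();	/* the v1 fix: order the two loads */
		free_entries = r->size + r->cons.tail - *old_head;
		if (n > free_entries)
			return 0;
	} while (!__sync_bool_compare_and_swap(&r->prod.head,
					       *old_head, *old_head + n));
	return n;
}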
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-02 13:26 ` Ananyev, Konstantin @ 2017-11-02 15:42 ` Jia He 2017-11-02 16:16 ` Ananyev, Konstantin 0 siblings, 1 reply; 13+ messages in thread From: Jia He @ 2017-11-02 15:42 UTC (permalink / raw) To: Ananyev, Konstantin, jerin.jacob, dev, olivier.matz Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he Hi Ananyev On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote: > Hi Jia, > >> -----Original Message----- >> From: Jia He [mailto:hejianet@gmail.com] >> Sent: Thursday, November 2, 2017 8:44 AM >> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com >> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; >> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt- >> semitech.com >> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing >> >> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. >> As for the possible race condition, please refer to [1]. >> >> Furthermore, there are 2 options as suggested by Jerin: >> 1. use rte_smp_rmb >> 2. use load_acquire/store_release(refer to [2]). >> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by >> default it is n; >> >> The reason why providing 2 options is due to the performance benchmark >> difference in different arm machines, please refer to [3]. >> >> Already fuctionally tested on the machines as follows: >> on X86(passed the compilation) >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n >> >> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html >> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170 >> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html >> >> --- >> Changelog: >> V2: let users choose whether using load_acquire/store_release >> V1: rte_smp_rmb() between 2 loads >> >> Signed-off-by: Jia He <hejianet@gmail.com> >> Signed-off-by: jie2.liu@hxt-semitech.com >> Signed-off-by: bing.zhao@hxt-semitech.com >> Signed-off-by: jia.he@hxt-semitech.com >> Suggested-by: jerin.jacob@caviumnetworks.com >> --- >> lib/librte_ring/Makefile | 4 +++- >> lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ >> lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ >> lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ >> 4 files changed, 127 insertions(+), 8 deletions(-) >> create mode 100644 lib/librte_ring/rte_ring_arm64.h >> create mode 100644 lib/librte_ring/rte_ring_generic.h >> >> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile >> index e34d9d9..fa57a86 100644 >> --- a/lib/librte_ring/Makefile >> +++ b/lib/librte_ring/Makefile >> @@ -45,6 +45,8 @@ LIBABIVER := 1 >> SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c >> >> # install includes >> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h >> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ >> + rte_ring_arm64.h \ >> + rte_ring_generic.h >> >> include $(RTE_SDK)/mk/rte.lib.mk >> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h >> index 5e9b3b7..943b1f9 100644 >> --- a/lib/librte_ring/rte_ring.h >> +++ b/lib/librte_ring/rte_ring.h >> @@ -103,6 +103,18 @@ extern "C" { >> #include <rte_memzone.h> >> #include <rte_pause.h> >> >> +/* In those strong 
memory models (e.g. x86), there is no need to add rmb() >> + * between load and load. >> + * In those weak models(powerpc/arm), there are 2 choices for the users >> + * 1.use rmb() memory barrier >> + * 2.use one-direcion load_acquire/store_release barrier >> + * It depends on performance test results. */ >> +#ifdef RTE_ARCH_ARM64 >> +#include "rte_ring_arm64.h" >> +#else >> +#include "rte_ring_generic.h" >> +#endif >> + >> #define RTE_TAILQ_RING_NAME "RTE_RING" >> >> enum rte_ring_queue_behavior { >> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, >> while (unlikely(ht->tail != old_val)) >> rte_pause(); >> >> - ht->tail = new_val; >> + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); >> } >> >> /** >> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, >> /* Reset n to the initial burst count */ >> n = max; >> >> - *old_head = r->prod.head; >> + *old_head = arch_rte_atomic_load(&r->prod.head, >> + __ATOMIC_ACQUIRE); >> const uint32_t cons_tail = r->cons.tail; > The code starts to look a bit messy with all these arch specific macros... > So I wonder wouldn't it be more cleaner to: > > 1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail > into rte_ring_generic.h > 2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head > (as was in v1 of your patch). > 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail > in the rte_ring_arm64.h > > That way we will keep ogneric code simple and clean, while still allowing arch specific optimizations. Thanks for your review. But as per your suggestion, there will be at least 2 copies of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail. Thus, if there are any bugs in the future, both 2 copies have to be changed, right? > >> /* >> * The subtraction is done between two unsigned 32bits value >> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, >> if (is_sp) >> r->prod.head = *new_head, success = 1; >> else >> - success = rte_atomic32_cmpset(&r->prod.head, >> - *old_head, *new_head); >> + success = arch_rte_atomic32_cmpset(&r->prod.head, >> + old_head, *new_head, >> + 0, __ATOMIC_ACQUIRE, >> + __ATOMIC_RELAXED); >> } while (unlikely(success == 0)); >> return n; >> } >> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, >> goto end; >> >> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); >> + >> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER > I wonder why do we need that macro? > Would be there situations when smp_wmb() are not needed here? If the dpdk user chooses the config acquire/release, the store_release barrier in update_tail together with the load_acquire barrier pair in __rte_ring_move_{prod,cons}_head guarantee the order. So smp_wmb() is not required here. Please refer to the freebsd ring implementation and Jerin's debug patch. https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h https://github.com/jerinjacobk/mytests/blob/master/ring/0001-ring-using-c11-memory-model.patch --- Cheers, Jia > Konstantin > > -- Cheers, Jia ^ permalink raw reply [flat|nested] 13+ messages in thread
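The pairing Jia describes can be made visible in the same simplified model used in the earlier sketches. The consumer-side acquire shown here follows the FreeBSD buf_ring code linked above; it is illustrative, not the v2 patch itself:

/*
 *   producer                            consumer
 *   --------                            --------
 *   write ring[head .. head+n)          t = load_acq(prod.tail)   (A)
 *   store_rel(prod.tail, head+n)  (R)   read ring[chead .. t)
 *
 * When (A) reads the value written by (R), the release/acquire pair
 * guarantees the consumer also observes the entry writes that preceded
 * (R) -- hence no separate rte_smp_wmb()/rte_smp_rmb().
 */
static inline void
enqueue_publish(struct ring *r, uint32_t old_head, uint32_t n)
{
	/* the entry writes happen-before this release store */
	__atomic_store_n(&r->prod.tail, old_head + n, __ATOMIC_RELEASE);
}

static inline uint32_t
dequeue_entries_avail(struct ring *r, uint32_t cons_head)
{
	/* pairs with the producer's release store of prod.tail */
	uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
					     __ATOMIC_ACQUIRE);
	return prod_tail - cons_head;	/* modulo-32-bit */
}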
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-02 15:42 ` Jia He @ 2017-11-02 16:16 ` Ananyev, Konstantin 2017-11-02 17:00 ` Jerin Jacob 0 siblings, 1 reply; 13+ messages in thread From: Ananyev, Konstantin @ 2017-11-02 16:16 UTC (permalink / raw) To: Jia He, jerin.jacob, dev, olivier.matz Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he > -----Original Message----- > From: Jia He [mailto:hejianet@gmail.com] > Sent: Thursday, November 2, 2017 3:43 PM > To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com > Cc: Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; hemant.agrawal@nxp.com; jie2.liu@hxt-semitech.com; > bing.zhao@hxt-semitech.com; jia.he@hxt-semitech.com > Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > > Hi Ananyev > > > On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote: > > Hi Jia, > > > >> -----Original Message----- > >> From: Jia He [mailto:hejianet@gmail.com] > >> Sent: Thursday, November 2, 2017 8:44 AM > >> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com > >> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; > >> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt- > >> semitech.com > >> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > >> > >> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. > >> As for the possible race condition, please refer to [1]. > >> > >> Furthermore, there are 2 options as suggested by Jerin: > >> 1. use rte_smp_rmb > >> 2. use load_acquire/store_release(refer to [2]). > >> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by > >> default it is n; > >> > >> The reason why providing 2 options is due to the performance benchmark > >> difference in different arm machines, please refer to [3]. 
> >> > >> Already fuctionally tested on the machines as follows: > >> on X86(passed the compilation) > >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y > >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n > >> > >> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html > >> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170 > >> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html > >> > >> --- > >> Changelog: > >> V2: let users choose whether using load_acquire/store_release > >> V1: rte_smp_rmb() between 2 loads > >> > >> Signed-off-by: Jia He <hejianet@gmail.com> > >> Signed-off-by: jie2.liu@hxt-semitech.com > >> Signed-off-by: bing.zhao@hxt-semitech.com > >> Signed-off-by: jia.he@hxt-semitech.com > >> Suggested-by: jerin.jacob@caviumnetworks.com > >> --- > >> lib/librte_ring/Makefile | 4 +++- > >> lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ > >> lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ > >> lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ > >> 4 files changed, 127 insertions(+), 8 deletions(-) > >> create mode 100644 lib/librte_ring/rte_ring_arm64.h > >> create mode 100644 lib/librte_ring/rte_ring_generic.h > >> > >> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > >> index e34d9d9..fa57a86 100644 > >> --- a/lib/librte_ring/Makefile > >> +++ b/lib/librte_ring/Makefile > >> @@ -45,6 +45,8 @@ LIBABIVER := 1 > >> SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > >> > >> # install includes > >> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h > >> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > >> + rte_ring_arm64.h \ > >> + rte_ring_generic.h > >> > >> include $(RTE_SDK)/mk/rte.lib.mk > >> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > >> index 5e9b3b7..943b1f9 100644 > >> --- a/lib/librte_ring/rte_ring.h > >> +++ b/lib/librte_ring/rte_ring.h > >> @@ -103,6 +103,18 @@ extern "C" { > >> #include <rte_memzone.h> > >> #include <rte_pause.h> > >> > >> +/* In those strong memory models (e.g. x86), there is no need to add rmb() > >> + * between load and load. > >> + * In those weak models(powerpc/arm), there are 2 choices for the users > >> + * 1.use rmb() memory barrier > >> + * 2.use one-direcion load_acquire/store_release barrier > >> + * It depends on performance test results. */ > >> +#ifdef RTE_ARCH_ARM64 > >> +#include "rte_ring_arm64.h" > >> +#else > >> +#include "rte_ring_generic.h" > >> +#endif > >> + > >> #define RTE_TAILQ_RING_NAME "RTE_RING" > >> > >> enum rte_ring_queue_behavior { > >> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, > >> while (unlikely(ht->tail != old_val)) > >> rte_pause(); > >> > >> - ht->tail = new_val; > >> + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); > >> } > >> > >> /** > >> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > >> /* Reset n to the initial burst count */ > >> n = max; > >> > >> - *old_head = r->prod.head; > >> + *old_head = arch_rte_atomic_load(&r->prod.head, > >> + __ATOMIC_ACQUIRE); > >> const uint32_t cons_tail = r->cons.tail; > > The code starts to look a bit messy with all these arch specific macros... > > So I wonder wouldn't it be more cleaner to: > > > > 1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail > > into rte_ring_generic.h > > 2. 
Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head > > (as was in v1 of your patch). > > 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail > > in the rte_ring_arm64.h > > > > That way we will keep generic code simple and clean, while still allowing arch specific optimizations. > Thanks for your review. > But as per your suggestion, there will be at least 2 copies of > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail. > Thus, if there are any bugs in the future, both 2 copies have to be > changed, right? Yes, there would be some code duplication. Though generic code would be much cleaner and simpler to read in that case. Which is worth some duplication, I think. > > > >> /* > >> * The subtraction is done between two unsigned 32bits value > >> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > >> if (is_sp) > >> r->prod.head = *new_head, success = 1; > >> else > >> - success = rte_atomic32_cmpset(&r->prod.head, > >> - *old_head, *new_head); > >> + success = arch_rte_atomic32_cmpset(&r->prod.head, > >> + old_head, *new_head, > >> + 0, __ATOMIC_ACQUIRE, > >> + __ATOMIC_RELAXED); > >> } while (unlikely(success == 0)); > >> return n; > >> } > >> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, > >> goto end; > >> > >> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); > >> + > >> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER > > I wonder why we need that macro? > > Would there be situations where smp_wmb() is not needed here? > If the dpdk user chooses the config acquire/release, the store_release > barrier in update_tail together with > the load_acquire barrier pair in __rte_ring_move_{prod,cons}_head > guarantee the order. So smp_wmb() is not required here. Please refer to > the freebsd ring implementation and Jerin's debug patch. > https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h > https://github.com/jerinjacobk/mytests/blob/master/ring/0001-ring-using-c11-memory-model.patch Ok, so because you are doing arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); inside update_tail() you don't need mb() anymore here... In that case, can we perhaps hide all that logic inside update_tail()? I.e. move smp_rmb/smp_wmb into update_tail(), and then for arm64 you'll have your special one with atomic_store(). Something like that for the generic version: static __rte_always_inline void update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, - uint32_t single) + uint32_t single, uint32_t enqueue) { + if (enqueue) + rte_smp_wmb(); + else + rte_smp_rmb(); /* * If there are other enqueues/dequeues in progress that preceded us, * we need to wait for them to complete */ if (!single) while (unlikely(ht->tail != old_val)) rte_pause(); ht->tail = new_val; } .... static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, int is_sp, unsigned int *free_space) { ..... ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); - rte_smp_wmb(); - update_tail(&r->prod, prod_head, prod_next, is_sp); + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); .... static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, int is_sc, unsigned int *available) { .... 
DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); - rte_smp_rmb(); - update_tail(&r->cons, cons_head, cons_next, is_sc); + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); Konstantin > > --- > Cheers, > Jia > > Konstantin > > > > > > -- > Cheers, > Jia ^ permalink raw reply [flat|nested] 13+ messages in thread
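Assembled from the fragments above, the generic update_tail() Konstantin is sketching would read as follows (a sketch of the suggestion, not a final patch):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
		uint32_t new_val, uint32_t single, uint32_t enqueue)
{
	/*
	 * The barriers move here from the callers: order the entry
	 * stores (enqueue) or entry loads (dequeue) before the tail
	 * update becomes visible to the other side.
	 */
	if (enqueue)
		rte_smp_wmb();
	else
		rte_smp_rmb();

	/*
	 * If there are other enqueues/dequeues in progress that
	 * preceded us, we need to wait for them to complete.
	 */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	ht->tail = new_val;
}

The arm64/C11 counterpart would keep the same signature, drop the explicit barrier, and replace the plain tail store with __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE).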
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-02 16:16 ` Ananyev, Konstantin @ 2017-11-02 17:00 ` Jerin Jacob 0 siblings, 0 replies; 13+ messages in thread From: Jerin Jacob @ 2017-11-02 17:00 UTC (permalink / raw) To: Ananyev, Konstantin Cc: Jia He, dev, olivier.matz, Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he -----Original Message----- > Date: Thu, 2 Nov 2017 16:16:33 +0000 > From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com> > To: Jia He <hejianet@gmail.com>, "jerin.jacob@caviumnetworks.com" > <jerin.jacob@caviumnetworks.com>, "dev@dpdk.org" <dev@dpdk.org>, > "olivier.matz@6wind.com" <olivier.matz@6wind.com> > CC: "Richardson, Bruce" <bruce.richardson@intel.com>, "jianbo.liu@arm.com" > <jianbo.liu@arm.com>, "hemant.agrawal@nxp.com" <hemant.agrawal@nxp.com>, > "jie2.liu@hxt-semitech.com" <jie2.liu@hxt-semitech.com>, > "bing.zhao@hxt-semitech.com" <bing.zhao@hxt-semitech.com>, > "jia.he@hxt-semitech.com" <jia.he@hxt-semitech.com> > Subject: RE: [PATCH v2] ring: guarantee ordering of cons/prod loading when > doing > > > > > -----Original Message----- > > From: Jia He [mailto:hejianet@gmail.com] > > Sent: Thursday, November 2, 2017 3:43 PM > > To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com > > Cc: Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; hemant.agrawal@nxp.com; jie2.liu@hxt-semitech.com; > > bing.zhao@hxt-semitech.com; jia.he@hxt-semitech.com > > Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > > > > Hi Ananyev > > > > > > On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote: > > > Hi Jia, > > > > > >> -----Original Message----- > > >> From: Jia He [mailto:hejianet@gmail.com] > > >> Sent: Thursday, November 2, 2017 8:44 AM > > >> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com > > >> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; > > >> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt- > > >> semitech.com > > >> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > > >> > > >> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. > > >> As for the possible race condition, please refer to [1]. > > >> > > >> Furthermore, there are 2 options as suggested by Jerin: > > >> 1. use rte_smp_rmb > > >> 2. use load_acquire/store_release(refer to [2]). > > >> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by > > >> default it is n; > > >> > > >> The reason why providing 2 options is due to the performance benchmark > > >> difference in different arm machines, please refer to [3]. 
> > >> > > >> Already fuctionally tested on the machines as follows: > > >> on X86(passed the compilation) > > >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y > > >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n > > >> > > >> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html > > >> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170 > > >> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html > > >> > > >> --- > > >> Changelog: > > >> V2: let users choose whether using load_acquire/store_release > > >> V1: rte_smp_rmb() between 2 loads > > >> > > >> Signed-off-by: Jia He <hejianet@gmail.com> > > >> Signed-off-by: jie2.liu@hxt-semitech.com > > >> Signed-off-by: bing.zhao@hxt-semitech.com > > >> Signed-off-by: jia.he@hxt-semitech.com > > >> Suggested-by: jerin.jacob@caviumnetworks.com > > >> --- > > >> lib/librte_ring/Makefile | 4 +++- > > >> lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ > > >> lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ > > >> lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ > > >> 4 files changed, 127 insertions(+), 8 deletions(-) > > >> create mode 100644 lib/librte_ring/rte_ring_arm64.h > > >> create mode 100644 lib/librte_ring/rte_ring_generic.h > > >> > > >> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > > >> index e34d9d9..fa57a86 100644 > > >> --- a/lib/librte_ring/Makefile > > >> +++ b/lib/librte_ring/Makefile > > >> @@ -45,6 +45,8 @@ LIBABIVER := 1 > > >> SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > >> > > >> # install includes > > >> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h > > >> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > > >> + rte_ring_arm64.h \ > > >> + rte_ring_generic.h > > >> > > >> include $(RTE_SDK)/mk/rte.lib.mk > > >> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > > >> index 5e9b3b7..943b1f9 100644 > > >> --- a/lib/librte_ring/rte_ring.h > > >> +++ b/lib/librte_ring/rte_ring.h > > >> @@ -103,6 +103,18 @@ extern "C" { > > >> #include <rte_memzone.h> > > >> #include <rte_pause.h> > > >> > > >> +/* In those strong memory models (e.g. x86), there is no need to add rmb() > > >> + * between load and load. > > >> + * In those weak models(powerpc/arm), there are 2 choices for the users > > >> + * 1.use rmb() memory barrier > > >> + * 2.use one-direcion load_acquire/store_release barrier > > >> + * It depends on performance test results. */ > > >> +#ifdef RTE_ARCH_ARM64 > > >> +#include "rte_ring_arm64.h" > > >> +#else > > >> +#include "rte_ring_generic.h" > > >> +#endif > > >> + > > >> #define RTE_TAILQ_RING_NAME "RTE_RING" > > >> > > >> enum rte_ring_queue_behavior { > > >> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, > > >> while (unlikely(ht->tail != old_val)) > > >> rte_pause(); > > >> > > >> - ht->tail = new_val; > > >> + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); > > >> } > > >> > > >> /** > > >> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > > >> /* Reset n to the initial burst count */ > > >> n = max; > > >> > > >> - *old_head = r->prod.head; > > >> + *old_head = arch_rte_atomic_load(&r->prod.head, > > >> + __ATOMIC_ACQUIRE); > > >> const uint32_t cons_tail = r->cons.tail; > > > The code starts to look a bit messy with all these arch specific macros... > > > So I wonder wouldn't it be more cleaner to: > > > > > > 1. 
move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail > > > into rte_ring_generic.h > > > 2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head > > > (as was in v1 of your patch). > > > 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail > > > in the rte_ring_arm64.h > > > > > > That way we will keep ogneric code simple and clean, while still allowing arch specific optimizations. > > Thanks for your review. > > But as per your suggestion, there will be at least 2 copies of > > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail. > > Thus, if there are any bugs in the future, both 2 copies have to be > > changed, right? > > Yes, there would be some code duplication. > Though generic code would be much cleaner and simpler to read in that case. > Which is worth some dups I think. I agree with Konstantin here. ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-02 8:43 [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He 2017-11-02 13:26 ` Ananyev, Konstantin @ 2017-11-02 17:23 ` Jerin Jacob 2017-11-03 1:46 ` Jia He 1 sibling, 1 reply; 13+ messages in thread From: Jerin Jacob @ 2017-11-02 17:23 UTC (permalink / raw) To: Jia He Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he -----Original Message----- > Date: Thu, 2 Nov 2017 08:43:30 +0000 > From: Jia He <hejianet@gmail.com> > To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com > Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com, > jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>, > jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com, > jia.he@hxt-semitech.com > Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > X-Mailer: git-send-email 2.7.4 > > We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. > As for the possible race condition, please refer to [1]. Hi Jia, In addition to Konstantin comments. Please find below some review comments. > > Furthermore, there are 2 options as suggested by Jerin: > 1. use rte_smp_rmb > 2. use load_acquire/store_release(refer to [2]). > CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL or something like that. > default it is n; > > --- > Changelog: > V2: let users choose whether using load_acquire/store_release > V1: rte_smp_rmb() between 2 loads > > Signed-off-by: Jia He <hejianet@gmail.com> > Signed-off-by: jie2.liu@hxt-semitech.com > Signed-off-by: bing.zhao@hxt-semitech.com > Signed-off-by: jia.he@hxt-semitech.com > Suggested-by: jerin.jacob@caviumnetworks.com > --- > lib/librte_ring/Makefile | 4 +++- > lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ > lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ > lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ > 4 files changed, 127 insertions(+), 8 deletions(-) > create mode 100644 lib/librte_ring/rte_ring_arm64.h > create mode 100644 lib/librte_ring/rte_ring_generic.h > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > index e34d9d9..fa57a86 100644 > --- a/lib/librte_ring/Makefile > +++ b/lib/librte_ring/Makefile > @@ -45,6 +45,8 @@ LIBABIVER := 1 > SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > # install includes > -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h > +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > + rte_ring_arm64.h \ It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or something like that to reflect the implementation based on c11 memory model. > + rte_ring_generic.h > > include $(RTE_SDK)/mk/rte.lib.mk > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > index 5e9b3b7..943b1f9 100644 > --- a/lib/librte_ring/rte_ring.h > +++ b/lib/librte_ring/rte_ring.h > @@ -103,6 +103,18 @@ extern "C" { > #include <rte_memzone.h> > #include <rte_pause.h> > > +/* In those strong memory models (e.g. x86), there is no need to add rmb() > + * between load and load. > + * In those weak models(powerpc/arm), there are 2 choices for the users > + * 1.use rmb() memory barrier > + * 2.use one-direcion load_acquire/store_release barrier > + * It depends on performance test results. 
*/ > +#ifdef RTE_ARCH_ARM64 s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config. By that way it can used by another architecture like ppc if they choose to do so. > +#include "rte_ring_arm64.h" > +#else > +#include "rte_ring_generic.h" > +#endif > + > #define RTE_TAILQ_RING_NAME "RTE_RING" > > enum rte_ring_queue_behavior { > @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, > while (unlikely(ht->tail != old_val)) > rte_pause(); > > - ht->tail = new_val; > + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); > } > > /** > @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > /* Reset n to the initial burst count */ > n = max; > > - *old_head = r->prod.head; > + *old_head = arch_rte_atomic_load(&r->prod.head, > + __ATOMIC_ACQUIRE); Same as Konstantin comments, i.e move to this function to c11 memory model header file > const uint32_t cons_tail = r->cons.tail; > /* > * The subtraction is done between two unsigned 32bits value > @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > if (is_sp) > r->prod.head = *new_head, success = 1; > else > - success = rte_atomic32_cmpset(&r->prod.head, > - *old_head, *new_head); > + success = arch_rte_atomic32_cmpset(&r->prod.head, > + old_head, *new_head, > + 0, __ATOMIC_ACQUIRE, > + __ATOMIC_RELAXED); > } while (unlikely(success == 0)); > return n; > } > @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, > goto end; > > ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); > + > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER This #define will be removed if the function moves. > rte_smp_wmb(); > +#endif > > update_tail(&r->prod, prod_head, prod_next, is_sp); > end: > @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > /* Restore n as it may change every loop */ > n = max; > > - *old_head = r->cons.head; > + *old_head = arch_rte_atomic_load(&r->cons.head, > + __ATOMIC_ACQUIRE); > const uint32_t prod_tail = r->prod.tail; > /* The subtraction is done between two unsigned 32bits value > * (the result is always modulo 32 bits even if we have > @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > if (is_sc) > r->cons.head = *new_head, success = 1; > else > - success = rte_atomic32_cmpset(&r->cons.head, *old_head, > - *new_head); > + success = arch_rte_atomic32_cmpset(&r->cons.head, > + old_head, *new_head, > + 0, __ATOMIC_ACQUIRE, > + __ATOMIC_RELAXED); > } while (unlikely(success == 0)); > return n; > } > @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, > goto end; > > DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); > + > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER This #define will be removed if the function moves. > rte_smp_rmb(); > +#endif ^ permalink raw reply [flat|nested] 13+ messages in thread
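Under the names Jerin proposes, the include dispatch in rte_ring.h would switch on the memory-model choice rather than the architecture, roughly:

/* rte_ring.h -- dispatch on the memory-model choice (sketch) */
#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"	/* load-acquire/store-release based */
#else
#include "rte_ring_generic.h"	/* rte_smp_rmb()/rte_smp_wmb() based */
#endif

with the macro enabled in the arm64 defconfigs, so that other weakly ordered architectures such as ppc could opt in without touching rte_ring.h.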
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-02 17:23 ` Jerin Jacob @ 2017-11-03 1:46 ` Jia He 2017-11-03 12:56 ` Jerin Jacob 0 siblings, 1 reply; 13+ messages in thread From: Jia He @ 2017-11-03 1:46 UTC (permalink / raw) To: Jerin Jacob Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he Hi Jerin On 11/3/2017 1:23 AM, Jerin Jacob Wrote: > -----Original Message----- >> Date: Thu, 2 Nov 2017 08:43:30 +0000 >> From: Jia He <hejianet@gmail.com> >> To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com >> Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com, >> jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>, >> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com, >> jia.he@hxt-semitech.com >> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing >> X-Mailer: git-send-email 2.7.4 >> >> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. >> As for the possible race condition, please refer to [1]. > Hi Jia, > > In addition to Konstantin comments. Please find below some review > comments. >> Furthermore, there are 2 options as suggested by Jerin: >> 1. use rte_smp_rmb >> 2. use load_acquire/store_release(refer to [2]). >> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by > I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL > or something like that. Ok, but how to distinguish following 2 options? CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough 1. use rte_smp_rmb 2. use load_acquire/store_release(refer to [2]). >> default it is n; >> >> --- >> Changelog: >> V2: let users choose whether using load_acquire/store_release >> V1: rte_smp_rmb() between 2 loads >> >> Signed-off-by: Jia He <hejianet@gmail.com> >> Signed-off-by: jie2.liu@hxt-semitech.com >> Signed-off-by: bing.zhao@hxt-semitech.com >> Signed-off-by: jia.he@hxt-semitech.com >> Suggested-by: jerin.jacob@caviumnetworks.com >> --- >> lib/librte_ring/Makefile | 4 +++- >> lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ >> lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ >> lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ >> 4 files changed, 127 insertions(+), 8 deletions(-) >> create mode 100644 lib/librte_ring/rte_ring_arm64.h >> create mode 100644 lib/librte_ring/rte_ring_generic.h >> >> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile >> index e34d9d9..fa57a86 100644 >> --- a/lib/librte_ring/Makefile >> +++ b/lib/librte_ring/Makefile >> @@ -45,6 +45,8 @@ LIBABIVER := 1 >> SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c >> >> # install includes >> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h >> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ >> + rte_ring_arm64.h \ > It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or > something like that to reflect the implementation based on c11 memory > model. > > >> + rte_ring_generic.h >> >> include $(RTE_SDK)/mk/rte.lib.mk >> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h >> index 5e9b3b7..943b1f9 100644 >> --- a/lib/librte_ring/rte_ring.h >> +++ b/lib/librte_ring/rte_ring.h >> @@ -103,6 +103,18 @@ extern "C" { >> #include <rte_memzone.h> >> #include <rte_pause.h> >> >> +/* In those strong memory models (e.g. x86), there is no need to add rmb() >> + * between load and load. 
>> + * In those weak models(powerpc/arm), there are 2 choices for the users >> + * 1.use rmb() memory barrier >> + * 2.use one-direcion load_acquire/store_release barrier >> + * It depends on performance test results. */ >> +#ifdef RTE_ARCH_ARM64 > s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config. > By that way it can used by another architecture like ppc if they choose to do so. > > >> +#include "rte_ring_arm64.h" >> +#else >> +#include "rte_ring_generic.h" >> +#endif >> + >> #define RTE_TAILQ_RING_NAME "RTE_RING" >> >> enum rte_ring_queue_behavior { >> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, >> while (unlikely(ht->tail != old_val)) >> rte_pause(); >> >> - ht->tail = new_val; >> + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); >> } >> >> /** >> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, >> /* Reset n to the initial burst count */ >> n = max; >> >> - *old_head = r->prod.head; >> + *old_head = arch_rte_atomic_load(&r->prod.head, >> + __ATOMIC_ACQUIRE); > Same as Konstantin comments, i.e move to this function to c11 memory model > header file > >> const uint32_t cons_tail = r->cons.tail; >> /* >> * The subtraction is done between two unsigned 32bits value >> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, >> if (is_sp) >> r->prod.head = *new_head, success = 1; >> else >> - success = rte_atomic32_cmpset(&r->prod.head, >> - *old_head, *new_head); >> + success = arch_rte_atomic32_cmpset(&r->prod.head, >> + old_head, *new_head, >> + 0, __ATOMIC_ACQUIRE, >> + __ATOMIC_RELAXED); >> } while (unlikely(success == 0)); >> return n; >> } >> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, >> goto end; >> >> ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); >> + >> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER > This #define will be removed if the function moves. > >> rte_smp_wmb(); >> +#endif >> >> update_tail(&r->prod, prod_head, prod_next, is_sp); >> end: >> @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, >> /* Restore n as it may change every loop */ >> n = max; >> >> - *old_head = r->cons.head; >> + *old_head = arch_rte_atomic_load(&r->cons.head, >> + __ATOMIC_ACQUIRE); >> const uint32_t prod_tail = r->prod.tail; >> /* The subtraction is done between two unsigned 32bits value >> * (the result is always modulo 32 bits even if we have >> @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, >> if (is_sc) >> r->cons.head = *new_head, success = 1; >> else >> - success = rte_atomic32_cmpset(&r->cons.head, *old_head, >> - *new_head); >> + success = arch_rte_atomic32_cmpset(&r->cons.head, >> + old_head, *new_head, >> + 0, __ATOMIC_ACQUIRE, >> + __ATOMIC_RELAXED); >> } while (unlikely(success == 0)); >> return n; >> } >> @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, >> goto end; >> >> DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); >> + >> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER > This #define will be removed if the function moves. > >> rte_smp_rmb(); >> +#endif -- Cheers, Jia ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-03 1:46 ` Jia He @ 2017-11-03 12:56 ` Jerin Jacob 2017-11-06 7:25 ` Jia He 0 siblings, 1 reply; 13+ messages in thread From: Jerin Jacob @ 2017-11-03 12:56 UTC (permalink / raw) To: Jia He Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he -----Original Message----- > Date: Fri, 3 Nov 2017 09:46:40 +0800 > From: Jia He <hejianet@gmail.com> > To: Jerin Jacob <jerin.jacob@caviumnetworks.com> > Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com, > bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com, > jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com, > jia.he@hxt-semitech.com > Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when > doing > User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 > Thunderbird/52.4.0 > > Hi Jerin > > > On 11/3/2017 1:23 AM, Jerin Jacob Wrote: > > -----Original Message----- > > > Date: Thu, 2 Nov 2017 08:43:30 +0000 > > > From: Jia He <hejianet@gmail.com> > > > To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com > > > Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com, > > > jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>, > > > jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com, > > > jia.he@hxt-semitech.com > > > Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > > > X-Mailer: git-send-email 2.7.4 > > > > > > We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. > > > As for the possible race condition, please refer to [1]. > > Hi Jia, > > > > In addition to Konstantin comments. Please find below some review > > comments. > > > Furthermore, there are 2 options as suggested by Jerin: > > > 1. use rte_smp_rmb > > > 2. use load_acquire/store_release(refer to [2]). > > > CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by > > I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL > > or something like that. > Ok, but how to distinguish following 2 options? I've not clearly understood this question. For the arm64 case, you can add CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-* > > CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough > > 1. use rte_smp_rmb > 2. use load_acquire/store_release(refer to [2]). 
> > > > default it is n; > > > > > > --- > > > Changelog: > > > V2: let users choose whether using load_acquire/store_release > > > V1: rte_smp_rmb() between 2 loads > > > > > > Signed-off-by: Jia He <hejianet@gmail.com> > > > Signed-off-by: jie2.liu@hxt-semitech.com > > > Signed-off-by: bing.zhao@hxt-semitech.com > > > Signed-off-by: jia.he@hxt-semitech.com > > > Suggested-by: jerin.jacob@caviumnetworks.com > > > --- > > > lib/librte_ring/Makefile | 4 +++- > > > lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ > > > lib/librte_ring/rte_ring_arm64.h | 48 ++++++++++++++++++++++++++++++++++++++ > > > lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ > > > 4 files changed, 127 insertions(+), 8 deletions(-) > > > create mode 100644 lib/librte_ring/rte_ring_arm64.h > > > create mode 100644 lib/librte_ring/rte_ring_generic.h > > > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > > > index e34d9d9..fa57a86 100644 > > > --- a/lib/librte_ring/Makefile > > > +++ b/lib/librte_ring/Makefile > > > @@ -45,6 +45,8 @@ LIBABIVER := 1 > > > SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > > # install includes > > > -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h > > > +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > > > + rte_ring_arm64.h \ > > It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or > > something like that to reflect the implementation based on c11 memory > > model. > > > > > > > + rte_ring_generic.h > > > include $(RTE_SDK)/mk/rte.lib.mk > > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > > > index 5e9b3b7..943b1f9 100644 > > > --- a/lib/librte_ring/rte_ring.h > > > +++ b/lib/librte_ring/rte_ring.h > > > @@ -103,6 +103,18 @@ extern "C" { > > > #include <rte_memzone.h> > > > #include <rte_pause.h> > > > +/* In those strong memory models (e.g. x86), there is no need to add rmb() > > > + * between load and load. > > > + * In those weak models(powerpc/arm), there are 2 choices for the users > > > + * 1.use rmb() memory barrier > > > + * 2.use one-direcion load_acquire/store_release barrier > > > + * It depends on performance test results. */ > > > +#ifdef RTE_ARCH_ARM64 > > s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config. > > By that way it can used by another architecture like ppc if they choose to do so. 
> >
> > > +#include "rte_ring_arm64.h"
> > > +#else
> > > +#include "rte_ring_generic.h"
> > > +#endif
> > > +
> > >  #define RTE_TAILQ_RING_NAME "RTE_RING"
> > >
> > >  enum rte_ring_queue_behavior {
> > > @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> > >  	while (unlikely(ht->tail != old_val))
> > >  		rte_pause();
> > >
> > > -	ht->tail = new_val;
> > > +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> > >  }
> > >
> > >  /**
> > > @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > >  		/* Reset n to the initial burst count */
> > >  		n = max;
> > >
> > > -		*old_head = r->prod.head;
> > > +		*old_head = arch_rte_atomic_load(&r->prod.head,
> > > +					__ATOMIC_ACQUIRE);
> > Same as Konstantin's comments, i.e. move this function to the c11 memory model
> > header file
> >
> > >  		const uint32_t cons_tail = r->cons.tail;
> > >  		/*
> > >  		 * The subtraction is done between two unsigned 32bits value
> > > @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > >  		if (is_sp)
> > >  			r->prod.head = *new_head, success = 1;
> > >  		else
> > > -			success = rte_atomic32_cmpset(&r->prod.head,
> > > -					*old_head, *new_head);
> > > +			success = arch_rte_atomic32_cmpset(&r->prod.head,
> > > +					old_head, *new_head,
> > > +					0, __ATOMIC_ACQUIRE,
> > > +					__ATOMIC_RELAXED);
> > >  	} while (unlikely(success == 0));
> > >  	return n;
> > >  }
> > > @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> > >  		goto end;
> > >
> > >  	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> > > +
> > > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > This #define will be removed if the function moves.
> >
> > >  	rte_smp_wmb();
> > > +#endif
> > >
> > >  	update_tail(&r->prod, prod_head, prod_next, is_sp);
> > > end:
> > > @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > >  		/* Restore n as it may change every loop */
> > >  		n = max;
> > >
> > > -		*old_head = r->cons.head;
> > > +		*old_head = arch_rte_atomic_load(&r->cons.head,
> > > +					__ATOMIC_ACQUIRE);
> > >  		const uint32_t prod_tail = r->prod.tail;
> > >  		/* The subtraction is done between two unsigned 32bits value
> > >  		 * (the result is always modulo 32 bits even if we have
> > > @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > >  		if (is_sc)
> > >  			r->cons.head = *new_head, success = 1;
> > >  		else
> > > -			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> > > -					*new_head);
> > > +			success = arch_rte_atomic32_cmpset(&r->cons.head,
> > > +					old_head, *new_head,
> > > +					0, __ATOMIC_ACQUIRE,
> > > +					__ATOMIC_RELAXED);
> > >  	} while (unlikely(success == 0));
> > >  	return n;
> > >  }
> > > @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> > >  		goto end;
> > >
> > >  	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> > > +
> > > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > This #define will be removed if the function moves.
> >
> > >  	rte_smp_rmb();
> > > +#endif
> --
> Cheers,
> Jia
>
^ permalink raw reply	[flat|nested] 13+ messages in thread
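For reference, the direction this review points at can be summarized in a
short sketch. The option name CONFIG_RTE_RING_USE_C11_MEM_MODEL and the file
name rte_ring_c11_mem.h are the ones proposed above; the snippet is
illustrative, not the merged code:

/* rte_ring.h: select the head/tail synchronization implementation at build
 * time. RTE_RING_USE_C11_MEM_MODEL would come from the per-arch config,
 * e.g. CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
 */
#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"	/* load-acquire/store-release version */
#else
#include "rte_ring_generic.h"	/* rte_smp_rmb()/rte_smp_wmb() version */
#endif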
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-03 12:56 ` Jerin Jacob @ 2017-11-06 7:25 ` Jia He 2017-11-07 4:36 ` Jerin Jacob 0 siblings, 1 reply; 13+ messages in thread From: Jia He @ 2017-11-06 7:25 UTC (permalink / raw) To: Jerin Jacob Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
Hi Jerin

On 11/3/2017 8:56 PM, Jerin Jacob wrote:
> -----Original Message-----
>> [...]
>> g like that.
>> Ok, but how to distinguish between the following 2 options?
> I have not clearly understood this question. For the arm64 case, you can add
> CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*

Sorry for my unclear expression. I mean: shouldn't there be one additional
config macro besides CONFIG_RTE_RING_USE_C11_MEM_MODEL for users to choose?

i.e.
- On x86: CONFIG_RTE_RING_USE_C11_MEM_MODEL=n
  include rte_ring_generic.h, no changes
- On arm64: CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
  include rte_ring_c11_mem.h by default.
  In rte_ring_c11_mem.h, implement new versions of
  __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail

Then, how do we distinguish between the option of using rte_smp_rmb() and
the option of using __atomic_load_n()/__atomic_store_n()?

Thanks for the clarification.

--
Cheers,
Jia
^ permalink raw reply	[flat|nested] 13+ messages in thread
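The two options differ only in how the head load is ordered against the
subsequent tail load. A minimal sketch of the difference (the helper names
here are invented for illustration, and the arm64 instruction mapping is an
expectation to be confirmed against the generated code):

/* Option 1: generic - explicit read barrier between the two loads.
 * On arm64, rte_smp_rmb() expands to dmb ishld; on x86 it is only a
 * compiler barrier.
 */
static inline uint32_t
head_load_option1(struct rte_ring *r, uint32_t *old_head)
{
	*old_head = r->cons.head;
	rte_smp_rmb();		/* keep the prod.tail load after the head load */
	return r->prod.tail;
}

/* Option 2: c11 - acquire load on the head; later loads cannot be
 * reordered before it. On arm64 this can compile down to a single ldar.
 */
static inline uint32_t
head_load_option2(struct rte_ring *r, uint32_t *old_head)
{
	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);
	return r->prod.tail;
}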
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-06 7:25 ` Jia He @ 2017-11-07 4:36 ` Jerin Jacob 2017-11-07 8:34 ` Jia He 0 siblings, 1 reply; 13+ messages in thread From: Jerin Jacob @ 2017-11-07 4:36 UTC (permalink / raw) To: Jia He Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
-----Original Message-----
> Date: Mon, 6 Nov 2017 15:25:12 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>  bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>  jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>  jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>  doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>  Thunderbird/52.4.0
>
> Hi Jerin
>
> On 11/3/2017 8:56 PM, Jerin Jacob wrote:
> > -----Original Message-----
> > > [...]
> > > g like that.
> > > Ok, but how to distinguish between the following 2 options?
> > I have not clearly understood this question. For the arm64 case, you can add
> > CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
> Sorry for my unclear expression. I mean: shouldn't there be one additional
> config macro besides CONFIG_RTE_RING_USE_C11_MEM_MODEL for users to choose?
>
> i.e.
> - On x86: CONFIG_RTE_RING_USE_C11_MEM_MODEL=n
>   include rte_ring_generic.h, no changes
> - On arm64: CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
>   include rte_ring_c11_mem.h by default.
>   In rte_ring_c11_mem.h, implement new versions of
>   __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
>
> Then, how do we distinguish between the option of using rte_smp_rmb() and
> the option of using __atomic_load_n()/__atomic_store_n()?

One option could be to change the prototype of update_tail() and let the
compiler accommodate it at zero cost for arm64 (which I think is the case,
but you can check the generated instructions).
If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
__rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()

➜  [master][dpdk.org] $ git diff
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7b4..b32648825 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);

 static __rte_always_inline void
 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
-		uint32_t single)
+		uint32_t single, const uint32_t enqueue)
 {
+	if (enqueue)
+		rte_smp_wmb();
+	else
+		rte_smp_rmb();
 	/*
 	 * If there are other enqueues/dequeues in progress that preceded us,
 	 * we need to wait for them to complete
@@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		goto end;

 	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
-	rte_smp_wmb();

-	update_tail(&r->prod, prod_head, prod_next, is_sp);
+	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
 end:
 	if (free_space != NULL)
 		*free_space = free_entries - n;
@@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
 		goto end;

 	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
-	rte_smp_rmb();

-	update_tail(&r->cons, cons_head, cons_next, is_sc);
+	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

 end:
 	if (available != NULL)

>
> Thanks for the clarification.
>
> --
> Cheers,
> Jia
>
^ permalink raw reply	[flat|nested] 13+ messages in thread
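The zero-cost expectation here relies on update_tail() being
__rte_always_inline and 'enqueue' being a compile-time constant at every
call site, so the new branch should fold away entirely; roughly:

/* After inlining, constant propagation turns
 *	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
 * into the old open-coded sequence
 *	rte_smp_wmb();
 *	... wait for ht->tail, then store the new tail ...
 * and the dequeue-side call with enqueue == 0 into the rte_smp_rmb()
 * variant, with the dead branch discarded. Inspecting the disassembly
 * (e.g. with objdump -d) is the way to confirm this for a given compiler,
 * as suggested above.
 */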
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-07 4:36 ` Jerin Jacob @ 2017-11-07 8:34 ` Jia He 2017-11-07 9:57 ` Jerin Jacob 0 siblings, 1 reply; 13+ messages in thread From: Jia He @ 2017-11-07 8:34 UTC (permalink / raw) To: Jerin Jacob Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
On 11/7/2017 12:36 PM, Jerin Jacob wrote:
> -----Original Message-----
>
> One option could be to change the prototype of update_tail() and let the
> compiler accommodate it at zero cost for arm64 (which I think is the case,
> but you can check the generated instructions).
> If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()
>
> ➜  [master][dpdk.org] $ git diff
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 5e9b3b7b4..b32648825 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>
>  static __rte_always_inline void
>  update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> -		uint32_t single)
> +		uint32_t single, const uint32_t enqueue)
>  {
> +	if (enqueue)
> +		rte_smp_wmb();
> +	else
> +		rte_smp_rmb();
>  	/*
>  	 * If there are other enqueues/dequeues in progress that preceded us,
>  	 * we need to wait for them to complete
> @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>  		goto end;
>
>  	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> -	rte_smp_wmb();
>
> -	update_tail(&r->prod, prod_head, prod_next, is_sp);
> +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
>  end:
>  	if (free_space != NULL)
>  		*free_space = free_entries - n;
> @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>  		goto end;
>
>  	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> -	rte_smp_rmb();
>
> -	update_tail(&r->cons, cons_head, cons_next, is_sc);
> +	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>
>  end:
>  	if (available != NULL)
>
Hi Jerin, yes, I knew about this suggestion for update_tail.
But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and
__rte_ring_move_prod_head:

[option 1]
+	*old_head = r->cons.head;
+	rte_smp_rmb();
+	const uint32_t prod_tail = r->prod.tail;

[option 2]
+	*old_head = __atomic_load_n(&r->cons.head,
+				__ATOMIC_ACQUIRE);

i.e. I wonder what the suitable new config name is to distinguish the above
2 options?

Thanks for the patience :-)

see my drafted patch below, with the macro "PREFER":

diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
new file mode 100644
index 0000000..22fe887
--- /dev/null
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -0,0 +1,305 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ * * Neither the name of hxt-semitech nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_RING_C11_MEM_H_ +#define _RTE_RING_C11_MEM_H_ + +static __rte_always_inline void +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, + uint32_t single, uint32_t enqueue) +{ + /* Don't need wmb/rmb when we prefer to use load_acquire/ + * store_release barrier */ +#ifndef PREFER + if (enqueue) + rte_smp_wmb(); + else + rte_smp_rmb(); +#endif + + /* + * If there are other enqueues/dequeues in progress that preceded us, + * we need to wait for them to complete + */ + if (!single) + while (unlikely(ht->tail != old_val)) + rte_pause(); + +#ifdef PREFER + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); +#else + ht->tail = new_val; +#endif +} + +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *free_entries) +{ + const uint32_t capacity = r->capacity; + unsigned int max = n; + int success; + + do { + /* Reset n to the initial burst count */ + n = max; + +#ifdef PREFER + *old_head = __atomic_load_n(&r->prod.head, + __ATOMIC_ACQUIRE); +#else + *old_head = r->prod.head; + /* prevent reorder of load/load */ + rte_smp_rmb(); +#endif + const uint32_t cons_tail = r->cons.tail; + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). 
+ */ + *free_entries = (capacity + cons_tail - *old_head); + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + return 0; + + *new_head = *old_head + n; + if (is_sp) + r->prod.head = *new_head, success = 1; + else +#ifdef PREFER + success = arch_rte_atomic32_cmpset(&r->prod.head, + old_head, *new_head, + 0, __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED); +#else + success = rte_atomic32_cmpset(&r->prod.head, + *old_head, *new_head); +#endif + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + int is_sp, unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); + + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *entries) +{ + unsigned int max = n; + int success; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; +#ifdef PREFER + *old_head = __atomic_load_n(&r->cons.head, + __ATOMIC_ACQUIRE); +#else + *old_head = r->cons.head; + /* prevent reorder of load/load */ + rte_smp_rmb(); +#endif + + const uint32_t prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). 
So 'entries' is always between 0 + * and size(ring)-1. */ + *entries = (prod_tail - *old_head); + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (unlikely(n == 0)) + return 0; + + *new_head = *old_head + n; + if (is_sc) + r->cons.head = *new_head, success = 1; + else +#ifdef PREFER + success = arch_rte_atomic32_cmpset(&r->cons.head, + old_head, *new_head, + 0, __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED); +#else + success = rte_atomic32_cmpset(&r->cons.head, *old_head, + *new_head); +#endif + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + int is_sc, unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); + + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +#endif /* _RTE_RING_C11_MEM_H_ */ + diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h new file mode 100644 index 0000000..0ce6d57 --- /dev/null +++ b/lib/librte_ring/rte_ring_generic.h @@ -0,0 +1,268 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2017 hxt-semitech. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of hxt-semitech nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_RING_GENERIC_H_ +#define _RTE_RING_GENERIC_H_ + +static __rte_always_inline void +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, + uint32_t single, uint32_t enqueue) +{ + if (enqueue) + rte_smp_wmb(); + else + rte_smp_rmb(); + /* + * If there are other enqueues/dequeues in progress that preceded us, + * we need to wait for them to complete + */ + if (!single) + while (unlikely(ht->tail != old_val)) + rte_pause(); + + ht->tail = new_val; +} + +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *free_entries) +{ + const uint32_t capacity = r->capacity; + unsigned int max = n; + int success; + + do { + /* Reset n to the initial burst count */ + n = max; + + *old_head = r->prod.head; + const uint32_t cons_tail = r->cons.tail; + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = (capacity + cons_tail - *old_head); + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + return 0; + + *new_head = *old_head + n; + if (is_sp) + r->prod.head = *new_head, success = 1; + else + success = rte_atomic32_cmpset(&r->prod.head, + *old_head, *new_head); + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. 
+ * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + int is_sp, unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); + + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *entries) +{ + unsigned int max = n; + int success; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + *old_head = r->cons.head; + const uint32_t prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + *entries = (prod_tail - *old_head); + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (unlikely(n == 0)) + return 0; + + *new_head = *old_head + n; + if (is_sc) + r->cons.head = *new_head, success = 1; + else + success = rte_atomic32_cmpset(&r->cons.head, *old_head, + *new_head); + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. 
+ * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + int is_sc, unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); + + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +#endif /* _RTE_RING_GENERIC_H_ */ + -- 2.7.4 -- Cheers, Jia ^ permalink raw reply [flat|nested] 13+ messages in thread
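The draft above calls arch_rte_atomic32_cmpset() without quoting its
definition (it comes from the rte_ring_arm64.h added by the v2 patch). Given
the argument list used here, a plausible shape is a thin wrapper over the
GCC builtin; treat this sketch as an assumption rather than the posted code:

static __rte_always_inline int
arch_rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t *exp, uint32_t src,
		int weak, int success_order, int failure_order)
{
	/* __atomic_compare_exchange_n() returns 1 on success; on failure it
	 * writes the value it observed back into *exp. That is why the c11
	 * paths above pass old_head (a pointer) instead of *old_head: a
	 * failed CAS refreshes *old_head for the next retry iteration.
	 */
	return __atomic_compare_exchange_n(dst, exp, src, weak,
			success_order, failure_order);
}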
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-07 8:34 ` Jia He @ 2017-11-07 9:57 ` Jerin Jacob 2017-11-08 2:31 ` Jia He 0 siblings, 1 reply; 13+ messages in thread From: Jerin Jacob @ 2017-11-07 9:57 UTC (permalink / raw) To: Jia He Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he -----Original Message----- > Date: Tue, 7 Nov 2017 16:34:30 +0800 > From: Jia He <hejianet@gmail.com> > To: Jerin Jacob <jerin.jacob@caviumnetworks.com> > Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com, > bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com, > jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com, > jia.he@hxt-semitech.com > Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when > doing > User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 > Thunderbird/52.4.0 > > > > On 11/7/2017 12:36 PM, Jerin Jacob Wrote: > > -----Original Message----- > > > > On option could be to change the prototype of update_tail() and make > > compiler accommodate it for zero cost for arm64(Which I think, it it the > > case. But you can check the generated instructions) > > If not, move, __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of > > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail() > > > > > > ➜ [master][dpdk.org] $ git diff > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > > index 5e9b3b7b4..b32648825 100644 > > --- a/lib/librte_ring/rte_ring.h > > +++ b/lib/librte_ring/rte_ring.h > > @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring > > *r); > > static __rte_always_inline void > > update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t > > new_val, > > - uint32_t single) > > + uint32_t single, const uint32_t enqueue) > > { > > + if (enqueue) > > + rte_smp_wmb(); > > + else > > + rte_smp_rmb(); > > /* > > * If there are other enqueues/dequeues in progress that > > * preceded us, > > * we need to wait for them to complete > > @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * > > const *obj_table, > > goto end; > > ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); > > - rte_smp_wmb(); > > - update_tail(&r->prod, prod_head, prod_next, is_sp); > > + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); > > end: > > if (free_space != NULL) > > *free_space = free_entries - n; > > @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void > > **obj_table, > > goto end; > > DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); > > - rte_smp_rmb(); > > - update_tail(&r->cons, cons_head, cons_next, is_sc); > > + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); > > end: > > if (available != NULL) > > > > > > > Hi Jerin, yes I knew this suggestion in update_tail. > But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and > __rte_ring_move_pros_head: > [option 1] > + *old_head = r->cons.head; > + rte_smp_rmb(); > + const uint32_t prod_tail = r->prod.tail; > > [option 2] > + *old_head = __atomic_load_n(&r->cons.head, > + __ATOMIC_ACQUIRE); > + *old_head = r->cons.head; > > ie.I wonder what is the suitable new config name to distinguish the above 2 > options? Why? If you fix the generic version with rte_smp_rmb() then we just need only one config to differentiate between c11 vs generic. 
See comments below, > Thanks for the patience :-) > > see my drafted patch below, the marcro "PREFER": > + */ > + > +#ifndef _RTE_RING_C11_MEM_H_ > +#define _RTE_RING_C11_MEM_H_ > + > +static __rte_always_inline void > +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t > new_val, > + uint32_t single, uint32_t enqueue) > +{ > + /* Don't need wmb/rmb when we prefer to use load_acquire/ > + * store_release barrier */ > +#ifndef PREFER > + if (enqueue) > + rte_smp_wmb(); > + else > + rte_smp_rmb(); > +#endif You can remove PREFER and let the "generic" version has this. For x86, rte_smp_?mb() it will be NOOP. So no issue. > + > + /* > + * If there are other enqueues/dequeues in progress that preceded us, > + * we need to wait for them to complete > + */ > + if (!single) > + while (unlikely(ht->tail != old_val)) > + rte_pause(); > + > +#ifdef PREFER > + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); for c11 mem model version, it needs only __atomic_store_n version. > +#else > + ht->tail = new_val; > +#endif > +} > + > +/** > + * @internal This function updates the producer head for enqueue > + * > + * @param r > + * A pointer to the ring structure > + * @param is_sp > + * Indicates whether multi-producer path is needed or not > + * @param n > + * The number of elements we will want to enqueue, i.e. how far should > the > + * head be moved > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring > + * @param old_head > + * Returns head value as it was before the move, i.e. where enqueue > starts > + * @param new_head > + * Returns the current/new head value i.e. where enqueue finishes > + * @param free_entries > + * Returns the amount of free space in the ring BEFORE head was moved > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + uint32_t *old_head, uint32_t *new_head, > + uint32_t *free_entries) > +{ > + const uint32_t capacity = r->capacity; > + unsigned int max = n; > + int success; > + > + do { > + /* Reset n to the initial burst count */ > + n = max; > + > +#ifdef PREFER > + *old_head = __atomic_load_n(&r->prod.head, > + __ATOMIC_ACQUIRE); > +#else > + *old_head = r->prod.head; > + /* prevent reorder of load/load */ > + rte_smp_rmb(); > +#endif Same as above comment. > + const uint32_t cons_tail = r->cons.tail; > + /* > + * The subtraction is done between two unsigned 32bits value > + * (the result is always modulo 32 bits even if we have > + * *old_head > cons_tail). So 'free_entries' is always between 0 > + * and capacity (which is < size). > + */ > + *free_entries = (capacity + cons_tail - *old_head); > + > + /* check that we have enough room in ring */ > + if (unlikely(n > *free_entries)) > +static __rte_always_inline unsigned int > +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + int is_sp, unsigned int *free_space) > +{ Duplicate function, No need to replicate on both versions. 
> +static __rte_always_inline unsigned int > +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + uint32_t *old_head, uint32_t *new_head, > + uint32_t *entries) > +{ > + unsigned int max = n; > + int success; > + > + /* move cons.head atomically */ > + do { > + /* Restore n as it may change every loop */ > + n = max; > +#ifdef PREFER > + *old_head = __atomic_load_n(&r->cons.head, > + __ATOMIC_ACQUIRE); > +#else > + *old_head = r->cons.head; > + /* prevent reorder of load/load */ > + rte_smp_rmb(); > +#endif Same as above comment > + > + const uint32_t prod_tail = r->prod.tail; > + /* The subtraction is done between two unsigned 32bits value > + * (the result is always modulo 32 bits even if we have > + * cons_head > prod_tail). So 'entries' is always between 0 > + * and size(ring)-1. */ > + *entries = (prod_tail - *old_head); > + > + /* Set the actual entries for dequeue */ > + if (n > *entries) > + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; > + > + if (unlikely(n == 0)) > + return 0; > + > + *new_head = *old_head + n; > + if (is_sc) > + r->cons.head = *new_head, success = 1; > + else > +#ifdef PREFER > + success = arch_rte_atomic32_cmpset(&r->cons.head, > + old_head, *new_head, > + 0, __ATOMIC_ACQUIRE, > + __ATOMIC_RELAXED); > +#else > + success = rte_atomic32_cmpset(&r->cons.head, *old_head, > + *new_head); > +#endif Same as above comment > + } while (unlikely(success == 0)); > + return n; > +} > + > +/** > + * @internal Dequeue several objects from the ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to pull from the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring > + * @param is_sc > + * Indicates whether to use single consumer or multi-consumer head update > + * @param available > + * returns the number of remaining ring entries after the dequeue has > finished > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + int is_sc, unsigned int *available) > +{ Duplicate function, No need to replicate on both versions. > + uint32_t cons_head, cons_next; > + uint32_t entries; > + > + n = __rte_ring_move_cons_head(r, is_sc, n, behavior, > + &cons_head, &cons_next, &entries); > + if (n == 0) > + goto end; > + > + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); > + > + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); > + > +end: > + if (available != NULL) > + *available = entries - n; > + return n; > +} > + > +#endif /* _RTE_RING_C11_MEM_H_ */ > + > diff --git a/lib/librte_ring/rte_ring_generic.h > b/lib/librte_ring/rte_ring_generic.h > new file mode 100644 > index 0000000..0ce6d57 > --- /dev/null > +++ b/lib/librte_ring/rte_ring_generic.h > @@ -0,0 +1,268 @@ > +/*- > + * BSD LICENSE > + * > + * Copyright(c) 2017 hxt-semitech. All rights reserved. 
> + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions > + * are met: > + * > + * * Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * * Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in > + * the documentation and/or other materials provided with the > + * distribution. > + * * Neither the name of hxt-semitech nor the names of its > + * contributors may be used to endorse or promote products derived > + * from this software without specific prior written permission. > + * > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > + */ > + > +#ifndef _RTE_RING_GENERIC_H_ > +#define _RTE_RING_GENERIC_H_ > + > +static __rte_always_inline void > +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t > new_val, > + uint32_t single, uint32_t enqueue) > +{ > + if (enqueue) > + rte_smp_wmb(); > + else > + rte_smp_rmb(); > + /* > + * If there are other enqueues/dequeues in progress that preceded us, > + * we need to wait for them to complete > + */ > + if (!single) > + while (unlikely(ht->tail != old_val)) > + rte_pause(); > + > + ht->tail = new_val; > +} > + > +/** > + * @internal This function updates the producer head for enqueue > + * > + * @param r > + * A pointer to the ring structure > + * @param is_sp > + * Indicates whether multi-producer path is needed or not > + * @param n > + * The number of elements we will want to enqueue, i.e. how far should > the > + * head be moved > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring > + * @param old_head > + * Returns head value as it was before the move, i.e. where enqueue > starts > + * @param new_head > + * Returns the current/new head value i.e. where enqueue finishes > + * @param free_entries > + * Returns the amount of free space in the ring BEFORE head was moved > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
> + */ > +static __rte_always_inline unsigned int > +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + uint32_t *old_head, uint32_t *new_head, > + uint32_t *free_entries) > +{ > + const uint32_t capacity = r->capacity; > + unsigned int max = n; > + int success; > + > + do { > + /* Reset n to the initial burst count */ > + n = max; > + > + *old_head = r->prod.head; adding rte_smp_rmb() no harm here as it is NOOP for x86 and it is semantically correct too. > + const uint32_t cons_tail = r->cons.tail; > + /* > + * The subtraction is done between two unsigned 32bits value > + * (the result is always modulo 32 bits even if we have > + * *old_head > cons_tail). So 'free_entries' is always between 0 > + * and capacity (which is < size). > + */ > + *free_entries = (capacity + cons_tail - *old_head); > + > + /* check that we have enough room in ring */ > + if (unlikely(n > *free_entries)) > + n = (behavior == RTE_RING_QUEUE_FIXED) ? > + 0 : *free_entries; > + > + if (n == 0) > + return 0; > + > + *new_head = *old_head + n; > + if (is_sp) > + r->prod.head = *new_head, success = 1; > + else > + success = rte_atomic32_cmpset(&r->prod.head, > + *old_head, *new_head); > + } while (unlikely(success == 0)); > + return n; > +} > + > +/** > + * @internal Enqueue several objects on the ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring > + * @param is_sp > + * Indicates whether to use single producer or multi-producer head update > + * @param free_space > + * returns the amount of space after the enqueue operation has finished > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + int is_sp, unsigned int *free_space) > +{ Duplicate function, No need to replicate on both versions. > + > +/** > + * @internal This function updates the consumer head for dequeue > + * > + * @param r > + * A pointer to the ring structure > + * @param is_sc > + * Indicates whether multi-consumer path is needed or not > + * @param n > + * The number of elements we will want to enqueue, i.e. how far should > the > + * head be moved > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring > + * @param old_head > + * Returns head value as it was before the move, i.e. where dequeue > starts > + * @param new_head > + * Returns the current/new head value i.e. where dequeue finishes > + * @param entries > + * Returns the number of entries in the ring BEFORE head was moved > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
> + */ > +static __rte_always_inline unsigned int > +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + uint32_t *old_head, uint32_t *new_head, > + uint32_t *entries) > +{ > + unsigned int max = n; > + int success; > + > + /* move cons.head atomically */ > + do { > + /* Restore n as it may change every loop */ > + n = max; > + > + *old_head = r->cons.head; > + const uint32_t prod_tail = r->prod.tail; Same as above comment. > + /* The subtraction is done between two unsigned 32bits value > + * (the result is always modulo 32 bits even if we have > + * cons_head > prod_tail). So 'entries' is always between 0 > + * and size(ring)-1. */ > + *entries = (prod_tail - *old_head); > + > + /* Set the actual entries for dequeue */ > + if (n > *entries) > + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; > + > + if (unlikely(n == 0)) > + return 0; > + > + *new_head = *old_head + n; > + if (is_sc) > + r->cons.head = *new_head, success = 1; > + else > + success = rte_atomic32_cmpset(&r->cons.head, *old_head, > + *new_head); > + } while (unlikely(success == 0)); > + return n; > +} > + > +/** > + * @internal Dequeue several objects from the ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to pull from the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring > + * @param is_sc > + * Indicates whether to use single consumer or multi-consumer head update > + * @param available > + * returns the number of remaining ring entries after the dequeue has > finished > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + int is_sc, unsigned int *available) > +{ Duplicate function, No need to replicate on both versions. > ^ permalink raw reply [flat|nested] 13+ messages in thread
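Taken together, these comments shrink the c11 header considerably: the
shared __rte_ring_do_enqueue()/__rte_ring_do_dequeue() stay in rte_ring.h,
and update_tail() needs neither the PREFER ifdefs nor the explicit barriers.
A sketch of the resulting function (illustrative only, not the merged code;
RTE_SET_USED() is the stock DPDK macro for marking a parameter unused):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single, uint32_t enqueue)
{
	RTE_SET_USED(enqueue);	/* the release store orders both paths */

	/* If there are other enqueues/dequeues in progress that preceded
	 * us, we need to wait for them to complete.
	 */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}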
* Re: [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing 2017-11-07 9:57 ` Jerin Jacob @ 2017-11-08 2:31 ` Jia He 0 siblings, 0 replies; 13+ messages in thread From: Jia He @ 2017-11-08 2:31 UTC (permalink / raw) To: Jerin Jacob Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he
Hi Jerin

Thank you. I mistakenly thought x86 doesn't need rte_smp_rmb(). Since
rte_smp_rmb() only impacts x86's compiler optimization, I will simplify the
code as you suggest.

Cheers,
Jia

On 11/7/2017 5:57 PM, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Tue, 7 Nov 2017 16:34:30 +0800
>> From: Jia He <hejianet@gmail.com>
>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
>> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>> bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>> jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>> jia.he@hxt-semitech.com
>> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>> doing
>> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>> Thunderbird/52.4.0
>>
>> On 11/7/2017 12:36 PM, Jerin Jacob wrote:
>>> -----Original Message-----
>>>
>>> One option could be to change the prototype of update_tail() and let the
>>> compiler accommodate it at zero cost for arm64 (which I think is the case,
>>> but you can check the generated instructions).
>>> If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
>>> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()
>>>
>>> ➜  [master][dpdk.org] $ git diff
>>> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
>>> index 5e9b3b7b4..b32648825 100644
>>> --- a/lib/librte_ring/rte_ring.h
>>> +++ b/lib/librte_ring/rte_ring.h
>>> @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>>>  static __rte_always_inline void
>>>  update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>>> -		uint32_t single)
>>> +		uint32_t single, const uint32_t enqueue)
>>>  {
>>> +	if (enqueue)
>>> +		rte_smp_wmb();
>>> +	else
>>> +		rte_smp_rmb();
>>>  	/*
>>>  	 * If there are other enqueues/dequeues in progress that
>>>  	 * preceded us, we need to wait for them to complete
>>> @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>>>  		goto end;
>>>  	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>>> -	rte_smp_wmb();
>>> -	update_tail(&r->prod, prod_head, prod_next, is_sp);
>>> +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
>>>  end:
>>>  	if (free_space != NULL)
>>>  		*free_space = free_entries - n;
>>> @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>>>  		goto end;
>>>  	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>>> -	rte_smp_rmb();
>>> -	update_tail(&r->cons, cons_head, cons_next, is_sc);
>>> +	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>>>  end:
>>>  	if (available != NULL)
>>>
>> Hi Jerin, yes, I knew about this suggestion for update_tail.
>> But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and >> __rte_ring_move_pros_head: >> [option 1] >> + *old_head = r->cons.head; >> + rte_smp_rmb(); >> + const uint32_t prod_tail = r->prod.tail; >> >> [option 2] >> + *old_head = __atomic_load_n(&r->cons.head, >> + __ATOMIC_ACQUIRE); >> + *old_head = r->cons.head; >> >> ie.I wonder what is the suitable new config name to distinguish the above 2 >> options? > Why? > If you fix the generic version with rte_smp_rmb() then we just need only > one config to differentiate between c11 vs generic. See comments below, > >> Thanks for the patience :-) >> >> see my drafted patch below, the marcro "PREFER": >> + */ >> + >> +#ifndef _RTE_RING_C11_MEM_H_ >> +#define _RTE_RING_C11_MEM_H_ >> + >> +static __rte_always_inline void >> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t >> new_val, >> + uint32_t single, uint32_t enqueue) >> +{ >> + /* Don't need wmb/rmb when we prefer to use load_acquire/ >> + * store_release barrier */ >> +#ifndef PREFER >> + if (enqueue) >> + rte_smp_wmb(); >> + else >> + rte_smp_rmb(); >> +#endif > You can remove PREFER and let the "generic" version has this. For x86, > rte_smp_?mb() it will be NOOP. So no issue. > >> + >> + /* >> + * If there are other enqueues/dequeues in progress that preceded us, >> + * we need to wait for them to complete >> + */ >> + if (!single) >> + while (unlikely(ht->tail != old_val)) >> + rte_pause(); >> + >> +#ifdef PREFER >> + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); > for c11 mem model version, it needs only __atomic_store_n version. > >> +#else >> + ht->tail = new_val; >> +#endif >> +} >> + >> +/** >> + * @internal This function updates the producer head for enqueue >> + * >> + * @param r >> + * A pointer to the ring structure >> + * @param is_sp >> + * Indicates whether multi-producer path is needed or not >> + * @param n >> + * The number of elements we will want to enqueue, i.e. how far should >> the >> + * head be moved >> + * @param behavior >> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring >> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring >> + * @param old_head >> + * Returns head value as it was before the move, i.e. where enqueue >> starts >> + * @param new_head >> + * Returns the current/new head value i.e. where enqueue finishes >> + * @param free_entries >> + * Returns the amount of free space in the ring BEFORE head was moved >> + * @return >> + * Actual number of objects enqueued. >> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. >> + */ >> +static __rte_always_inline unsigned int >> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp, >> + unsigned int n, enum rte_ring_queue_behavior behavior, >> + uint32_t *old_head, uint32_t *new_head, >> + uint32_t *free_entries) >> +{ >> + const uint32_t capacity = r->capacity; >> + unsigned int max = n; >> + int success; >> + >> + do { >> + /* Reset n to the initial burst count */ >> + n = max; >> + >> +#ifdef PREFER >> + *old_head = __atomic_load_n(&r->prod.head, >> + __ATOMIC_ACQUIRE); >> +#else >> + *old_head = r->prod.head; >> + /* prevent reorder of load/load */ >> + rte_smp_rmb(); >> +#endif > Same as above comment. > >> + const uint32_t cons_tail = r->cons.tail; >> + /* >> + * The subtraction is done between two unsigned 32bits value >> + * (the result is always modulo 32 bits even if we have >> + * *old_head > cons_tail). So 'free_entries' is always between 0 >> + * and capacity (which is < size). 
>> + */ >> + *free_entries = (capacity + cons_tail - *old_head); >> + >> + /* check that we have enough room in ring */ >> + if (unlikely(n > *free_entries)) >> +static __rte_always_inline unsigned int >> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, >> + unsigned int n, enum rte_ring_queue_behavior behavior, >> + int is_sp, unsigned int *free_space) >> +{ > > Duplicate function, No need to replicate on both versions. > > >> +static __rte_always_inline unsigned int >> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, >> + unsigned int n, enum rte_ring_queue_behavior behavior, >> + uint32_t *old_head, uint32_t *new_head, >> + uint32_t *entries) >> +{ >> + unsigned int max = n; >> + int success; >> + >> + /* move cons.head atomically */ >> + do { >> + /* Restore n as it may change every loop */ >> + n = max; >> +#ifdef PREFER >> + *old_head = __atomic_load_n(&r->cons.head, >> + __ATOMIC_ACQUIRE); >> +#else >> + *old_head = r->cons.head; >> + /* prevent reorder of load/load */ >> + rte_smp_rmb(); >> +#endif > Same as above comment > >> + >> + const uint32_t prod_tail = r->prod.tail; >> + /* The subtraction is done between two unsigned 32bits value >> + * (the result is always modulo 32 bits even if we have >> + * cons_head > prod_tail). So 'entries' is always between 0 >> + * and size(ring)-1. */ >> + *entries = (prod_tail - *old_head); >> + >> + /* Set the actual entries for dequeue */ >> + if (n > *entries) >> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; >> + >> + if (unlikely(n == 0)) >> + return 0; >> + >> + *new_head = *old_head + n; >> + if (is_sc) >> + r->cons.head = *new_head, success = 1; >> + else >> +#ifdef PREFER >> + success = arch_rte_atomic32_cmpset(&r->cons.head, >> + old_head, *new_head, >> + 0, __ATOMIC_ACQUIRE, >> + __ATOMIC_RELAXED); >> +#else >> + success = rte_atomic32_cmpset(&r->cons.head, *old_head, >> + *new_head); >> +#endif > Same as above comment > >> + } while (unlikely(success == 0)); >> + return n; >> +} >> + >> +/** >> + * @internal Dequeue several objects from the ring >> + * >> + * @param r >> + * A pointer to the ring structure. >> + * @param obj_table >> + * A pointer to a table of void * pointers (objects). >> + * @param n >> + * The number of objects to pull from the ring. >> + * @param behavior >> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring >> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring >> + * @param is_sc >> + * Indicates whether to use single consumer or multi-consumer head update >> + * @param available >> + * returns the number of remaining ring entries after the dequeue has >> finished >> + * @return >> + * - Actual number of objects dequeued. >> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. >> + */ >> +static __rte_always_inline unsigned int >> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, >> + unsigned int n, enum rte_ring_queue_behavior behavior, >> + int is_sc, unsigned int *available) >> +{ > Duplicate function, No need to replicate on both versions. 
>
>
>> +	uint32_t cons_head, cons_next;
>> +	uint32_t entries;
>> +
>> +	n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
>> +			&cons_head, &cons_next, &entries);
>> +	if (n == 0)
>> +		goto end;
>> +
>> +	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>> +
>> +	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>> +
>> +end:
>> +	if (available != NULL)
>> +		*available = entries - n;
>> +	return n;
>> +}
>> +
>> +#endif /* _RTE_RING_C11_MEM_H_ */
>> +
>> diff --git a/lib/librte_ring/rte_ring_generic.h
>> b/lib/librte_ring/rte_ring_generic.h
>> new file mode 100644
>> index 0000000..0ce6d57
>> --- /dev/null
>> +++ b/lib/librte_ring/rte_ring_generic.h
>> @@ -0,0 +1,268 @@
>> +/*-
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2017 hxt-semitech. All rights reserved.
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above copyright
>> + *       notice, this list of conditions and the following disclaimer in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of hxt-semitech nor the names of its
>> + *       contributors may be used to endorse or promote products derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#ifndef _RTE_RING_GENERIC_H_
>> +#define _RTE_RING_GENERIC_H_
>> +
>> +static __rte_always_inline void
>> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>> +		uint32_t single, uint32_t enqueue)
>> +{
>> +	if (enqueue)
>> +		rte_smp_wmb();
>> +	else
>> +		rte_smp_rmb();
>> +	/*
>> +	 * If there are other enqueues/dequeues in progress that preceded us,
>> +	 * we need to wait for them to complete
>> +	 */
>> +	if (!single)
>> +		while (unlikely(ht->tail != old_val))
>> +			rte_pause();
>> +
>> +	ht->tail = new_val;
>> +}
>> +
>> +/**
>> + * @internal This function updates the producer head for enqueue
>> + *
>> + * @param r
>> + *   A pointer to the ring structure
>> + * @param is_sp
>> + *   Indicates whether the multi-producer path is needed or not
>> + * @param n
>> + *   The number of elements we will want to enqueue, i.e. how far should
>> + *   the head be moved
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to ring
>> + * @param old_head
>> + *   Returns head value as it was before the move, i.e. where enqueue
>> + *   starts
>> + * @param new_head
>> + *   Returns the current/new head value, i.e. where enqueue finishes
>> + * @param free_entries
>> + *   Returns the amount of free space in the ring BEFORE head was moved
>> + * @return
>> + *   Actual number of objects enqueued.
>> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>> +		unsigned int n, enum rte_ring_queue_behavior behavior,
>> +		uint32_t *old_head, uint32_t *new_head,
>> +		uint32_t *free_entries)
>> +{
>> +	const uint32_t capacity = r->capacity;
>> +	unsigned int max = n;
>> +	int success;
>> +
>> +	do {
>> +		/* Reset n to the initial burst count */
>> +		n = max;
>> +
>> +		*old_head = r->prod.head;
> Adding rte_smp_rmb() does no harm here, as it is a NOOP for x86, and it
> is semantically correct too.
>
>
>> +		const uint32_t cons_tail = r->cons.tail;
>> +		/*
>> +		 * The subtraction is done between two unsigned 32-bit values
>> +		 * (the result is always modulo 32 bits even if we have
>> +		 * *old_head > cons_tail). So 'free_entries' is always between 0
>> +		 * and capacity (which is < size).
>> +		 */
>> +		*free_entries = (capacity + cons_tail - *old_head);
>> +
>> +		/* check that we have enough room in ring */
>> +		if (unlikely(n > *free_entries))
>> +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
>> +					0 : *free_entries;
>> +
>> +		if (n == 0)
>> +			return 0;
>> +
>> +		*new_head = *old_head + n;
>> +		if (is_sp)
>> +			r->prod.head = *new_head, success = 1;
>> +		else
>> +			success = rte_atomic32_cmpset(&r->prod.head,
>> +					*old_head, *new_head);
>> +	} while (unlikely(success == 0));
>> +	return n;
>> +}
>> +
>> +/**
>> + * @internal Enqueue several objects on the ring
>> + *
>> + * @param r
>> + *   A pointer to the ring structure.
>> + * @param obj_table
>> + *   A pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   The number of objects to add in the ring from the obj_table.
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to ring
>> + * @param is_sp
>> + *   Indicates whether to use single producer or multi-producer head update
>> + * @param free_space
>> + *   Returns the amount of space after the enqueue operation has finished
>> + * @return
>> + *   Actual number of objects enqueued.
>> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>> +		unsigned int n, enum rte_ring_queue_behavior behavior,
>> +		int is_sp, unsigned int *free_space)
>> +{
> Duplicate function, No need to replicate in both versions.
>
>> +
>> +/**
>> + * @internal This function updates the consumer head for dequeue
>> + *
>> + * @param r
>> + *   A pointer to the ring structure
>> + * @param is_sc
>> + *   Indicates whether the multi-consumer path is needed or not
>> + * @param n
>> + *   The number of elements we will want to dequeue, i.e. how far should
>> + *   the head be moved
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param old_head
>> + *   Returns head value as it was before the move, i.e. where dequeue
>> + *   starts
>> + * @param new_head
>> + *   Returns the current/new head value, i.e. where dequeue finishes
>> + * @param entries
>> + *   Returns the number of entries in the ring BEFORE head was moved
>> + * @return
>> + *   - Actual number of objects dequeued.
>> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>> +		unsigned int n, enum rte_ring_queue_behavior behavior,
>> +		uint32_t *old_head, uint32_t *new_head,
>> +		uint32_t *entries)
>> +{
>> +	unsigned int max = n;
>> +	int success;
>> +
>> +	/* move cons.head atomically */
>> +	do {
>> +		/* Restore n as it may change every loop */
>> +		n = max;
>> +
>> +		*old_head = r->cons.head;
>> +		const uint32_t prod_tail = r->prod.tail;
> Same as above comment.
>
>> +		/* The subtraction is done between two unsigned 32-bit values
>> +		 * (the result is always modulo 32 bits even if we have
>> +		 * cons_head > prod_tail). So 'entries' is always between 0
>> +		 * and size(ring)-1. */
>> +		*entries = (prod_tail - *old_head);
>> +
>> +		/* Set the actual entries for dequeue */
>> +		if (n > *entries)
>> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
>> +
>> +		if (unlikely(n == 0))
>> +			return 0;
>> +
>> +		*new_head = *old_head + n;
>> +		if (is_sc)
>> +			r->cons.head = *new_head, success = 1;
>> +		else
>> +			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> +					*new_head);
>> +	} while (unlikely(success == 0));
>> +	return n;
>> +}
>> +
>> +/**
>> + * @internal Dequeue several objects from the ring
>> + *
>> + * @param r
>> + *   A pointer to the ring structure.
>> + * @param obj_table
>> + *   A pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   The number of objects to pull from the ring.
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param is_sc
>> + *   Indicates whether to use single consumer or multi-consumer head update
>> + * @param available
>> + *   Returns the number of remaining ring entries after the dequeue has
>> + *   finished
>> + * @return
>> + *   - Actual number of objects dequeued.
>> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>> +		unsigned int n, enum rte_ring_queue_behavior behavior,
>> +		int is_sc, unsigned int *available)
>> +{
> Duplicate function, No need to replicate in both versions.

--
Cheers,
Jia

^ permalink raw reply	[flat|nested] 13+ messages in thread
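
[Illustrative note: what follows is a minimal standalone sketch of the two
orderings debated in this thread, assuming GCC-style __atomic built-ins. The
function names and the simplified head/tail parameters are invented for
illustration; this is not the DPDK implementation.]

#include <stdint.h>

/* Option 1: plain load of our own head, then an explicit acquire fence so
 * the later load of the opposite tail cannot be reordered before it. The
 * fence plays the role of rte_smp_rmb() in the generic version. */
static inline uint32_t
load_head_then_tail_fence(const uint32_t *own_head, const uint32_t *opp_tail,
		uint32_t *tail_out)
{
	uint32_t head = *own_head;
	__atomic_thread_fence(__ATOMIC_ACQUIRE);
	*tail_out = *opp_tail;
	return head;
}

/* Option 2: acquire load of our own head; the acquire ordering by itself
 * keeps the subsequent tail load from being hoisted above it. */
static inline uint32_t
load_head_then_tail_acquire(const uint32_t *own_head, const uint32_t *opp_tail,
		uint32_t *tail_out)
{
	uint32_t head = __atomic_load_n(own_head, __ATOMIC_ACQUIRE);
	*tail_out = *opp_tail;
	return head;
}

/* Either option pairs with a release store when the new tail is published
 * (the update_tail() step), so the writes to the ring slots become visible
 * no later than the tail update itself. */
static inline void
publish_tail(uint32_t *tail, uint32_t new_val)
{
	__atomic_store_n(tail, new_val, __ATOMIC_RELEASE);
}

On arm64, option 2 typically compiles to a load-acquire instruction (ldar),
while option 1 emits a separate dmb ishld barrier; which of the two is
cheaper varies by microarchitecture, which is why the thread debates making
the choice a build-time config rather than hard-coding one for all machines.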
end of thread, other threads: [~2017-11-08 2:31 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-11-02  8:43 [dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
2017-11-02 13:26 ` Ananyev, Konstantin
2017-11-02 15:42 ` Jia He
2017-11-02 16:16 ` Ananyev, Konstantin
2017-11-02 17:00 ` Jerin Jacob
2017-11-02 17:23 ` Jerin Jacob
2017-11-03  1:46 ` Jia He
2017-11-03 12:56 ` Jerin Jacob
2017-11-06  7:25 ` Jia He
2017-11-07  4:36 ` Jerin Jacob
2017-11-07  8:34 ` Jia He
2017-11-07  9:57 ` Jerin Jacob
2017-11-08  2:31 ` Jia He