From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id D9F45A0519; Fri, 3 Jul 2020 12:27:27 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 677EC1DB6C; Fri, 3 Jul 2020 12:27:16 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by dpdk.org (Postfix) with ESMTP id C086B1DB6B for ; Fri, 3 Jul 2020 12:27:15 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 5AC231045; Fri, 3 Jul 2020 03:27:15 -0700 (PDT) Received: from net-arm-c2400.shanghai.arm.com (net-arm-c2400.shanghai.arm.com [10.169.40.212]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 4873A3F68F; Fri, 3 Jul 2020 03:27:13 -0700 (PDT) From: Feifei Wang To: Honnappa Nagarahalli , Konstantin Ananyev Cc: dev@dpdk.org, nd@arm.com, Feifei Wang Date: Fri, 3 Jul 2020 18:26:51 +0800 Message-Id: <20200703102651.8918-4-feifei.wang2@arm.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20200703102651.8918-1-feifei.wang2@arm.com> References: <20200703102651.8918-1-feifei.wang2@arm.com> Subject: [dpdk-dev] [PATCH 3/3] ring: use element APIs to implement legacy APIs X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Use rte_ring_xxx_elem_xxx APIs to replace legacy API implementation. This reduces code duplication and improves code maintenance. aarch64: HW:N1sdp, 1 socket, 4 cores, 1 thread/core, 2.6GHz OS:Ubuntu 18.04.1 LTS, Kernel: 5.4.0+ DPDK: 20.05-rc3, Configuration: arm64-n1sdp-linux-gcc gcc:9.2.1 $sudo ./arm64-n1sdp-linux-gcc/app/test -l 1-2 RTE>>ring_perf_autotest test results on aarch64 in the case of esize 4: without this patch with this patch Testing burst enq/deq legacy APIs: SP/SC: burst (size: 8): 1.11 1.10 legacy APIs: SP/SC: burst (size: 32): 1.95 1.97 legacy APIs: MP/MC: burst (size: 8): 1.86 1.94 legacy APIs: MP/MC: burst (size: 32): 2.65 2.69 Testing bulk enq/deq legacy APIs: SP/SC: bulk (size: 8): 1.08 1.09 legacy APIs: SP/SC: bulk (size: 32): 1.89 1.90 legacy APIs: MP/MC: bulk (size: 8): 1.85 1.98 legacy APIs: MP/MC: bulk (size: 32): 2.65 2.69 x86: HW: dell, CPU Intel(R) Xeon(R) Gold 6240, 2 sockets, 18 cores/socket, 1 thread/core, 3.3GHz OS: Ubuntu 20.04 LTS, Kernel: 5.4.0-37-generic DPDK: 20.05-rc3, Configuration: x86_64-native-linuxapp-gcc gcc: 9.3.0 $sudo ./x86_64-native-linuxapp-gcc/app/test -l 14,16 RTE>>ring_perf_autotest test results on x86 in the case of esize 4: without this patch with this patch Testing burst enq/deq legacy APIs: SP/SC: burst (size: 8): 29.35 27.78 legacy APIs: SP/SC: burst (size: 32): 73.11 73.39 legacy APIs: MP/MC: burst (size: 8): 62.36 62.37 legacy APIs: MP/MC: burst (size: 32): 101.01 101.03 Testing bulk enq/deq legacy APIs: SP/SC: bulk (size: 8): 25.94 29.55 legacy APIs: SP/SC: bulk (size: 32): 70.00 78.87 legacy APIs: MP/MC: bulk (size: 8): 63.41 62.48 legacy APIs: MP/MC: bulk (size: 32): 105.86 103.84 Summary: In aarch64 server with this patch, there is almost no performance difference. In x86 server with this patch, in some cases, the performance slightly improve, in other cases, the performance slightly drop. Signed-off-by: Feifei Wang Reviewed-by: Honnappa Nagarahalli Reviewed-by: Ruifeng Wang --- lib/librte_ring/rte_ring.h | 284 ++++--------------------------------- 1 file changed, 30 insertions(+), 254 deletions(-) diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 35f3f8c42..2a2190bfc 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -191,168 +191,6 @@ void rte_ring_free(struct rte_ring *r); */ void rte_ring_dump(FILE *f, const struct rte_ring *r); -/* the actual enqueue of pointers on the ring. - * Placed here since identical code needed in both - * single and multi producer enqueue functions */ -#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ - unsigned int i; \ - const uint32_t size = (r)->size; \ - uint32_t idx = prod_head & (r)->mask; \ - obj_type *ring = (obj_type *)ring_start; \ - if (likely(idx + n < size)) { \ - for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { \ - ring[idx] = obj_table[i]; \ - ring[idx + 1] = obj_table[i + 1]; \ - ring[idx + 2] = obj_table[i + 2]; \ - ring[idx + 3] = obj_table[i + 3]; \ - } \ - switch (n & 0x3) { \ - case 3: \ - ring[idx++] = obj_table[i++]; /* fallthrough */ \ - case 2: \ - ring[idx++] = obj_table[i++]; /* fallthrough */ \ - case 1: \ - ring[idx++] = obj_table[i++]; \ - } \ - } else { \ - for (i = 0; idx < size; i++, idx++)\ - ring[idx] = obj_table[i]; \ - for (idx = 0; i < n; i++, idx++) \ - ring[idx] = obj_table[i]; \ - } \ -} while (0) - -/* the actual copy of pointers on the ring to obj_table. - * Placed here since identical code needed in both - * single and multi consumer dequeue functions */ -#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \ - unsigned int i; \ - uint32_t idx = cons_head & (r)->mask; \ - const uint32_t size = (r)->size; \ - obj_type *ring = (obj_type *)ring_start; \ - if (likely(idx + n < size)) { \ - for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {\ - obj_table[i] = ring[idx]; \ - obj_table[i + 1] = ring[idx + 1]; \ - obj_table[i + 2] = ring[idx + 2]; \ - obj_table[i + 3] = ring[idx + 3]; \ - } \ - switch (n & 0x3) { \ - case 3: \ - obj_table[i++] = ring[idx++]; /* fallthrough */ \ - case 2: \ - obj_table[i++] = ring[idx++]; /* fallthrough */ \ - case 1: \ - obj_table[i++] = ring[idx++]; \ - } \ - } else { \ - for (i = 0; idx < size; i++, idx++) \ - obj_table[i] = ring[idx]; \ - for (idx = 0; i < n; i++, idx++) \ - obj_table[i] = ring[idx]; \ - } \ -} while (0) - -/* Between load and load. there might be cpu reorder in weak model - * (powerpc/arm). - * There are 2 choices for the users - * 1.use rmb() memory barrier - * 2.use one-direction load_acquire/store_release barrier,defined by - * CONFIG_RTE_USE_C11_MEM_MODEL=y - * It depends on performance test results. - * By default, move common functions to rte_ring_generic.h - */ -#ifdef RTE_USE_C11_MEM_MODEL -#include "rte_ring_c11_mem.h" -#else -#include "rte_ring_generic.h" -#endif - -/** - * @internal Enqueue several objects on the ring - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to add in the ring from the obj_table. - * @param behavior - * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring - * @param is_sp - * Indicates whether to use single producer or multi-producer head update - * @param free_space - * returns the amount of space after the enqueue operation has finished - * @return - * Actual number of objects enqueued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, - unsigned int n, enum rte_ring_queue_behavior behavior, - unsigned int is_sp, unsigned int *free_space) -{ - uint32_t prod_head, prod_next; - uint32_t free_entries; - - n = __rte_ring_move_prod_head(r, is_sp, n, behavior, - &prod_head, &prod_next, &free_entries); - if (n == 0) - goto end; - - ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); - - update_tail(&r->prod, prod_head, prod_next, is_sp, 1); -end: - if (free_space != NULL) - *free_space = free_entries - n; - return n; -} - -/** - * @internal Dequeue several objects from the ring - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to pull from the ring. - * @param behavior - * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring - * @param is_sc - * Indicates whether to use single consumer or multi-consumer head update - * @param available - * returns the number of remaining ring entries after the dequeue has finished - * @return - * - Actual number of objects dequeued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, - unsigned int n, enum rte_ring_queue_behavior behavior, - unsigned int is_sc, unsigned int *available) -{ - uint32_t cons_head, cons_next; - uint32_t entries; - - n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, - &cons_head, &cons_next, &entries); - if (n == 0) - goto end; - - DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); - - update_tail(&r->cons, cons_head, cons_next, is_sc, 0); - -end: - if (available != NULL) - *available = entries - n; - return n; -} - /** * Enqueue several objects on the ring (multi-producers safe). * @@ -375,8 +213,8 @@ static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - RTE_RING_SYNC_MT, free_space); + return rte_ring_mp_enqueue_bulk_elem(r, obj_table, sizeof(void *), + n, free_space); } /** @@ -398,8 +236,8 @@ static __rte_always_inline unsigned int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - RTE_RING_SYNC_ST, free_space); + return rte_ring_sp_enqueue_bulk_elem(r, obj_table, sizeof(void *), + n, free_space); } /** @@ -425,24 +263,8 @@ static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - switch (r->prod.sync_type) { - case RTE_RING_SYNC_MT: - return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space); - case RTE_RING_SYNC_ST: - return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space); -#ifdef ALLOW_EXPERIMENTAL_API - case RTE_RING_SYNC_MT_RTS: - return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, - free_space); - case RTE_RING_SYNC_MT_HTS: - return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n, - free_space); -#endif - } - - /* valid ring should never reach this point */ - RTE_ASSERT(0); - return 0; + return rte_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *), + n, free_space); } /** @@ -462,7 +284,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, static __rte_always_inline int rte_ring_mp_enqueue(struct rte_ring *r, void *obj) { - return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; + return rte_ring_mp_enqueue_elem(r, obj, sizeof(void *)); } /** @@ -479,7 +301,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj) static __rte_always_inline int rte_ring_sp_enqueue(struct rte_ring *r, void *obj) { - return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; + return rte_ring_sp_enqueue_elem(r, obj, sizeof(void *)); } /** @@ -500,7 +322,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj) static __rte_always_inline int rte_ring_enqueue(struct rte_ring *r, void *obj) { - return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; + return rte_ring_enqueue_elem(r, obj, sizeof(void *)); } /** @@ -525,8 +347,8 @@ static __rte_always_inline unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - RTE_RING_SYNC_MT, available); + return rte_ring_mc_dequeue_bulk_elem(r, obj_table, sizeof(void *), + n, available); } /** @@ -549,8 +371,8 @@ static __rte_always_inline unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - RTE_RING_SYNC_ST, available); + return rte_ring_sc_dequeue_bulk_elem(r, obj_table, sizeof(void *), + n, available); } /** @@ -576,22 +398,8 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - switch (r->cons.sync_type) { - case RTE_RING_SYNC_MT: - return rte_ring_mc_dequeue_bulk(r, obj_table, n, available); - case RTE_RING_SYNC_ST: - return rte_ring_sc_dequeue_bulk(r, obj_table, n, available); -#ifdef ALLOW_EXPERIMENTAL_API - case RTE_RING_SYNC_MT_RTS: - return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); - case RTE_RING_SYNC_MT_HTS: - return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available); -#endif - } - - /* valid ring should never reach this point */ - RTE_ASSERT(0); - return 0; + return rte_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *), + n, available); } /** @@ -612,7 +420,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, static __rte_always_inline int rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) { - return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; + return rte_ring_mc_dequeue_elem(r, obj_p, sizeof(void *)); } /** @@ -630,7 +438,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) static __rte_always_inline int rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) { - return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; + return rte_ring_sc_dequeue_elem(r, obj_p, sizeof(void *)); } /** @@ -652,7 +460,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) static __rte_always_inline int rte_ring_dequeue(struct rte_ring *r, void **obj_p) { - return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; + return rte_ring_dequeue_elem(r, obj_p, sizeof(void *)); } /** @@ -860,8 +668,8 @@ static __rte_always_inline unsigned int rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); + return rte_ring_mp_enqueue_burst_elem(r, obj_table, sizeof(void *), + n, free_space); } /** @@ -883,8 +691,8 @@ static __rte_always_inline unsigned int rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); + return rte_ring_sp_enqueue_burst_elem(r, obj_table, sizeof(void *), + n, free_space); } /** @@ -910,24 +718,8 @@ static __rte_always_inline unsigned int rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - switch (r->prod.sync_type) { - case RTE_RING_SYNC_MT: - return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space); - case RTE_RING_SYNC_ST: - return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space); -#ifdef ALLOW_EXPERIMENTAL_API - case RTE_RING_SYNC_MT_RTS: - return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, - free_space); - case RTE_RING_SYNC_MT_HTS: - return rte_ring_mp_hts_enqueue_burst(r, obj_table, n, - free_space); -#endif - } - - /* valid ring should never reach this point */ - RTE_ASSERT(0); - return 0; + return rte_ring_enqueue_burst_elem(r, obj_table, sizeof(void *), + n, free_space); } /** @@ -954,8 +746,8 @@ static __rte_always_inline unsigned int rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available); + return rte_ring_mc_dequeue_burst_elem(r, obj_table, sizeof(void *), + n, available); } /** @@ -979,8 +771,8 @@ static __rte_always_inline unsigned int rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); + return rte_ring_sc_dequeue_burst_elem(r, obj_table, sizeof(void *), + n, available); } /** @@ -1006,24 +798,8 @@ static __rte_always_inline unsigned int rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - switch (r->cons.sync_type) { - case RTE_RING_SYNC_MT: - return rte_ring_mc_dequeue_burst(r, obj_table, n, available); - case RTE_RING_SYNC_ST: - return rte_ring_sc_dequeue_burst(r, obj_table, n, available); -#ifdef ALLOW_EXPERIMENTAL_API - case RTE_RING_SYNC_MT_RTS: - return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, - available); - case RTE_RING_SYNC_MT_HTS: - return rte_ring_mc_hts_dequeue_burst(r, obj_table, n, - available); -#endif - } - - /* valid ring should never reach this point */ - RTE_ASSERT(0); - return 0; + return rte_ring_dequeue_burst_elem(r, obj_table, sizeof(void *), + n, available); } #ifdef __cplusplus -- 2.17.1