From: Konstantin Ananyev
To: dev@dpdk.org
Cc: honnappa.nagarahalli@arm.com, jerinj@marvell.com, hemant.agrawal@nxp.com,
 bruce.richardson@intel.com, drc@linux.vnet.ibm.com, ruifeng.wang@arm.com,
 mb@smartsharesystems.com, Konstantin Ananyev
Subject: [RFC 2/6] ring: make copying functions generic
Date: Thu, 15 Aug 2024 09:53:35 +0100
Message-Id: <20240815085339.1434-3-konstantin.v.ananyev@yandex.ru>
In-Reply-To: <20240815085339.1434-1-konstantin.v.ananyev@yandex.ru>
References: <20240815085339.1434-1-konstantin.v.ananyev@yandex.ru>
List-Id: DPDK patches and discussions

From: Konstantin Ananyev

Note upfront: this change doesn't introduce any functional or performance
changes. It is just a code reordering to:
- improve code modularity and re-usability
- allow the same code to be re-used later to introduce new functionality

There is no real need for enqueue_elems()/dequeue_elems() to take a pointer
to the actual rte_ring structure; it is enough to pass a pointer to the
ring's elements buffer. In return, we get copying functions that can be
reused by other queueing abstractions that have a circular ring buffer
inside.
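To make that intent more concrete, below is a minimal, hypothetical sketch
(not part of this patch) of how another queueing abstraction with its own
circular buffer could reuse the reworked copy helpers. The struct and
wrapper names are invented for illustration; only the
__rte_ring_do_enqueue_elems()/__rte_ring_do_dequeue_elems() signatures come
from the diff below. Note that these helpers live in an internal (pvt)
header, only copy data, and do no synchronization or bounds checking, so
head/tail management stays with the caller, just as in rte_ring itself.

/* Hypothetical illustration only: a stand-alone circular buffer that
 * reuses the generic copy helpers reworked by this patch.
 */
#include <stdint.h>
#include <rte_ring_elem.h>	/* pulls in rte_ring_elem_pvt.h */

struct my_circ_buf {
	uint32_t size;	/* number of slots, power of two */
	uint32_t mask;	/* size - 1 */
	uint32_t head;	/* producer index */
	uint32_t tail;	/* consumer index */
	void *elems;	/* elements buffer: size * esize bytes */
};

/* copy 'num' objects of 'esize' bytes each (esize a multiple of 4) into
 * the buffer starting at the current head, wrapping around as needed
 */
static inline void
my_circ_buf_put(struct my_circ_buf *b, const void *objs,
	uint32_t esize, uint32_t num)
{
	__rte_ring_do_enqueue_elems(b->elems, objs, b->size,
			b->head & b->mask, esize, num);
	b->head += num;
}

/* copy 'num' objects of 'esize' bytes each out of the buffer starting at
 * the current tail, wrapping around as needed
 */
static inline void
my_circ_buf_get(struct my_circ_buf *b, void *objs,
	uint32_t esize, uint32_t num)
{
	__rte_ring_do_dequeue_elems(objs, b->elems, b->size,
			b->tail & b->mask, esize, num);
	b->tail += num;
}

This is the same pattern the patch itself uses: __rte_ring_enqueue_elems()
and __rte_ring_dequeue_elems() become thin wrappers that pass &r[1],
r->size and the masked head/tail index into the generic helpers.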
Signed-off-by: Konstantin Ananyev
---
 lib/ring/rte_ring_elem_pvt.h | 117 ++++++++++++++++++++---------------
 1 file changed, 68 insertions(+), 49 deletions(-)

diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 3a83668a08..216cb6089f 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -17,12 +17,14 @@
 #endif
 
 static __rte_always_inline void
-__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
-		uint32_t idx, const void *obj_table, uint32_t n)
+__rte_ring_enqueue_elems_32(void *ring_table, const void *obj_table,
+	uint32_t size, uint32_t idx, uint32_t n)
 {
 	unsigned int i;
-	uint32_t *ring = (uint32_t *)&r[1];
+
+	uint32_t *ring = ring_table;
 	const uint32_t *obj = (const uint32_t *)obj_table;
+
 	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			ring[idx] = obj[i];
@@ -60,14 +62,14 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
 }
 
 static __rte_always_inline void
-__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
-		const void *obj_table, uint32_t n)
+__rte_ring_enqueue_elems_64(void *ring_table, const void *obj_table,
+	uint32_t size, uint32_t idx, uint32_t n)
 {
 	unsigned int i;
-	const uint32_t size = r->size;
-	uint32_t idx = prod_head & r->mask;
-	uint64_t *ring = (uint64_t *)&r[1];
+
+	uint64_t *ring = ring_table;
 	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
+
 	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			ring[idx] = obj[i];
@@ -93,14 +95,14 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
 }
 
 static __rte_always_inline void
-__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
-		const void *obj_table, uint32_t n)
+__rte_ring_enqueue_elems_128(void *ring_table, const void *obj_table,
+	uint32_t size, uint32_t idx, uint32_t n)
 {
 	unsigned int i;
-	const uint32_t size = r->size;
-	uint32_t idx = prod_head & r->mask;
-	rte_int128_t *ring = (rte_int128_t *)&r[1];
+
+	rte_int128_t *ring = ring_table;
 	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+
 	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(ring + idx),
@@ -126,37 +128,47 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
  * single and multi producer enqueue functions.
  */
 static __rte_always_inline void
-__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
-		const void *obj_table, uint32_t esize, uint32_t num)
+__rte_ring_do_enqueue_elems(void *ring_table, const void *obj_table,
+	uint32_t size, uint32_t idx, uint32_t esize, uint32_t num)
 {
 	/* 8B and 16B copies implemented individually to retain
 	 * the current performance.
 	 */
 	if (esize == 8)
-		__rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
+		__rte_ring_enqueue_elems_64(ring_table, obj_table, size,
+			idx, num);
 	else if (esize == 16)
-		__rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
+		__rte_ring_enqueue_elems_128(ring_table, obj_table, size,
+			idx, num);
 	else {
-		uint32_t idx, scale, nr_idx, nr_num, nr_size;
+		uint32_t scale, nr_idx, nr_num, nr_size;
 
 		/* Normalize to uint32_t */
 		scale = esize / sizeof(uint32_t);
 		nr_num = num * scale;
-		idx = prod_head & r->mask;
 		nr_idx = idx * scale;
-		nr_size = r->size * scale;
-		__rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
-				obj_table, nr_num);
+		nr_size = size * scale;
+		__rte_ring_enqueue_elems_32(ring_table, obj_table, nr_size,
+			nr_idx, nr_num);
 	}
 }
 
 static __rte_always_inline void
-__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
-		uint32_t idx, void *obj_table, uint32_t n)
+__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
+	const void *obj_table, uint32_t esize, uint32_t num)
+{
+	__rte_ring_do_enqueue_elems(&r[1], obj_table, r->size,
+			prod_head & r->mask, esize, num);
+}
+
+static __rte_always_inline void
+__rte_ring_dequeue_elems_32(void *obj_table, const void *ring_table,
+	uint32_t size, uint32_t idx, uint32_t n)
 {
 	unsigned int i;
-	uint32_t *ring = (uint32_t *)&r[1];
-	uint32_t *obj = (uint32_t *)obj_table;
+	uint32_t *obj = obj_table;
+	const uint32_t *ring = (const uint32_t *)ring_table;
+
 	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			obj[i] = ring[idx];
@@ -194,14 +206,13 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
 }
 
 static __rte_always_inline void
-__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t cons_head,
-		void *obj_table, uint32_t n)
+__rte_ring_dequeue_elems_64(void *obj_table, const void *ring_table,
+	uint32_t size, uint32_t idx, uint32_t n)
 {
 	unsigned int i;
-	const uint32_t size = r->size;
-	uint32_t idx = cons_head & r->mask;
-	uint64_t *ring = (uint64_t *)&r[1];
 	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
+	const uint64_t *ring = (const uint64_t *)ring_table;
+
 	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			obj[i] = ring[idx];
@@ -227,27 +238,26 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t cons_head,
 }
 
 static __rte_always_inline void
-__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t cons_head,
-		void *obj_table, uint32_t n)
+__rte_ring_dequeue_elems_128(void *obj_table, const void *ring_table,
+	uint32_t size, uint32_t idx, uint32_t n)
 {
 	unsigned int i;
-	const uint32_t size = r->size;
-	uint32_t idx = cons_head & r->mask;
-	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	rte_int128_t *obj = (rte_int128_t *)obj_table;
+	const rte_int128_t *ring = (const rte_int128_t *)ring_table;
+
 	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
-			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
+			memcpy((obj + i), (const void *)(ring + idx), 32);
 		switch (n & 0x1) {
 		case 1:
-			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+			memcpy((obj + i), (const void *)(ring + idx), 16);
 		}
 	} else {
 		for (i = 0; idx < size; i++, idx++)
-			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+			memcpy((obj + i), (const void *)(ring + idx), 16);
 		/* Start at the beginning */
 		for (idx = 0; i < n; i++, idx++)
-			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+			memcpy((obj + i), (const void *)(ring + idx), 16);
 	}
 }
 
@@ -256,30 +266,39 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t cons_head,
  * single and multi producer enqueue functions.
  */
 static __rte_always_inline void
-__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
-		void *obj_table, uint32_t esize, uint32_t num)
+__rte_ring_do_dequeue_elems(void *obj_table, const void *ring_table,
+	uint32_t size, uint32_t idx, uint32_t esize, uint32_t num)
 {
 	/* 8B and 16B copies implemented individually to retain
 	 * the current performance.
 	 */
 	if (esize == 8)
-		__rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
+		__rte_ring_dequeue_elems_64(obj_table, ring_table, size,
+			idx, num);
 	else if (esize == 16)
-		__rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
+		__rte_ring_dequeue_elems_128(obj_table, ring_table, size,
+			idx, num);
 	else {
-		uint32_t idx, scale, nr_idx, nr_num, nr_size;
+		uint32_t scale, nr_idx, nr_num, nr_size;
 
 		/* Normalize to uint32_t */
 		scale = esize / sizeof(uint32_t);
 		nr_num = num * scale;
-		idx = cons_head & r->mask;
 		nr_idx = idx * scale;
-		nr_size = r->size * scale;
-		__rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
-				obj_table, nr_num);
+		nr_size = size * scale;
+		__rte_ring_dequeue_elems_32(obj_table, ring_table, nr_size,
+			nr_idx, nr_num);
 	}
 }
 
+static __rte_always_inline void
+__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
+	void *obj_table, uint32_t esize, uint32_t num)
+{
+	__rte_ring_do_dequeue_elems(obj_table, &r[1], r->size,
+			cons_head & r->mask, esize, num);
+}
+
 /* Between load and load. there might be cpu reorder in weak model
  * (powerpc/arm).
  * There are 2 choices for the users
-- 
2.35.3