From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <dev-bounces@dpdk.org>
Received: from dpdk.org (dpdk.org [92.243.14.124])
	by inbox.dpdk.org (Postfix) with ESMTP id 873F0A034F;
	Wed, 13 May 2020 17:35:58 +0200 (CEST)
Received: from [92.243.14.124] (localhost [127.0.0.1])
	by dpdk.org (Postfix) with ESMTP id 614731D648;
	Wed, 13 May 2020 17:35:58 +0200 (CEST)
Received: from smartserver.smartsharesystems.com
 (smartserver.smartsharesystems.com [77.243.40.215])
 by dpdk.org (Postfix) with ESMTP id 7A4B51D63D
 for <dev@dpdk.org>; Wed, 13 May 2020 17:35:57 +0200 (CEST)
Received: from dkrd2.smartsharesys.local ([192.168.4.12]) by
 smartserver.smartsharesystems.com with Microsoft SMTPSVC(6.0.3790.4675); 
 Wed, 13 May 2020 17:35:57 +0200
From: =?UTF-8?q?Morten=20Br=C3=B8rup?= <mb@smartsharesystems.com>
To: olivier.matz@6wind.com, konstantin.ananyev@intel.com,
 Honnappa.Nagarahalli@arm.com, nd@arm.com
Cc: dev@dpdk.org, =?UTF-8?q?Morten=20Br=C3=B8rup?= <mb@smartsharesystems.com>
Date: Wed, 13 May 2020 15:35:50 +0000
Message-Id: <20200513153550.37147-1-mb@smartsharesystems.com>
X-Mailer: git-send-email 2.17.1
In-Reply-To: <20200513153111.37063-1-mb@smartsharesystems.com>
References: <20200513153111.37063-1-mb@smartsharesystems.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
X-OriginalArrivalTime: 13 May 2020 15:35:57.0050 (UTC)
 FILETIME=[322B8DA0:01D6293C]
Subject: [dpdk-dev] [PATCH] ring: empty and count optimizations
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://mails.dpdk.org/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://mails.dpdk.org/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://mails.dpdk.org/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
Errors-To: dev-bounces@dpdk.org
Sender: "dev" <dev-bounces@dpdk.org>

Testing if the ring is empty is as simple as comparing the producer and
consumer pointers.
In theory, this optimization reduces the number of potential cache misses
in rte_ring_empty() from 3 to 2, because it no longer calls
rte_ring_count() and therefore does not have to read r->mask.

It is not possible to enqueue more elements than the capacity of a ring,
so the capacity comparison is a safeguard for observer threads only.
Instead of completely removing the comparison, I have reorganized it to
resemble the other ternary expressions in the ring library and added a
likely().

The modification of these two functions was discussed in the RFC here:
https://mails.dpdk.org/archives/dev/2020-April/165752.html

Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
---
 lib/librte_ring/rte_ring.h | 36 +++++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 86faede81..74a8fcdc8 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -55,7 +55,7 @@ extern "C" {
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count);
 
 /**
  * Initialize a ring structure.
@@ -109,8 +109,8 @@ ssize_t rte_ring_get_memsize(unsigned count);
  * @return
  *   0 on success, or a negative value on error.
  */
-int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
-	unsigned flags);
+int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
+	unsigned int flags);
 
 /**
  * Create a new ring named *name* in memory.
@@ -169,8 +169,8 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
  */
-struct rte_ring *rte_ring_create(const char *name, unsigned count,
-				 int socket_id, unsigned flags);
+struct rte_ring *rte_ring_create(const char *name, unsigned int count,
+				 int socket_id, unsigned int flags);
 
 /**
  * De-allocate all memory used by the ring.
@@ -199,7 +199,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	uint32_t idx = prod_head & (r)->mask; \
 	obj_type *ring = (obj_type *)ring_start; \
 	if (likely(idx + n < size)) { \
-		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
+		for (i = 0; i < (n & ((~(unsigned int)0x3))); i+=4, idx+=4) { \
 			ring[idx] = obj_table[i]; \
 			ring[idx+1] = obj_table[i+1]; \
 			ring[idx+2] = obj_table[i+2]; \
@@ -230,7 +230,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	const uint32_t size = (r)->size; \
 	obj_type *ring = (obj_type *)ring_start; \
 	if (likely(idx + n < size)) { \
-		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
+		for (i = 0; i < (n & (~(unsigned int)0x3)); i+=4, idx+=4) {\
 			obj_table[i] = ring[idx]; \
 			obj_table[i+1] = ring[idx+1]; \
 			obj_table[i+2] = ring[idx+2]; \
@@ -683,13 +683,13 @@ rte_ring_reset(struct rte_ring *r);
  * @return
  *   The number of entries in the ring.
  */
-static inline unsigned
+static inline unsigned int
 rte_ring_count(const struct rte_ring *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
 	uint32_t count = (prod_tail - cons_tail) & r->mask;
-	return (count > r->capacity) ? r->capacity : count;
+	return likely(count <= r->capacity) ? count : r->capacity;
 }
 
 /**
@@ -700,7 +700,7 @@ rte_ring_count(const struct rte_ring *r)
  * @return
  *   The number of free entries in the ring.
  */
-static inline unsigned
+static inline unsigned int
 rte_ring_free_count(const struct rte_ring *r)
 {
 	return r->capacity - rte_ring_count(r);
@@ -733,7 +733,9 @@ rte_ring_full(const struct rte_ring *r)
 static inline int
 rte_ring_empty(const struct rte_ring *r)
 {
-	return rte_ring_count(r) == 0;
+	uint32_t prod_tail = r->prod.tail;
+	uint32_t cons_tail = r->cons.tail;
+	return cons_tail == prod_tail;
 }
 
 /**
@@ -860,7 +862,7 @@ struct rte_ring *rte_ring_lookup(const char *name);
  * @return
  *   - n: Actual number of objects enqueued.
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
@@ -883,7 +885,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @return
  *   - n: Actual number of objects enqueued.
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
@@ -910,7 +912,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @return
  *   - n: Actual number of objects enqueued.
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
@@ -954,7 +956,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @return
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
@@ -979,7 +981,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
  * @return
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
@@ -1006,7 +1008,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
  * @return
  *   - Number of objects dequeued
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-- 
2.17.1