DPDK patches and discussions
 help / color / mirror / Atom feed
From: Bruce Richardson <bruce.richardson@intel.com>
To: olivier.matz@6wind.com
Cc: dev@dpdk.org, Bruce Richardson <bruce.richardson@intel.com>
Subject: [dpdk-dev] [PATCH RFCv2 3/4] ring: allow common ring to use 8 or 16 byte values
Date: Tue, 24 Jan 2017 10:39:36 +0000	[thread overview]
Message-ID: <1485254377-20098-4-git-send-email-bruce.richardson@intel.com> (raw)
In-Reply-To: <1485254377-20098-1-git-send-email-bruce.richardson@intel.com>

Change the common ring enqueue/dequeue functions to support enqueuing and
dequeuing 16B values. Add the element size as parameter to all common ring
functions that need it, and pass that as parameter from the rte_ring
functions.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_common_ring.c |  14 ++--
 lib/librte_ring/rte_common_ring.h | 135 ++++++++++++++++++++++++++++----------
 lib/librte_ring/rte_ring.c        |   7 +-
 lib/librte_ring/rte_ring.h        |  16 ++---
 4 files changed, 122 insertions(+), 50 deletions(-)

diff --git a/lib/librte_ring/rte_common_ring.c b/lib/librte_ring/rte_common_ring.c
index a0c4b5a..eb04de4 100644
--- a/lib/librte_ring/rte_common_ring.c
+++ b/lib/librte_ring/rte_common_ring.c
@@ -101,7 +101,7 @@ EAL_REGISTER_TAILQ(rte_common_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_common_ring_get_memsize(unsigned count)
+rte_common_ring_get_memsize(unsigned count, unsigned int elem_sz)
 {
 	ssize_t sz;
 
@@ -113,14 +113,14 @@ rte_common_ring_get_memsize(unsigned count)
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = sizeof(struct rte_ring) + count * elem_sz;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
 
 int
 rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
-	unsigned flags)
+	unsigned flags, unsigned int elem_sz)
 {
 	int ret;
 
@@ -146,6 +146,7 @@ rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
+	r->elem_sz = elem_sz;
 	r->prod.watermark = count;
 	r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
 	r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
@@ -160,7 +161,7 @@ rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
 /* create the ring */
 struct rte_ring *
 rte_common_ring_create(const char *name, unsigned count, int socket_id,
-		unsigned flags)
+		unsigned flags, unsigned int elem_sz)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
 	struct rte_ring *r;
@@ -173,7 +174,7 @@ rte_common_ring_create(const char *name, unsigned count, int socket_id,
 
 	ring_list = RTE_TAILQ_CAST(rte_common_ring_tailq.head, rte_common_ring_list);
 
-	ring_size = rte_common_ring_get_memsize(count);
+	ring_size = rte_common_ring_get_memsize(count, elem_sz);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -203,7 +204,7 @@ rte_common_ring_create(const char *name, unsigned count, int socket_id,
 		r = mz->addr;
 		/* no need to check return value here, we already checked the
 		 * arguments above */
-		rte_common_ring_init(r, name, count, flags);
+		rte_common_ring_init(r, name, count, flags, elem_sz);
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -293,6 +294,7 @@ rte_common_ring_dump(FILE *f, const struct rte_ring *r)
 
 	fprintf(f, "ring <%s>@%p\n", r->name, r);
 	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  elem_sz=%u\n", r->elem_sz);
 	fprintf(f, "  size=%"PRIu32"\n", r->prod.size);
 	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
 	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
diff --git a/lib/librte_ring/rte_common_ring.h b/lib/librte_ring/rte_common_ring.h
index f2c1c46..314d53b 100644
--- a/lib/librte_ring/rte_common_ring.h
+++ b/lib/librte_ring/rte_common_ring.h
@@ -101,6 +101,7 @@ extern "C" {
 #include <rte_atomic.h>
 #include <rte_branch_prediction.h>
 #include <rte_memzone.h>
+#include <rte_vect.h>
 
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
@@ -157,6 +158,7 @@ struct rte_ring {
 	 */
 	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
 	int flags;                       /**< Flags supplied at creation. */
+	unsigned int elem_sz;            /**< Size of a ring entry */
 	const struct rte_memzone *memzone;
 			/**< Memzone, if any, containing the rte_ring */
 
@@ -232,7 +234,7 @@ struct rte_ring {
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_common_ring_get_memsize(unsigned count);
+ssize_t rte_common_ring_get_memsize(unsigned count, unsigned int elem_sz);
 
 /**
  * Initialize a ring structure.
@@ -269,7 +271,7 @@ ssize_t rte_common_ring_get_memsize(unsigned count);
  *   0 on success, or a negative value on error.
  */
 int rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
-	unsigned flags);
+	unsigned flags, unsigned int elem_sz);
 
 /**
  * Create a new ring named *name* in memory.
@@ -311,7 +313,8 @@ int rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - ENOMEM - no appropriate memory area found in which to create memzone
  */
 struct rte_ring *rte_common_ring_create(const char *name, unsigned count,
-				 int socket_id, unsigned flags);
+				 int socket_id, unsigned flags,
+				 unsigned int elem_sz);
 /**
  * De-allocate all memory used by the ring.
  *
@@ -354,25 +357,50 @@ void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
  * Placed here since identical code needed in both
  * single and multi producer enqueue functions */
 #define ENQUEUE_PTRS() do { \
+	void * const *objs = obj_table; \
 	const uint32_t size = r->prod.size; \
 	uint32_t idx = prod_head & mask; \
 	if (likely(idx + n < size)) { \
 		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
-			r->ring[idx] = obj_table[i]; \
-			r->ring[idx+1] = obj_table[i+1]; \
-			r->ring[idx+2] = obj_table[i+2]; \
-			r->ring[idx+3] = obj_table[i+3]; \
+			r->ring[idx] = objs[i]; \
+			r->ring[idx+1] = objs[i+1]; \
+			r->ring[idx+2] = objs[i+2]; \
+			r->ring[idx+3] = objs[i+3]; \
 		} \
 		switch (n & 0x3) { \
-			case 3: r->ring[idx++] = obj_table[i++]; \
-			case 2: r->ring[idx++] = obj_table[i++]; \
-			case 1: r->ring[idx++] = obj_table[i++]; \
+			case 3: r->ring[idx++] = objs[i++]; \
+			case 2: r->ring[idx++] = objs[i++]; \
+			case 1: r->ring[idx++] = objs[i++]; \
 		} \
 	} else { \
 		for (i = 0; idx < size; i++, idx++)\
-			r->ring[idx] = obj_table[i]; \
+			r->ring[idx] = objs[i]; \
 		for (idx = 0; i < n; i++, idx++) \
-			r->ring[idx] = obj_table[i]; \
+			r->ring[idx] = objs[i]; \
+	} \
+} while(0)
+#define ENQUEUE_16B() do { \
+	rte_xmm_t *ring = (void *)r->ring; \
+	const rte_xmm_t *objs = obj_table; \
+	const uint32_t size = r->prod.size; \
+	uint32_t idx = prod_head & mask; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
+			ring[idx] = objs[i]; \
+			ring[idx+1] = objs[i+1]; \
+			ring[idx+2] = objs[i+2]; \
+			ring[idx+3] = objs[i+3]; \
+		} \
+		switch (n & 0x3) { \
+			case 3: ring[idx++] = objs[i++]; \
+			case 2: ring[idx++] = objs[i++]; \
+			case 1: ring[idx++] = objs[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++)\
+			ring[idx] = objs[i]; \
+		for (idx = 0; i < n; i++, idx++) \
+			ring[idx] = objs[i]; \
 	} \
 } while(0)
 
@@ -380,25 +408,50 @@ void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
  * Placed here since identical code needed in both
  * single and multi consumer dequeue functions */
 #define DEQUEUE_PTRS() do { \
+	void **objs = obj_table; \
+	uint32_t idx = cons_head & mask; \
+	const uint32_t size = r->cons.size; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
+			objs[i] = r->ring[idx]; \
+			objs[i+1] = r->ring[idx+1]; \
+			objs[i+2] = r->ring[idx+2]; \
+			objs[i+3] = r->ring[idx+3]; \
+		} \
+		switch (n & 0x3) { \
+			case 3: objs[i++] = r->ring[idx++]; \
+			case 2: objs[i++] = r->ring[idx++]; \
+			case 1: objs[i++] = r->ring[idx++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			objs[i] = r->ring[idx]; \
+		for (idx = 0; i < n; i++, idx++) \
+			objs[i] = r->ring[idx]; \
+	} \
+} while (0)
+#define DEQUEUE_16B() do { \
+	rte_xmm_t *ring = (void *)r->ring; \
+	rte_xmm_t *objs = obj_table; \
 	uint32_t idx = cons_head & mask; \
 	const uint32_t size = r->cons.size; \
 	if (likely(idx + n < size)) { \
 		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
-			obj_table[i] = r->ring[idx]; \
-			obj_table[i+1] = r->ring[idx+1]; \
-			obj_table[i+2] = r->ring[idx+2]; \
-			obj_table[i+3] = r->ring[idx+3]; \
+			objs[i] = ring[idx]; \
+			objs[i+1] = ring[idx+1]; \
+			objs[i+2] = ring[idx+2]; \
+			objs[i+3] = ring[idx+3]; \
 		} \
 		switch (n & 0x3) { \
-			case 3: obj_table[i++] = r->ring[idx++]; \
-			case 2: obj_table[i++] = r->ring[idx++]; \
-			case 1: obj_table[i++] = r->ring[idx++]; \
+			case 3: objs[i++] = ring[idx++]; \
+			case 2: objs[i++] = ring[idx++]; \
+			case 1: objs[i++] = ring[idx++]; \
 		} \
 	} else { \
 		for (i = 0; idx < size; i++, idx++) \
-			obj_table[i] = r->ring[idx]; \
+			objs[i] = ring[idx]; \
 		for (idx = 0; i < n; i++, idx++) \
-			obj_table[i] = r->ring[idx]; \
+			objs[i] = ring[idx]; \
 	} \
 } while (0)
 
@@ -428,8 +481,9 @@ void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
  *   - n: Actual number of objects enqueued.
  */
 static inline int __attribute__((always_inline))
-__rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_mp_do_enqueue(struct rte_ring *r, const void *obj_table,
+			 unsigned n, unsigned int elem_sz,
+			 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t prod_head, prod_next;
 	uint32_t cons_tail, free_entries;
@@ -480,7 +534,10 @@ __rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	} while (unlikely(success == 0));
 
 	/* write entries in ring */
-	ENQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): ENQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): ENQUEUE_16B(); break;
+	}
 	rte_smp_wmb();
 
 	/* if we exceed the watermark */
@@ -537,8 +594,9 @@ __rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   - n: Actual number of objects enqueued.
  */
 static inline int __attribute__((always_inline))
-__rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_sp_do_enqueue(struct rte_ring *r, const void *obj_table,
+			 unsigned n, unsigned int elem_sz,
+			 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t prod_head, cons_tail;
 	uint32_t prod_next, free_entries;
@@ -575,7 +633,10 @@ __rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	r->prod.head = prod_next;
 
 	/* write entries in ring */
-	ENQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): ENQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): ENQUEUE_16B(); break;
+	}
 	rte_smp_wmb();
 
 	/* if we exceed the watermark */
@@ -621,8 +682,9 @@ __rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  */
 
 static inline int __attribute__((always_inline))
-__rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_mc_do_dequeue(struct rte_ring *r, void *obj_table,
+		 unsigned n, unsigned int elem_sz,
+		 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t cons_head, prod_tail;
 	uint32_t cons_next, entries;
@@ -671,7 +733,10 @@ __rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 	} while (unlikely(success == 0));
 
 	/* copy in table */
-	DEQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): DEQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): DEQUEUE_16B(); break;
+	}
 	rte_smp_rmb();
 
 	/*
@@ -720,8 +785,9 @@ __rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   - n: Actual number of objects dequeued.
  */
 static inline int __attribute__((always_inline))
-__rte_common_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_sc_do_dequeue(struct rte_ring *r, void *obj_table,
+		 unsigned n, unsigned int elem_sz,
+		 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t cons_head, prod_tail;
 	uint32_t cons_next, entries;
@@ -755,7 +821,10 @@ __rte_common_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 	r->cons.head = cons_next;
 
 	/* copy in table */
-	DEQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): DEQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): DEQUEUE_16B(); break;
+	}
 	rte_smp_rmb();
 
 	__RING_STAT_ADD(r, deq_success, n);
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 16ddc39..0fd20f3 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -36,14 +36,14 @@
 ssize_t
 rte_ring_get_memsize(unsigned count)
 {
-	return rte_common_ring_get_memsize(count);
+	return rte_common_ring_get_memsize(count, sizeof(void *));
 }
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	unsigned flags)
 {
-	return rte_common_ring_init(r, name, count, flags);
+	return rte_common_ring_init(r, name, count, flags, sizeof(void *));
 }
 
 
@@ -51,7 +51,8 @@ struct rte_ring *
 rte_ring_create(const char *name, unsigned count, int socket_id,
 		unsigned flags)
 {
-	return rte_common_ring_create(name, count, socket_id, flags);
+	return rte_common_ring_create(name, count, socket_id, flags,
+			sizeof(void *));
 }
 
 void
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 993796f..3b934da 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -215,7 +215,7 @@ static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -237,7 +237,7 @@ static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -356,7 +356,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -377,7 +377,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 static inline int __attribute__((always_inline))
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -568,7 +568,7 @@ static inline unsigned __attribute__((always_inline))
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -587,7 +587,7 @@ static inline unsigned __attribute__((always_inline))
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -636,7 +636,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 static inline unsigned __attribute__((always_inline))
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -656,7 +656,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 static inline unsigned __attribute__((always_inline))
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
-- 
2.9.3

  parent reply	other threads:[~2017-01-24 10:40 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-01-24 10:39 [dpdk-dev] [PATCH RFCv2 0/4] generalise rte_ring to allow different datatypes Bruce Richardson
2017-01-24 10:39 ` [dpdk-dev] [PATCH RFCv2 1/4] ring: create common ring files Bruce Richardson
2017-01-24 10:39 ` [dpdk-dev] [PATCH RFCv2 2/4] ring: separate common and rte_ring specific functions Bruce Richardson
2017-01-24 10:39 ` Bruce Richardson [this message]
2017-01-24 10:39 ` [dpdk-dev] [PATCH RFCv2 4/4] ring: add new event ring class Bruce Richardson

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1485254377-20098-4-git-send-email-bruce.richardson@intel.com \
    --to=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=olivier.matz@6wind.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).