DPDK patches and discussions
* [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
@ 2017-01-11 15:05 Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 01/11] ring: add new typed ring header file Bruce Richardson
                   ` (11 more replies)
  0 siblings, 12 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

The rte_ring library in DPDK provides an excellent high-performance
mechanism which can be used for passing pointers between cores and
for other tasks such as buffering. However, it does have a number
of limitations:

* type information of pointers is lost, as it works with void pointers
* typecasting is needed when using the enqueue/dequeue burst functions,
  since arrays of other types cannot be automatically cast to void **
  (see the example below)
* the data to be passed through the ring itself must be no bigger than
  a pointer
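
As an illustration of the second point, consider enqueuing a burst of
received packets (a sketch only; the ring "r", the port and the variable
names are invented for the example):

	struct rte_mbuf *burst[32];
	uint16_t nb_rx = rte_eth_rx_burst(port, 0, burst, 32);
	/* burst is a struct rte_mbuf **, not a void **, so an explicit
	 * cast is needed even though the conversion is harmless here */
	rte_ring_enqueue_bulk(r, (void **)burst, nb_rx);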

While the first two limitations are an inconvenience, the final one can
prevent rte_rings from being used at all in cases where their functionality
is needed. The use-case which inspired this patchset is that of
eventdev. When working with rte_events, each event is a 16-byte structure
consisting of a pointer and some metadata, e.g. priority and type. For
these events, what is passed around between cores is not pointers to
events, but the events themselves. This makes existing rings unsuitable
for use by applications working with rte_events, and also for use
internally inside any software implementation of an eventdev.

For rings to handle events or other similarly sized structures, e.g.
NIC descriptors, etc., we then have two options: duplicate the rte_ring
code to create new ring implementations for each of those types, or
generalise the existing code using macros so that the data type handled
by each ring is a compile-time parameter. This patchset takes the latter
approach, and once applied would allow us to add an rte_event_ring type
to DPDK using a header file containing:

#define RING_TYPE struct rte_event
#define RING_TYPE_NAME rte_event
#include <rte_typed_ring.h>
#undef RING_TYPE_NAME
#undef RING_TYPE

[NOTE: the event_ring is not defined in this set, since it depends on
the eventdev implementation, which is not yet present in the main tree]
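
The exact function and structure names generated are determined by later
patches in the series; purely as an illustrative sketch, the resulting
API would be expected to contain declarations along these lines:

	struct rte_event_ring *rte_event_ring_create(const char *name,
			unsigned int count, int socket_id, unsigned int flags);
	unsigned int rte_event_ring_enqueue_burst(struct rte_event_ring *r,
			const struct rte_event *events, unsigned int n);
	unsigned int rte_event_ring_dequeue_burst(struct rte_event_ring *r,
			struct rte_event *events, unsigned int n);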

If we want to eliminate some of the typecasting in our code when enqueuing
and dequeuing mbuf pointers, an rte_mbuf_ring type can be similarly
created using the same number of lines of code.
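
For example, following the same pattern, such a header would
(hypothetically) contain:

#define RING_TYPE struct rte_mbuf *
#define RING_TYPE_NAME rte_mbuf
#include <rte_typed_ring.h>
#undef RING_TYPE_NAME
#undef RING_TYPE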

The downside of this generalisation is that the ring code now makes far
heavier use of macros. However, I do not feel that overall readability
suffers much from this change, since the changes are pretty much just
search-and-replace ones. There should also be no ABI compatibility
issues with this change, since the existing rte_ring structures remain
the same.

Bruce Richardson (11):
  ring: add new typed ring header file
  test: add new test file for typed rings
  ring: add ring management functions to typed ring header
  ring: make ring tailq variable public
  ring: add user-specified typing to typed rings
  ring: use existing power-of-2 function
  ring: allow multiple typed rings in the same unit
  app/pdump: remove duplicate macro definition
  ring: make existing rings reuse the typed ring definitions
  ring: reuse typed rings management functions
  ring: reuse typed ring enqueue and dequeue functions

 app/pdump/main.c                     |    1 -
 app/test/Makefile                    |    1 +
 app/test/test_typed_ring.c           |  156 ++++
 lib/librte_ring/Makefile             |    1 +
 lib/librte_ring/rte_ring.c           |  246 +-----
 lib/librte_ring/rte_ring.h           |  563 +-----------
 lib/librte_ring/rte_ring_version.map |    7 +
 lib/librte_ring/rte_typed_ring.h     | 1570 ++++++++++++++++++++++++++++++++++
 8 files changed, 1758 insertions(+), 787 deletions(-)
 create mode 100644 app/test/test_typed_ring.c
 create mode 100644 lib/librte_ring/rte_typed_ring.h

-- 
2.9.3


* [dpdk-dev] [RFC PATCH 01/11] ring: add new typed ring header file
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 02/11] test: add new test file for typed rings Bruce Richardson
                   ` (10 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Initially this is a clone of rte_ring.h with checkpatch errors/warnings
fixed; it will be modified by later commits to become a generic ring
implementation.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/Makefile         |    1 +
 lib/librte_ring/rte_typed_ring.h | 1285 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 1286 insertions(+)
 create mode 100644 lib/librte_ring/rte_typed_ring.h

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 4b1112e..3aa494c 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -45,6 +45,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include += rte_typed_ring.h
 
 DEPDIRS-$(CONFIG_RTE_LIBRTE_RING) += lib/librte_eal
 
diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h
new file mode 100644
index 0000000..18cc6fe
--- /dev/null
+++ b/lib/librte_ring/rte_typed_ring.h
@@ -0,0 +1,1285 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Derived from FreeBSD's bufring.h
+ *
+ **************************************************************************
+ *
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *
+ * 2. The name of Kip Macy nor the names of other
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ***************************************************************************/
+
+#ifndef _RTE_RING_H_
+#define _RTE_RING_H_
+
+/**
+ * @file
+ * RTE Ring
+ *
+ * The Ring Manager is a fixed-size queue, implemented as a table of
+ * pointers. Head and tail pointers are modified atomically, allowing
+ * concurrent access to it. It has the following features:
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Lockless implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ * - Bulk dequeue.
+ * - Bulk enqueue.
+ *
+ * Note: the ring implementation is not preemptible. An lcore must not
+ * be interrupted by another task that uses the same ring: the lockless
+ * design means a thread preempted mid-operation can stall other users.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+
+#define RTE_TAILQ_RING_NAME "RTE_RING"
+
+enum rte_ring_queue_behavior {
+	RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
+	RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
+};
+
+#ifdef RTE_LIBRTE_RING_DEBUG
+/**
+ * A structure that stores the ring statistics (per-lcore).
+ */
+struct rte_ring_debug_stats {
+	uint64_t enq_success_bulk; /**< Successful enqueues number. */
+	uint64_t enq_success_objs; /**< Objects successfully enqueued. */
+	uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
+	uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
+	uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
+	uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
+	uint64_t deq_success_bulk; /**< Successful dequeues number. */
+	uint64_t deq_success_objs; /**< Objects successfully dequeued. */
+	uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
+	uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
+} __rte_cache_aligned;
+#endif
+
+#define RTE_RING_MZ_PREFIX "RG_"
+/** The maximum length of a ring name. */
+#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_RING_MZ_PREFIX) + 1)
+
+#ifndef RTE_RING_PAUSE_REP_COUNT
+/**
+ * Yield to the OS scheduler after this many calls to rte_pause() while
+ * waiting; a value of 0 (the default) means never yield.
+ */
+#define RTE_RING_PAUSE_REP_COUNT 0
+#endif
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+/**
+ * An RTE ring structure.
+ *
+ * The producer and the consumer each have a head and a tail index. The
+ * particularity of these indexes is that they are not kept between 0 and
+ * size(ring): they run over the full 32-bit range, and their value is
+ * masked on each access to the ring[] field. Thanks to this, subtractions
+ * between two index values are performed modulo 2^32, so index overflow is
+ * not a problem.
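+ *
+ * For example (illustrative values): with size(ring) == 8 (mask == 7),
+ * prod.head == 1 (just wrapped past 2^32) and cons.tail == 4294967295,
+ * the free-entry count mask + cons.tail - prod.head evaluates, modulo
+ * 2^32, to 7 + 4294967295 - 1 = 5, as expected with two entries in use.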
+ */
+struct rte_ring {
+	/*
+	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
+	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
+	 * next time the ABI changes
+	 */
+	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
+	int flags;                       /**< Flags supplied at creation. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the rte_ring */
+
+	/** Ring producer status. */
+	struct prod {
+		uint32_t watermark;      /**< Maximum items before EDQUOT. */
+		uint32_t sp_enqueue;     /**< True, if single producer. */
+		uint32_t size;           /**< Size of ring. */
+		uint32_t mask;           /**< Mask (size-1) of ring. */
+		volatile uint32_t head;  /**< Producer head. */
+		volatile uint32_t tail;  /**< Producer tail. */
+	} prod __rte_cache_aligned;
+
+	/** Ring consumer status. */
+	struct cons {
+		uint32_t sc_dequeue;     /**< True, if single consumer. */
+		uint32_t size;           /**< Size of the ring. */
+		uint32_t mask;           /**< Mask (size-1) of ring. */
+		volatile uint32_t head;  /**< Consumer head. */
+		volatile uint32_t tail;  /**< Consumer tail. */
+#ifdef RTE_RING_SPLIT_PROD_CONS
+	} cons __rte_cache_aligned;
+#else
+	} cons;
+#endif
+
+#ifdef RTE_LIBRTE_RING_DEBUG
+	struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
+#endif
+
+	/**
+	 * Memory space of ring starts here.
+	 * not volatile so need to be careful
+	 * about compiler re-ordering
+	 */
+	void *ring[] __rte_cache_aligned;
+};
+
+#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+#define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceeded for burst ops */
+#define RTE_RING_SZ_MASK  (unsigned int)(0x0fffffff) /**< Ring size mask */
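+/* A burst (RTE_RING_QUEUE_VARIABLE) enqueue that exceeds the watermark
+ * returns (n | RTE_RING_QUOT_EXCEED); the count of objects actually
+ * enqueued can be recovered as (ret & RTE_RING_SZ_MASK).
+ */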
+
+/**
+ * @internal When debug is enabled, store ring statistics.
+ * @param r
+ *   A pointer to the ring.
+ * @param name
+ *   The name of the statistics field to increment in the ring.
+ * @param n
+ *   The number to add to the per-object statistics counters.
+ */
+#ifdef RTE_LIBRTE_RING_DEBUG
+#define __RING_STAT_ADD(r, name, n) do {                        \
+		unsigned int __lcore_id = rte_lcore_id();       \
+		if (__lcore_id < RTE_MAX_LCORE) {               \
+			r->stats[__lcore_id].name##_objs += n;  \
+			r->stats[__lcore_id].name##_bulk += 1;  \
+		}                                               \
+	} while (0)
+#else
+#define __RING_STAT_ADD(r, name, n) do {} while (0)
+#endif
+
+/**
+ * Calculate the memory size needed for a ring
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_ring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the ring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_ring_get_memsize(unsigned int count);
+
+/**
+ * Initialize a ring structure.
+ *
+ * Initialize a ring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the ring structure and the
+ * object table. It is advised to use rte_ring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The ring size is set to *count*, which must be a power of two. Water
+ * marking is disabled by default. The real usable ring size is
+ * *count-1* instead of *count* to differentiate a free ring from an
+ * empty ring.
+ *
+ * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ *   The pointer to the ring structure followed by the objects table.
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
+	unsigned int flags);
+
+/**
+ * Create a new ring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable ring size
+ * is *count-1* instead of *count* to differentiate a free ring from an
+ * empty ring.
+ *
+ * The ring is added in RTE_TAILQ_RING list.
+ *
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The size of the ring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated ring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_ring *rte_ring_create(const char *name, unsigned int count,
+				 int socket_id, unsigned int flags);
+/**
+ * De-allocate all memory used by the ring.
+ *
+ * @param r
+ *   Ring to free
+ */
+void rte_ring_free(struct rte_ring *r);
+
+/**
+ * Change the high water mark.
+ *
+ * If *count* is 0, water marking is disabled. Otherwise, it is set to the
+ * *count* value. The *count* value must be greater than 0 and less
+ * than the ring size.
+ *
+ * This function can be called at any time (not necessarily at
+ * initialization).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param count
+ *   The new water mark value.
+ * @return
+ *   - 0: Success; water mark changed.
+ *   - -EINVAL: Invalid water mark value.
+ */
+int rte_ring_set_water_mark(struct rte_ring *r, unsigned int count);
+
+/**
+ * Dump the status of the ring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the ring structure.
+ */
+void rte_ring_dump(FILE *f, const struct rte_ring *r);
+
+/* The actual enqueue of pointers on the ring.
+ * Placed here since identical code is needed in both
+ * single- and multi-producer enqueue functions.
+ */
+#define ENQUEUE_PTRS() do { \
+	const uint32_t size = r->prod.size; \
+	uint32_t idx = prod_head & mask; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~0x3U)); i += 4, idx += 4) { \
+			r->ring[idx] = obj_table[i]; \
+			r->ring[idx+1] = obj_table[i+1]; \
+			r->ring[idx+2] = obj_table[i+2]; \
+			r->ring[idx+3] = obj_table[i+3]; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			r->ring[idx++] = obj_table[i++]; /* fallthrough */ \
+		case 2: \
+			r->ring[idx++] = obj_table[i++]; /* fallthrough */ \
+		case 1: \
+			r->ring[idx++] = obj_table[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++)\
+			r->ring[idx] = obj_table[i]; \
+		for (idx = 0; i < n; i++, idx++) \
+			r->ring[idx] = obj_table[i]; \
+	} \
+} while (0)
+
+/* The actual copy of pointers from the ring to obj_table.
+ * Placed here since identical code is needed in both
+ * single- and multi-consumer dequeue functions.
+ */
+#define DEQUEUE_PTRS() do { \
+	uint32_t idx = cons_head & mask; \
+	const uint32_t size = r->cons.size; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~0x3U)); i += 4, idx += 4) { \
+			obj_table[i] = r->ring[idx]; \
+			obj_table[i+1] = r->ring[idx+1]; \
+			obj_table[i+2] = r->ring[idx+2]; \
+			obj_table[i+3] = r->ring[idx+3]; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			obj_table[i++] = r->ring[idx++]; /* fallthrough */ \
+		case 2: \
+			obj_table[i++] = r->ring[idx++]; /* fallthrough */ \
+		case 1: \
+			obj_table[i++] = r->ring[idx++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj_table[i] = r->ring[idx]; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj_table[i] = r->ring[idx]; \
+	} \
+} while (0)
+
+/**
+ * @internal Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @return
+ *   Depends on the behavior value:
+ *   if behavior = RTE_RING_QUEUE_FIXED
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ *   if behavior = RTE_RING_QUEUE_VARIABLE
+ *   - n: Actual number of objects enqueued.
+ */
+static inline int __attribute__((always_inline))
+__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, enum rte_ring_queue_behavior behavior)
+{
+	uint32_t prod_head, prod_next;
+	uint32_t cons_tail, free_entries;
+	const unsigned int max = n;
+	int success;
+	unsigned int i, rep = 0;
+	uint32_t mask = r->prod.mask;
+	int ret;
+
+	/* Avoid the unnecessary cmpset operation below, which is also
+	 * potentially harmful when n equals 0.
+	 */
+	if (n == 0)
+		return 0;
+
+	/* move prod.head atomically */
+	do {
+		/* Reset n to the initial burst count */
+		n = max;
+
+		prod_head = r->prod.head;
+		cons_tail = r->cons.tail;
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * prod_head > cons_tail). So 'free_entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		free_entries = (mask + cons_tail - prod_head);
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > free_entries)) {
+			if (behavior == RTE_RING_QUEUE_FIXED) {
+				__RING_STAT_ADD(r, enq_fail, n);
+				return -ENOBUFS;
+			}
+
+			/* Check for space for at least 1 entry */
+			if (unlikely(free_entries == 0)) {
+				__RING_STAT_ADD(r, enq_fail, n);
+				return 0;
+			}
+
+			n = free_entries;
+		}
+
+		prod_next = prod_head + n;
+		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
+					      prod_next);
+	} while (unlikely(success == 0));
+
+	/* write entries in ring */
+	ENQUEUE_PTRS();
+	rte_smp_wmb();
+
+	/* if we exceed the watermark */
+	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
+		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
+				(int)(n | RTE_RING_QUOT_EXCEED);
+		__RING_STAT_ADD(r, enq_quota, n);
+	} else {
+		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
+		__RING_STAT_ADD(r, enq_success, n);
+	}
+
+	/*
+	 * If there are other enqueues in progress that preceded us,
+	 * we need to wait for them to complete
+	 */
+	while (unlikely(r->prod.tail != prod_head)) {
+		rte_pause();
+
+		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
+		 * for other thread finish. It gives pre-empted thread a chance
+		 * to proceed and finish with ring dequeue operation.
+		 */
+		if (RTE_RING_PAUSE_REP_COUNT &&
+		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
+			rep = 0;
+			sched_yield();
+		}
+	}
+	r->prod.tail = prod_next;
+	return ret;
+}
+
+/**
+ * @internal Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @return
+ *   Depends on the behavior value:
+ *   if behavior = RTE_RING_QUEUE_FIXED
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ *   if behavior = RTE_RING_QUEUE_VARIABLE
+ *   - n: Actual number of objects enqueued.
+ */
+static inline int __attribute__((always_inline))
+__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, enum rte_ring_queue_behavior behavior)
+{
+	uint32_t prod_head, cons_tail;
+	uint32_t prod_next, free_entries;
+	unsigned int i;
+	uint32_t mask = r->prod.mask;
+	int ret;
+
+	prod_head = r->prod.head;
+	cons_tail = r->cons.tail;
+	/* The subtraction is done between two unsigned 32bits value
+	 * (the result is always modulo 32 bits even if we have
+	 * prod_head > cons_tail). So 'free_entries' is always between 0
+	 * and size(ring)-1.
+	 */
+	free_entries = mask + cons_tail - prod_head;
+
+	/* check that we have enough room in ring */
+	if (unlikely(n > free_entries)) {
+		if (behavior == RTE_RING_QUEUE_FIXED) {
+			__RING_STAT_ADD(r, enq_fail, n);
+			return -ENOBUFS;
+		}
+
+		/* Check for space for at least 1 entry */
+		if (unlikely(free_entries == 0)) {
+			__RING_STAT_ADD(r, enq_fail, n);
+			return 0;
+		}
+
+		n = free_entries;
+	}
+
+	prod_next = prod_head + n;
+	r->prod.head = prod_next;
+
+	/* write entries in ring */
+	ENQUEUE_PTRS();
+	rte_smp_wmb();
+
+	/* if we exceed the watermark */
+	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
+		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
+			(int)(n | RTE_RING_QUOT_EXCEED);
+		__RING_STAT_ADD(r, enq_quota, n);
+	} else {
+		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
+		__RING_STAT_ADD(r, enq_success, n);
+	}
+
+	r->prod.tail = prod_next;
+	return ret;
+}
+
+/**
+ * @internal Dequeue several objects from a ring (multi-consumers safe). When
+ * more objects are requested than are available, only the available number
+ * of objects is dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @return
+ *   Depends on the behavior value:
+ *   if behavior = RTE_RING_QUEUE_FIXED
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ *   if behavior = RTE_RING_QUEUE_VARIABLE
+ *   - n: Actual number of objects dequeued.
+ */
+
+static inline int __attribute__((always_inline))
+__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
+		 unsigned int n, enum rte_ring_queue_behavior behavior)
+{
+	uint32_t cons_head, prod_tail;
+	uint32_t cons_next, entries;
+	const unsigned int max = n;
+	int success;
+	unsigned int i, rep = 0;
+	uint32_t mask = r->prod.mask;
+
+	/* Avoid the unnecessary cmpset operation below, which is also
+	 * potentially harmful when n equals 0.
+	 */
+	if (n == 0)
+		return 0;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		cons_head = r->cons.head;
+		prod_tail = r->prod.tail;
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		entries = (prod_tail - cons_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > entries) {
+			if (behavior == RTE_RING_QUEUE_FIXED) {
+				__RING_STAT_ADD(r, deq_fail, n);
+				return -ENOENT;
+			}
+
+			if (unlikely(entries == 0)) {
+				__RING_STAT_ADD(r, deq_fail, n);
+				return 0;
+			}
+
+			n = entries;
+		}
+
+		cons_next = cons_head + n;
+		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
+					      cons_next);
+	} while (unlikely(success == 0));
+
+	/* copy in table */
+	DEQUEUE_PTRS();
+	rte_smp_rmb();
+
+	/*
+	 * If there are other dequeues in progress that preceded us,
+	 * we need to wait for them to complete
+	 */
+	while (unlikely(r->cons.tail != cons_head)) {
+		rte_pause();
+
+		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
+		 * for other thread finish. It gives pre-empted thread a chance
+		 * to proceed and finish with ring dequeue operation.
+		 */
+		if (RTE_RING_PAUSE_REP_COUNT &&
+		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
+			rep = 0;
+			sched_yield();
+		}
+	}
+	__RING_STAT_ADD(r, deq_success, n);
+	r->cons.tail = cons_next;
+
+	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+}
+
+/**
+ * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
+ * When more objects are requested than are available, only the available
+ * number of objects is dequeued.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @return
+ *   Depends on the behavior value:
+ *   if behavior = RTE_RING_QUEUE_FIXED
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ *   if behavior = RTE_RING_QUEUE_VARIABLE
+ *   - n: Actual number of objects dequeued.
+ */
+static inline int __attribute__((always_inline))
+__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
+		 unsigned int n, enum rte_ring_queue_behavior behavior)
+{
+	uint32_t cons_head, prod_tail;
+	uint32_t cons_next, entries;
+	unsigned int i;
+	uint32_t mask = r->prod.mask;
+
+	cons_head = r->cons.head;
+	prod_tail = r->prod.tail;
+	/* The subtraction is done between two unsigned 32bits value
+	 * (the result is always modulo 32 bits even if we have
+	 * cons_head > prod_tail). So 'entries' is always between 0
+	 * and size(ring)-1.
+	 */
+	entries = prod_tail - cons_head;
+
+	if (n > entries) {
+		if (behavior == RTE_RING_QUEUE_FIXED) {
+			__RING_STAT_ADD(r, deq_fail, n);
+			return -ENOENT;
+		}
+
+		if (unlikely(entries == 0)) {
+			__RING_STAT_ADD(r, deq_fail, n);
+			return 0;
+		}
+
+		n = entries;
+	}
+
+	cons_next = cons_head + n;
+	r->cons.head = cons_next;
+
+	/* copy in table */
+	DEQUEUE_PTRS();
+	rte_smp_rmb();
+
+	__RING_STAT_ADD(r, deq_success, n);
+	r->cons.tail = cons_next;
+	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n)
+{
+	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n)
+{
+	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+		      unsigned int n)
+{
+	if (r->prod.sp_enqueue)
+		return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+	else
+		return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+}
+
+/**
+ * Enqueue one object on a ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
+{
+	return rte_ring_mp_enqueue_bulk(r, &obj, 1);
+}
+
+/**
+ * Enqueue one object on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
+{
+	return rte_ring_sp_enqueue_bulk(r, &obj, 1);
+}
+
+/**
+ * Enqueue one object on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ *     high water mark is exceeded.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_enqueue(struct rte_ring *r, void *obj)
+{
+	if (r->prod.sp_enqueue)
+		return rte_ring_sp_enqueue(r, obj);
+	else
+		return rte_ring_mp_enqueue(r, obj);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
+{
+	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table,
+ *   must be strictly positive.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
+{
+	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+}
+
+/**
+ * Dequeue several objects from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
+{
+	if (r->cons.sc_dequeue)
+		return rte_ring_sc_dequeue_bulk(r, obj_table, n);
+	else
+		return rte_ring_mc_dequeue_bulk(r, obj_table, n);
+}
+
+/**
+ * Dequeue one object from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
+{
+	return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
+}
+
+/**
+ * Dequeue one object from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
+{
+	return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
+}
+
+/**
+ * Dequeue one object from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @return
+ *   - 0: Success, objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static inline int __attribute__((always_inline))
+rte_ring_dequeue(struct rte_ring *r, void **obj_p)
+{
+	if (r->cons.sc_dequeue)
+		return rte_ring_sc_dequeue(r, obj_p);
+	else
+		return rte_ring_mc_dequeue(r, obj_p);
+}
+
+/**
+ * Test if a ring is full.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   - 1: The ring is full.
+ *   - 0: The ring is not full.
+ */
+static inline int
+rte_ring_full(const struct rte_ring *r)
+{
+	uint32_t prod_tail = r->prod.tail;
+	uint32_t cons_tail = r->cons.tail;
+	return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
+}
+
+/**
+ * Test if a ring is empty.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   - 1: The ring is empty.
+ *   - 0: The ring is not empty.
+ */
+static inline int
+rte_ring_empty(const struct rte_ring *r)
+{
+	uint32_t prod_tail = r->prod.tail;
+	uint32_t cons_tail = r->cons.tail;
+	return !!(cons_tail == prod_tail);
+}
+
+/**
+ * Return the number of entries in a ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The number of entries in the ring.
+ */
+static inline unsigned
+rte_ring_count(const struct rte_ring *r)
+{
+	uint32_t prod_tail = r->prod.tail;
+	uint32_t cons_tail = r->cons.tail;
+	return (prod_tail - cons_tail) & r->prod.mask;
+}
+
+/**
+ * Return the number of free entries in a ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The number of free entries in the ring.
+ */
+static inline unsigned
+rte_ring_free_count(const struct rte_ring *r)
+{
+	uint32_t prod_tail = r->prod.tail;
+	uint32_t cons_tail = r->cons.tail;
+	return (cons_tail - prod_tail - 1) & r->prod.mask;
+}
+
+/**
+ * Dump the status of all rings on the console
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+void rte_ring_list_dump(FILE *f);
+
+/**
+ * Search a ring from its name
+ *
+ * @param name
+ *   The name of the ring.
+ * @return
+ *   The pointer to the ring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+struct rte_ring *rte_ring_lookup(const char *name);
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static inline unsigned int __attribute__((always_inline))
+rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n)
+{
+	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static inline unsigned int __attribute__((always_inline))
+rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n)
+{
+	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static inline unsigned int __attribute__((always_inline))
+rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+		      unsigned int n)
+{
+	if (r->prod.sp_enqueue)
+		return rte_ring_sp_enqueue_burst(r, obj_table, n);
+	else
+		return rte_ring_mp_enqueue_burst(r, obj_table, n);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When more
+ * objects are requested than are available, only the available number of
+ * objects is dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static inline unsigned int __attribute__((always_inline))
+rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
+{
+	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When
+ * more objects are requested than are available, only the available
+ * number of objects is dequeued.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static inline unsigned int __attribute__((always_inline))
+rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
+{
+	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ *   - Number of objects dequeued
+ */
+static inline unsigned int __attribute__((always_inline))
+rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
+{
+	if (r->cons.sc_dequeue)
+		return rte_ring_sc_dequeue_burst(r, obj_table, n);
+	else
+		return rte_ring_mc_dequeue_burst(r, obj_table, n);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_H_ */
-- 
2.9.3


* [dpdk-dev] [RFC PATCH 02/11] test: add new test file for typed rings
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 01/11] ring: add new typed ring header file Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 03/11] ring: add ring management functions to typed ring header Bruce Richardson
                   ` (9 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

The initial test file just ensures compilation by including the header file,
and contains an empty test case.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/test/Makefile          |  1 +
 app/test/test_typed_ring.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+)
 create mode 100644 app/test/test_typed_ring.c

diff --git a/app/test/Makefile b/app/test/Makefile
index 5be023a..929267f 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -97,6 +97,7 @@ SRCS-y += test_memzone.c
 
 SRCS-y += test_ring.c
 SRCS-y += test_ring_perf.c
+SRCS-y += test_typed_ring.c
 SRCS-y += test_pmd_perf.c
 
 ifeq ($(CONFIG_RTE_LIBRTE_TABLE),y)
diff --git a/app/test/test_typed_ring.c b/app/test/test_typed_ring.c
new file mode 100644
index 0000000..fbfb820
--- /dev/null
+++ b/app/test/test_typed_ring.c
@@ -0,0 +1,46 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_typed_ring.h>
+#include "test.h"
+
+/**
+ * test entry point
+ */
+static int
+test_typed_ring(void)
+{
+	return 0;
+}
+
+REGISTER_TEST_COMMAND(typed_ring_autotest, test_typed_ring);
-- 
2.9.3


* [dpdk-dev] [RFC PATCH 03/11] ring: add ring management functions to typed ring header
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 01/11] ring: add new typed ring header file Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 02/11] test: add new test file for typed rings Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 04/11] ring: make ring tailq variable public Bruce Richardson
                   ` (8 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Take the ring management functions from rte_ring.c and copy them
into the typed ring header file. This gives us a complete ring
implementation in the header, which we can generalize in later commits.
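
With everything in the header, a translation unit gets a complete ring
implementation from the include alone; a minimal usage sketch (the ring
name and variables are invented for the example):

	#include <rte_typed_ring.h>

	static void example(void)
	{
		struct rte_ring *r = rte_ring_create("example_ring", 1024,
				rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
		void *obj = NULL;
		if (r != NULL && rte_ring_enqueue(r, obj) == 0)
			rte_ring_dequeue(r, &obj);
	}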

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_typed_ring.h | 579 +++++++++++++++++++++++++++++----------
 1 file changed, 427 insertions(+), 152 deletions(-)

diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h
index 18cc6fe..5a14403 100644
--- a/lib/librte_ring/rte_typed_ring.h
+++ b/lib/librte_ring/rte_typed_ring.h
@@ -95,12 +95,21 @@ extern "C" {
 #include <stdint.h>
 #include <sys/queue.h>
 #include <errno.h>
+#include <string.h>
+#include <inttypes.h>
+
 #include <rte_common.h>
 #include <rte_memory.h>
 #include <rte_lcore.h>
 #include <rte_atomic.h>
 #include <rte_branch_prediction.h>
 #include <rte_memzone.h>
+#include <rte_tailq.h>
+#include <rte_log.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_rwlock.h>
+#include <rte_eal_memconfig.h>
 
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
@@ -224,138 +233,6 @@ struct rte_ring {
 #define __RING_STAT_ADD(r, name, n) do {} while (0)
 #endif
 
-/**
- * Calculate the memory size needed for a ring
- *
- * This function returns the number of bytes needed for a ring, given
- * the number of elements in it. This value is the sum of the size of
- * the structure rte_ring and the size of the memory needed by the
- * objects pointers. The value is aligned to a cache line size.
- *
- * @param count
- *   The number of elements in the ring (must be a power of 2).
- * @return
- *   - The memory size needed for the ring on success.
- *   - -EINVAL if count is not a power of 2.
- */
-ssize_t rte_ring_get_memsize(unsigned int count);
-
-/**
- * Initialize a ring structure.
- *
- * Initialize a ring structure in memory pointed by "r". The size of the
- * memory area must be large enough to store the ring structure and the
- * object table. It is advised to use rte_ring_get_memsize() to get the
- * appropriate size.
- *
- * The ring size is set to *count*, which must be a power of two. Water
- * marking is disabled by default. The real usable ring size is
- * *count-1* instead of *count* to differentiate a free ring from an
- * empty ring.
- *
- * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
- * memory given by the caller may not be shareable among dpdk
- * processes.
- *
- * @param r
- *   The pointer to the ring structure followed by the objects table.
- * @param name
- *   The name of the ring.
- * @param count
- *   The number of elements in the ring (must be a power of 2).
- * @param flags
- *   An OR of the following:
- *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
- *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
- *      is "single-producer". Otherwise, it is "multi-producers".
- *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
- *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
- *      is "single-consumer". Otherwise, it is "multi-consumers".
- * @return
- *   0 on success, or a negative value on error.
- */
-int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
-	unsigned int flags);
-
-/**
- * Create a new ring named *name* in memory.
- *
- * This function uses ``memzone_reserve()`` to allocate memory. Then it
- * calls rte_ring_init() to initialize an empty ring.
- *
- * The new ring size is set to *count*, which must be a power of
- * two. Water marking is disabled by default. The real usable ring size
- * is *count-1* instead of *count* to differentiate a free ring from an
- * empty ring.
- *
- * The ring is added in RTE_TAILQ_RING list.
- *
- * @param name
- *   The name of the ring.
- * @param count
- *   The size of the ring (must be a power of 2).
- * @param socket_id
- *   The *socket_id* argument is the socket identifier in case of
- *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
- *   constraint for the reserved zone.
- * @param flags
- *   An OR of the following:
- *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
- *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
- *      is "single-producer". Otherwise, it is "multi-producers".
- *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
- *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
- *      is "single-consumer". Otherwise, it is "multi-consumers".
- * @return
- *   On success, the pointer to the new allocated ring. NULL on error with
- *    rte_errno set appropriately. Possible errno values include:
- *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
- *    - E_RTE_SECONDARY - function was called from a secondary process instance
- *    - EINVAL - count provided is not a power of 2
- *    - ENOSPC - the maximum number of memzones has already been allocated
- *    - EEXIST - a memzone with the same name already exists
- *    - ENOMEM - no appropriate memory area found in which to create memzone
- */
-struct rte_ring *rte_ring_create(const char *name, unsigned int count,
-				 int socket_id, unsigned int flags);
-/**
- * De-allocate all memory used by the ring.
- *
- * @param r
- *   Ring to free
- */
-void rte_ring_free(struct rte_ring *r);
-
-/**
- * Change the high water mark.
- *
- * If *count* is 0, water marking is disabled. Otherwise, it is set to the
- * *count* value. The *count* value must be greater than 0 and less
- * than the ring size.
- *
- * This function can be called at any time (not necessarily at
- * initialization).
- *
- * @param r
- *   A pointer to the ring structure.
- * @param count
- *   The new water mark value.
- * @return
- *   - 0: Success; water mark changed.
- *   - -EINVAL: Invalid water mark value.
- */
-int rte_ring_set_water_mark(struct rte_ring *r, unsigned int count);
-
-/**
- * Dump the status of the ring to a file.
- *
- * @param f
- *   A pointer to a file for output
- * @param r
- *   A pointer to the ring structure.
- */
-void rte_ring_dump(FILE *f, const struct rte_ring *r);
-
 /* the actual enqueue of pointers on the ring.
  * Placed here since identical code needed in both
  * single and multi producer enqueue functions
@@ -1124,26 +1001,6 @@ rte_ring_free_count(const struct rte_ring *r)
 }
 
 /**
- * Dump the status of all rings on the console
- *
- * @param f
- *   A pointer to a file for output
- */
-void rte_ring_list_dump(FILE *f);
-
-/**
- * Search a ring from its name
- *
- * @param name
- *   The name of the ring.
- * @return
- *   The pointer to the ring matching the name, or NULL if not found,
- *   with rte_errno set appropriately. Possible rte_errno values include:
- *    - ENOENT - required entry not available to return.
- */
-struct rte_ring *rte_ring_lookup(const char *name);
-
-/**
  * Enqueue several objects on the ring (multi-producers safe).
  *
  * This function uses a "compare and set" instruction to move the
@@ -1278,6 +1135,424 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
 		return rte_ring_mc_dequeue_burst(r, obj_table, n);
 }
 
+TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_ring_tailq = {
+	.name = RTE_TAILQ_RING_NAME,
+};
+EAL_REGISTER_TAILQ(rte_ring_tailq)
+
+/* true if x is a power of 2 */
+#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+
+/**
+ * Calculate the memory size needed for a ring
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_ring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the ring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+static inline ssize_t
+rte_ring_get_memsize(unsigned int count)
+{
+	ssize_t sz;
+
+	/* count must be a power of 2 */
+	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
+		RTE_LOG(ERR, RING,
+			"Requested size is invalid, must be power of 2, and "
+			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+/**
+ * Initialize a ring structure.
+ *
+ * Initialize a ring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the ring structure and the
+ * object table. It is advised to use rte_ring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The ring size is set to *count*, which must be a power of two. Water
+ * marking is disabled by default. The real usable ring size is
+ * *count-1* instead of *count* to differentiate a free ring from an
+ * empty ring.
+ *
+ * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ *   The pointer to the ring structure followed by the objects table.
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+static inline int
+rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
+	unsigned int flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#ifdef RTE_RING_SPLIT_PROD_CONS
+	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#endif
+	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#ifdef RTE_LIBRTE_RING_DEBUG
+	RTE_BUILD_BUG_ON((sizeof(struct rte_ring_debug_stats) &
+			  RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, stats) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#endif
+
+	/* init the ring structure */
+	memset(r, 0, sizeof(*r));
+	ret = snprintf(r->name, sizeof(r->name), "%s", name);
+	if (ret < 0 || ret >= (int)sizeof(r->name))
+		return -ENAMETOOLONG;
+	r->flags = flags;
+	r->prod.watermark = count;
+	r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
+	r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
+	r->prod.size = r->cons.size = count;
+	r->prod.mask = r->cons.mask = count-1;
+	r->prod.head = r->cons.head = 0;
+	r->prod.tail = r->cons.tail = 0;
+
+	return 0;
+}
+
+/**
+ * Create a new ring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable ring size
+ * is *count-1* instead of *count* to differentiate a free ring from an
+ * empty ring.
+ *
+ * The ring is added in RTE_TAILQ_RING list.
+ *
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The size of the ring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the newly allocated ring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+static inline struct rte_ring *
+rte_ring_create(const char *name, unsigned int count, int socket_id,
+		unsigned int flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_ring *r;
+	struct rte_tailq_entry *te;
+	const struct rte_memzone *mz;
+	ssize_t ring_size;
+	int mz_flags = 0;
+	struct rte_ring_list *ring_list = NULL;
+	int ret;
+
+	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+
+	ring_size = rte_ring_get_memsize(count);
+	if (ring_size < 0) {
+		rte_errno = ring_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_RING_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* reserve a memory zone for this ring. If we can't get rte_config or
+	 * we are secondary process, the memzone_reserve function will set
+	 * rte_errno for us appropriately - hence no check in this function
+	 */
+	mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags);
+	if (mz != NULL) {
+		r = mz->addr;
+		/* no need to check return value here, checked the args above */
+		rte_ring_init(r, name, count, flags);
+
+		te->data = (void *) r;
+		r->memzone = mz;
+
+		TAILQ_INSERT_TAIL(ring_list, te, next);
+	} else {
+		r = NULL;
+		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
+		rte_free(te);
+	}
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return r;
+}
+
+/**
+ * De-allocate all memory used by the ring.
+ *
+ * @param r
+ *   Ring to free
+ */
+static inline void
+rte_ring_free(struct rte_ring *r)
+{
+	struct rte_ring_list *ring_list = NULL;
+	struct rte_tailq_entry *te;
+
+	if (r == NULL)
+		return;
+
+	/*
+	 * Ring was not created with rte_ring_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (r->memzone == NULL) {
+		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_ring_create()");
+		return;
+	}
+
+	if (rte_memzone_free(r->memzone) != 0) {
+		RTE_LOG(ERR, RING, "Cannot free memory\n");
+		return;
+	}
+
+	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* find out tailq entry */
+	TAILQ_FOREACH(te, ring_list, next) {
+		if (te->data == (void *) r)
+			break;
+	}
+
+	if (te == NULL) {
+		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+		return;
+	}
+
+	TAILQ_REMOVE(ring_list, te, next);
+
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	rte_free(te);
+}
+
+/**
+ * Change the high water mark.
+ *
+ * If *count* is 0, water marking is disabled. Otherwise, it is set to the
+ * *count* value. The *count* value must be greater than 0 and less
+ * than the ring size.
+ *
+ * This function can be called at any time (not necessarily at
+ * initialization).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param count
+ *   The new water mark value.
+ * @return
+ *   - 0: Success; water mark changed.
+ *   - -EINVAL: Invalid water mark value.
+ */
+static inline int
+rte_ring_set_water_mark(struct rte_ring *r, unsigned int count)
+{
+	if (count >= r->prod.size)
+		return -EINVAL;
+
+	/* if count is 0, disable the watermarking */
+	if (count == 0)
+		count = r->prod.size;
+
+	r->prod.watermark = count;
+	return 0;
+}
+
+/**
+ * Dump the status of the ring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the ring structure.
+ */
+static inline void
+rte_ring_dump(FILE *f, const struct rte_ring *r)
+{
+#ifdef RTE_LIBRTE_RING_DEBUG
+	struct rte_ring_debug_stats sum;
+	unsigned int lcore_id;
+#endif
+
+	fprintf(f, "ring <%s>@%p\n", r->name, r);
+	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  size=%"PRIu32"\n", r->prod.size);
+	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
+	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
+	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	fprintf(f, "  used=%u\n", rte_ring_count(r));
+	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
+	if (r->prod.watermark == r->prod.size)
+		fprintf(f, "  watermark=0\n");
+	else
+		fprintf(f, "  watermark=%"PRIu32"\n", r->prod.watermark);
+
+	/* sum and dump statistics */
+#ifdef RTE_LIBRTE_RING_DEBUG
+	memset(&sum, 0, sizeof(sum));
+	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
+		sum.enq_success_bulk += r->stats[lcore_id].enq_success_bulk;
+		sum.enq_success_objs += r->stats[lcore_id].enq_success_objs;
+		sum.enq_quota_bulk += r->stats[lcore_id].enq_quota_bulk;
+		sum.enq_quota_objs += r->stats[lcore_id].enq_quota_objs;
+		sum.enq_fail_bulk += r->stats[lcore_id].enq_fail_bulk;
+		sum.enq_fail_objs += r->stats[lcore_id].enq_fail_objs;
+		sum.deq_success_bulk += r->stats[lcore_id].deq_success_bulk;
+		sum.deq_success_objs += r->stats[lcore_id].deq_success_objs;
+		sum.deq_fail_bulk += r->stats[lcore_id].deq_fail_bulk;
+		sum.deq_fail_objs += r->stats[lcore_id].deq_fail_objs;
+	}
+	fprintf(f, "  size=%"PRIu32"\n", r->prod.size);
+	fprintf(f, "  enq_success_bulk=%"PRIu64"\n", sum.enq_success_bulk);
+	fprintf(f, "  enq_success_objs=%"PRIu64"\n", sum.enq_success_objs);
+	fprintf(f, "  enq_quota_bulk=%"PRIu64"\n", sum.enq_quota_bulk);
+	fprintf(f, "  enq_quota_objs=%"PRIu64"\n", sum.enq_quota_objs);
+	fprintf(f, "  enq_fail_bulk=%"PRIu64"\n", sum.enq_fail_bulk);
+	fprintf(f, "  enq_fail_objs=%"PRIu64"\n", sum.enq_fail_objs);
+	fprintf(f, "  deq_success_bulk=%"PRIu64"\n", sum.deq_success_bulk);
+	fprintf(f, "  deq_success_objs=%"PRIu64"\n", sum.deq_success_objs);
+	fprintf(f, "  deq_fail_bulk=%"PRIu64"\n", sum.deq_fail_bulk);
+	fprintf(f, "  deq_fail_objs=%"PRIu64"\n", sum.deq_fail_objs);
+#else
+	fprintf(f, "  no statistics available\n");
+#endif
+}
+
+/**
+ * Dump the status of all rings on the console
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+static inline void
+rte_ring_list_dump(FILE *f)
+{
+	const struct rte_tailq_entry *te;
+	struct rte_ring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		rte_ring_dump(f, (struct rte_ring *) te->data);
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+}
+
+/**
+ * Search a ring from its name
+ *
+ * @param name
+ *   The name of the ring.
+ * @return
+ *   The pointer to the ring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+static inline struct rte_ring *
+rte_ring_lookup(const char *name)
+{
+	struct rte_tailq_entry *te;
+	struct rte_ring *r = NULL;
+	struct rte_ring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		r = (struct rte_ring *) te->data;
+		if (strncmp(name, r->name, RTE_RING_NAMESIZE) == 0)
+			break;
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	if (te == NULL) {
+		rte_errno = ENOENT;
+		return NULL;
+	}
+
+	return r;
+}
+
 #ifdef __cplusplus
 }
 #endif
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread
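
For readers following the series: once the management functions live in
the header, the whole life-cycle of a ring can be exercised from a single
compilation unit. A minimal usage sketch follows (the demo function and
its pointer payload are illustrative only, not part of the patch):

#include <rte_ring.h>

/* sketch: create a ring, round-trip a burst of pointers, release it */
static int
demo_ring_roundtrip(void)
{
	void *in[32], *out[32];
	unsigned int i, n;
	struct rte_ring *r = rte_ring_create("demo", 1024, SOCKET_ID_ANY,
			RING_F_SP_ENQ | RING_F_SC_DEQ);

	if (r == NULL)
		return -1;	/* rte_errno holds the cause */

	for (i = 0; i < 32; i++)
		in[i] = &in[i];	/* arbitrary pointer payload */

	n = rte_ring_enqueue_burst(r, in, 32);
	n = rte_ring_dequeue_burst(r, out, n);

	rte_ring_free(r);
	return n == 32 ? 0 : -1;
}

Note that, per the doxygen comments above, the usable capacity of a
1024-entry ring is 1023, so the 32-object burst always fits.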

* [dpdk-dev] [RFC PATCH 04/11] ring: make ring tailq variable public
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (2 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 03/11] ring: add ring management functions to typed ring header Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 05/11] ring: add user-specified typing to typed rings Bruce Richardson
                   ` (7 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Make the rte_ring_tailq variable non-static, so that other C files -
i.e. those which include the typed ring header - can register their
rings in the tailq. By making all rings use the same tailq, we ensure
that all rings are discoverable, while at the same time not requiring
a new C file for each new ring type.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_ring.c           | 2 +-
 lib/librte_ring/rte_ring_version.map | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index ca0a108..8ead295 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -91,7 +91,7 @@
 
 TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
-static struct rte_tailq_elem rte_ring_tailq = {
+struct rte_tailq_elem rte_ring_tailq = {
 	.name = RTE_TAILQ_RING_NAME,
 };
 EAL_REGISTER_TAILQ(rte_ring_tailq)
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index 5474b98..975d2f2 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -18,3 +18,10 @@ DPDK_2.2 {
 	rte_ring_free;
 
 } DPDK_2.0;
+
+DPDK_17.05 {
+	global:
+
+	rte_ring_tailq;
+
+} DPDK_2.2;
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread
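
To illustrate what exporting the symbol buys: any compilation unit can now
walk the shared ring tailq under the usual EAL lock, just as the list-dump
code in the header does. A sketch, with the counting helper being
illustrative rather than part of the patch:

#include <sys/queue.h>
#include <rte_tailq.h>
#include <rte_rwlock.h>
#include <rte_eal_memconfig.h>

TAILQ_HEAD(rte_ring_list, rte_tailq_entry);	/* as declared in rte_ring.c */
extern struct rte_tailq_elem rte_ring_tailq;	/* now a public symbol */

/* sketch: count every ring registered on the shared tailq */
static unsigned int
count_registered_rings(void)
{
	struct rte_ring_list *ring_list =
		RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
	const struct rte_tailq_entry *te;
	unsigned int n = 0;

	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
	TAILQ_FOREACH(te, ring_list, next)
		n++;
	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);

	return n;
}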

* [dpdk-dev] [RFC PATCH 05/11] ring: add user-specified typing to typed rings
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (3 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 04/11] ring: make ring tailq variable public Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 06/11] ring: use existing power-of-2 function Bruce Richardson
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Make the typed ring header live up to its name, with rings created based
on a user-defined type. This is done by using macros to generate all
functions in the file. For now, the file can still only be included once,
so only one type of ring can be used by a single C file, but that will be
fixed in a later commit. Test this flexibility by defining a ring which
works with mbuf pointers instead of void pointers. The only difference here
is in usability, in that no casts are necessary to pass an array of mbuf
pointers to ring enqueue/dequeue. [While any other pointer type can be
passed without a cast to "void *", the same does not hold true for "void **".]

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/test/test_typed_ring.c       |  56 +++++++++-
 lib/librte_ring/rte_typed_ring.h | 227 ++++++++++++++++++++-------------------
 2 files changed, 173 insertions(+), 110 deletions(-)

diff --git a/app/test/test_typed_ring.c b/app/test/test_typed_ring.c
index fbfb820..aaef023 100644
--- a/app/test/test_typed_ring.c
+++ b/app/test/test_typed_ring.c
@@ -31,15 +31,69 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-#include <rte_typed_ring.h>
+#include <rte_random.h>
 #include "test.h"
 
+#define RING_TYPE struct rte_mbuf *
+#define RING_TYPE_NAME rte_mbuf
+#include <rte_typed_ring.h>
+
+#define RING_SIZE 256
+#define BURST_SZ 32
+#define ITERATIONS (RING_SIZE * 2)
+
+static int
+test_mbuf_enqueue_dequeue(struct rte_mbuf_ring *r)
+{
+	struct rte_mbuf *inbufs[BURST_SZ];
+	struct rte_mbuf *outbufs[BURST_SZ];
+	unsigned int i, j;
+
+	for (i = 0; i < BURST_SZ; i++)
+		inbufs[i] = (void *)((uintptr_t)rte_rand());
+
+	for (i = 0; i < ITERATIONS; i++) {
+		uint16_t in = rte_mbuf_ring_enqueue_burst(r, inbufs, BURST_SZ);
+		if (in != BURST_SZ) {
+			printf("Error enqueuing mbuf ptrs\n");
+			return -1;
+		}
+		uint16_t out = rte_mbuf_ring_dequeue_burst(r, outbufs, BURST_SZ);
+		if (out != BURST_SZ) {
+			printf("Error dequeuing mbuf ptrs\n");
+			return -1;
+		}
+
+		for (j = 0; j < BURST_SZ; j++)
+			if (outbufs[j] != inbufs[j]) {
+				printf("Error: dequeued ptr != enqueued ptr\n");
+				return -1;
+			}
+	}
+	return 0;
+}
+
 /**
  * test entry point
  */
 static int
 test_typed_ring(void)
 {
+	struct rte_mbuf_ring *r;
+	r = rte_mbuf_ring_create("Test_mbuf_ring", RING_SIZE, rte_socket_id(),
+			RING_F_SP_ENQ | RING_F_SC_DEQ);
+	if (r == NULL) {
+		fprintf(stderr, "ln %d: Error creating mbuf ring\n", __LINE__);
+		return -1;
+	}
+	rte_mbuf_ring_list_dump(stdout);
+
+	if (test_mbuf_enqueue_dequeue(r) != 0) {
+		rte_mbuf_ring_free(r);
+		return -1;
+	}
+
+	rte_mbuf_ring_free(r);
 	return 0;
 }
 
diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h
index 5a14403..03a9bd7 100644
--- a/lib/librte_ring/rte_typed_ring.h
+++ b/lib/librte_ring/rte_typed_ring.h
@@ -111,6 +111,17 @@ extern "C" {
 #include <rte_rwlock.h>
 #include <rte_eal_memconfig.h>
 
+#define _CAT(a, b) a ## _ ## b
+#define CAT(a, b) _CAT(a, b)
+
+#ifndef RING_TYPE_NAME
+#error "Need RING_TYPE_NAME defined before including"
+#endif
+#ifndef RING_TYPE
+#error "Need RING_TYPE defined before including"
+#endif
+#define TYPE(x) CAT(RING_TYPE_NAME, x)
+
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
 enum rte_ring_queue_behavior {
@@ -161,7 +172,7 @@ struct rte_memzone; /* forward declaration, so as not to require memzone.h */
  * values in a modulo-32bit base: that's why the overflow of the indexes is not
  * a problem.
  */
-struct rte_ring {
+struct TYPE(ring) {
 	/*
 	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
 	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
@@ -170,7 +181,7 @@ struct rte_ring {
 	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
 	int flags;                       /**< Flags supplied at creation. */
 	const struct rte_memzone *memzone;
-			/**< Memzone, if any, containing the rte_ring */
+			/**< Memzone, if any, containing the ring */
 
 	/** Ring producer status. */
 	struct prod {
@@ -204,7 +215,7 @@ struct rte_ring {
 	 * not volatile so need to be careful
 	 * about compiler re-ordering
 	 */
-	void *ring[] __rte_cache_aligned;
+	RING_TYPE ring[] __rte_cache_aligned;
 };
 
 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
@@ -302,7 +313,7 @@ struct rte_ring {
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @param behavior
@@ -319,7 +330,7 @@ struct rte_ring {
  *   - n: Actual number of objects enqueued.
  */
 static inline int __attribute__((always_inline))
-__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
+TYPE(__ring_mp_do_enqueue)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 			 unsigned int n, enum rte_ring_queue_behavior behavior)
 {
 	uint32_t prod_head, prod_next;
@@ -412,7 +423,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @param behavior
@@ -429,7 +440,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   - n: Actual number of objects enqueued.
  */
 static inline int __attribute__((always_inline))
-__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
+TYPE(__ring_sp_do_enqueue)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 			 unsigned int n, enum rte_ring_queue_behavior behavior)
 {
 	uint32_t prod_head, cons_tail;
@@ -495,7 +506,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @param behavior
@@ -512,7 +523,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  */
 
 static inline int __attribute__((always_inline))
-__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
+TYPE(__ring_mc_do_dequeue)(struct TYPE(ring) *r, RING_TYPE *obj_table,
 		 unsigned int n, enum rte_ring_queue_behavior behavior)
 {
 	uint32_t cons_head, prod_tail;
@@ -597,7 +608,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @param behavior
@@ -613,7 +624,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   - n: Actual number of objects dequeued.
  */
 static inline int __attribute__((always_inline))
-__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
+TYPE(__ring_sc_do_dequeue)(struct TYPE(ring) *r, RING_TYPE *obj_table,
 		 unsigned int n, enum rte_ring_queue_behavior behavior)
 {
 	uint32_t cons_head, prod_tail;
@@ -665,7 +676,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
@@ -675,10 +686,10 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+TYPE(ring_mp_enqueue_bulk)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 			 unsigned int n)
 {
-	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return TYPE(__ring_mp_do_enqueue)(r, obj_table, n, RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -687,7 +698,7 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
@@ -697,10 +708,10 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+TYPE(ring_sp_enqueue_bulk)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 			 unsigned int n)
 {
-	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return TYPE(__ring_sp_do_enqueue)(r, obj_table, n, RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -713,7 +724,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
@@ -723,13 +734,13 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+TYPE(ring_enqueue_bulk)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 		      unsigned int n)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+		return TYPE(ring_sp_enqueue_bulk)(r, obj_table, n);
 	else
-		return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+		return TYPE(ring_mp_enqueue_bulk)(r, obj_table, n);
 }
 
 /**
@@ -749,9 +760,9 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
+TYPE(ring_mp_enqueue)(struct TYPE(ring) *r, RING_TYPE obj)
 {
-	return rte_ring_mp_enqueue_bulk(r, &obj, 1);
+	return TYPE(ring_mp_enqueue_bulk)(r, &obj, 1);
 }
 
 /**
@@ -768,9 +779,9 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
+TYPE(ring_sp_enqueue)(struct TYPE(ring) *r, RING_TYPE obj)
 {
-	return rte_ring_sp_enqueue_bulk(r, &obj, 1);
+	return TYPE(ring_sp_enqueue_bulk)(r, &obj, 1);
 }
 
 /**
@@ -791,12 +802,12 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_enqueue(struct rte_ring *r, void *obj)
+TYPE(ring_enqueue)(struct TYPE(ring) *r, RING_TYPE obj)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue(r, obj);
+		return TYPE(ring_sp_enqueue)(r, obj);
 	else
-		return rte_ring_mp_enqueue(r, obj);
+		return TYPE(ring_mp_enqueue)(r, obj);
 }
 
 /**
@@ -808,7 +819,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
@@ -817,9 +828,9 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
  *     dequeued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
+TYPE(ring_mc_dequeue_bulk)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned int n)
 {
-	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return TYPE(__ring_mc_do_dequeue)(r, obj_table, n, RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -828,7 +839,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table,
  *   must be strictly positive.
@@ -838,9 +849,9 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
  *     dequeued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
+TYPE(ring_sc_dequeue_bulk)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned int n)
 {
-	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return TYPE(__ring_sc_do_dequeue)(r, obj_table, n, RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -853,7 +864,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
@@ -862,12 +873,12 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
  *     dequeued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
+TYPE(ring_dequeue_bulk)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned int n)
 {
 	if (r->cons.sc_dequeue)
-		return rte_ring_sc_dequeue_bulk(r, obj_table, n);
+		return TYPE(ring_sc_dequeue_bulk)(r, obj_table, n);
 	else
-		return rte_ring_mc_dequeue_bulk(r, obj_table, n);
+		return TYPE(ring_mc_dequeue_bulk)(r, obj_table, n);
 }
 
 /**
@@ -879,16 +890,16 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_p
- *   A pointer to a void * pointer (object) that will be filled.
+ *   A pointer to a RING_TYPE pointer (object) that will be filled.
  * @return
  *   - 0: Success; objects dequeued.
  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
  *     dequeued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
+TYPE(ring_mc_dequeue)(struct TYPE(ring) *r, RING_TYPE *obj_p)
 {
-	return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
+	return TYPE(ring_mc_dequeue_bulk)(r, obj_p, 1);
 }
 
 /**
@@ -897,16 +908,16 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_p
- *   A pointer to a void * pointer (object) that will be filled.
+ *   A pointer to a RING_TYPE pointer (object) that will be filled.
  * @return
  *   - 0: Success; objects dequeued.
  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
  *     dequeued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
+TYPE(ring_sc_dequeue)(struct TYPE(ring) *r, RING_TYPE *obj_p)
 {
-	return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
+	return TYPE(ring_sc_dequeue_bulk)(r, obj_p, 1);
 }
 
 /**
@@ -919,19 +930,19 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_p
- *   A pointer to a void * pointer (object) that will be filled.
+ *   A pointer to a RING_TYPE pointer (object) that will be filled.
  * @return
  *   - 0: Success, objects dequeued.
  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
  *     dequeued.
  */
 static inline int __attribute__((always_inline))
-rte_ring_dequeue(struct rte_ring *r, void **obj_p)
+TYPE(ring_dequeue)(struct TYPE(ring) *r, RING_TYPE *obj_p)
 {
 	if (r->cons.sc_dequeue)
-		return rte_ring_sc_dequeue(r, obj_p);
+		return TYPE(ring_sc_dequeue)(r, obj_p);
 	else
-		return rte_ring_mc_dequeue(r, obj_p);
+		return TYPE(ring_mc_dequeue)(r, obj_p);
 }
 
 /**
@@ -944,7 +955,7 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
  *   - 0: The ring is not full.
  */
 static inline int
-rte_ring_full(const struct rte_ring *r)
+TYPE(ring_full)(const struct TYPE(ring) *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
@@ -961,7 +972,7 @@ rte_ring_full(const struct rte_ring *r)
  *   - 0: The ring is not empty.
  */
 static inline int
-rte_ring_empty(const struct rte_ring *r)
+TYPE(ring_empty)(const struct TYPE(ring) *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
@@ -977,7 +988,7 @@ rte_ring_empty(const struct rte_ring *r)
  *   The number of entries in the ring.
  */
 static inline unsigned
-rte_ring_count(const struct rte_ring *r)
+TYPE(ring_count)(const struct TYPE(ring) *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
@@ -993,7 +1004,7 @@ rte_ring_count(const struct rte_ring *r)
  *   The number of free entries in the ring.
  */
 static inline unsigned
-rte_ring_free_count(const struct rte_ring *r)
+TYPE(ring_free_count)(const struct TYPE(ring) *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
@@ -1009,17 +1020,17 @@ rte_ring_free_count(const struct rte_ring *r)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+TYPE(ring_mp_enqueue_burst)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 			 unsigned int n)
 {
-	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return TYPE(__ring_mp_do_enqueue)(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -1028,17 +1039,17 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+TYPE(ring_sp_enqueue_burst)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 			 unsigned int n)
 {
-	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return TYPE(__ring_sp_do_enqueue)(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -1051,20 +1062,20 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects).
+ *   A pointer to a table of RING_TYPE pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+TYPE(ring_enqueue_burst)(struct TYPE(ring) *r, RING_TYPE const *obj_table,
 		      unsigned int n)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue_burst(r, obj_table, n);
+		return TYPE(ring_sp_enqueue_burst)(r, obj_table, n);
 	else
-		return rte_ring_mp_enqueue_burst(r, obj_table, n);
+		return TYPE(ring_mp_enqueue_burst)(r, obj_table, n);
 }
 
 /**
@@ -1078,16 +1089,16 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
+TYPE(ring_mc_dequeue_burst)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned int n)
 {
-	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return TYPE(__ring_mc_do_dequeue)(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -1098,16 +1109,16 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
+TYPE(ring_sc_dequeue_burst)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned int n)
 {
-	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return TYPE(__ring_sc_do_dequeue)(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -1120,27 +1131,24 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
  * @param r
  *   A pointer to the ring structure.
  * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
+ *   A pointer to a table of RING_TYPE pointers (objects) that will be filled.
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
  *   - Number of objects dequeued
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n)
+TYPE(ring_dequeue_burst)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned int n)
 {
 	if (r->cons.sc_dequeue)
-		return rte_ring_sc_dequeue_burst(r, obj_table, n);
+		return TYPE(ring_sc_dequeue_burst)(r, obj_table, n);
 	else
-		return rte_ring_mc_dequeue_burst(r, obj_table, n);
+		return TYPE(ring_mc_dequeue_burst)(r, obj_table, n);
 }
 
 TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
-static struct rte_tailq_elem rte_ring_tailq = {
-	.name = RTE_TAILQ_RING_NAME,
-};
-EAL_REGISTER_TAILQ(rte_ring_tailq)
+extern struct rte_tailq_elem rte_ring_tailq;
 
 /* true if x is a power of 2 */
 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
@@ -1150,7 +1158,7 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
  *
  * This function returns the number of bytes needed for a ring, given
  * the number of elements in it. This value is the sum of the size of
- * the structure rte_ring and the size of the memory needed by the
+ * the ring structure and the size of the memory needed by the
  * objects pointers. The value is aligned to a cache line size.
  *
  * @param count
@@ -1160,7 +1168,7 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
  *   - -EINVAL if count is not a power of 2.
  */
 static inline ssize_t
-rte_ring_get_memsize(unsigned int count)
+TYPE(ring_get_memsize)(unsigned int count)
 {
 	ssize_t sz;
 
@@ -1172,7 +1180,7 @@ rte_ring_get_memsize(unsigned int count)
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = sizeof(struct TYPE(ring)) + count * sizeof(RING_TYPE);
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
@@ -1182,7 +1190,7 @@ rte_ring_get_memsize(unsigned int count)
  *
  * Initialize a ring structure in memory pointed by "r". The size of the
  * memory area must be large enough to store the ring structure and the
- * object table. It is advised to use rte_ring_get_memsize() to get the
+ * object table. It is advised to use ring_get_memsize() to get the
  * appropriate size.
  *
  * The ring size is set to *count*, which must be a power of two. Water
@@ -1203,33 +1211,33 @@ rte_ring_get_memsize(unsigned int count)
  * @param flags
  *   An OR of the following:
  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
- *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      using ``enqueue()`` or ``enqueue_bulk()``
  *      is "single-producer". Otherwise, it is "multi-producers".
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
- *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      using ``dequeue()`` or ``dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
  * @return
  *   0 on success, or a negative value on error.
  */
 static inline int
-rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
+TYPE(ring_init)(struct TYPE(ring) *r, const char *name, unsigned int count,
 	unsigned int flags)
 {
 	int ret;
 
 	/* compilation-time checks */
-	RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
+	RTE_BUILD_BUG_ON((sizeof(struct TYPE(ring)) &
 			  RTE_CACHE_LINE_MASK) != 0);
 #ifdef RTE_RING_SPLIT_PROD_CONS
-	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
+	RTE_BUILD_BUG_ON((offsetof(struct TYPE(ring), cons) &
 			  RTE_CACHE_LINE_MASK) != 0);
 #endif
-	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
+	RTE_BUILD_BUG_ON((offsetof(struct TYPE(ring), prod) &
 			  RTE_CACHE_LINE_MASK) != 0);
 #ifdef RTE_LIBRTE_RING_DEBUG
-	RTE_BUILD_BUG_ON((sizeof(struct rte_ring_debug_stats) &
+	RTE_BUILD_BUG_ON((sizeof(struct TYPE(ring_debug_stats)) &
 			  RTE_CACHE_LINE_MASK) != 0);
-	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, stats) &
+	RTE_BUILD_BUG_ON((offsetof(struct TYPE(ring), stats) &
 			  RTE_CACHE_LINE_MASK) != 0);
 #endif
 
@@ -1254,7 +1262,7 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  * Create a new ring named *name* in memory.
  *
  * This function uses ``memzone_reserve()`` to allocate memory. Then it
- * calls rte_ring_init() to initialize an empty ring.
+ * calls ring_init() to initialize an empty ring.
  *
  * The new ring size is set to *count*, which must be a power of
  * two. Water marking is disabled by default. The real usable ring size
@@ -1274,10 +1282,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  * @param flags
  *   An OR of the following:
  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
- *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      using ``enqueue()`` or ``enqueue_bulk()``
  *      is "single-producer". Otherwise, it is "multi-producers".
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
- *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      using ``dequeue()`` or ``dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
  * @return
  *   On success, the pointer to the newly allocated ring. NULL on error with
@@ -1289,12 +1297,12 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
  */
-static inline struct rte_ring *
-rte_ring_create(const char *name, unsigned int count, int socket_id,
+static inline struct TYPE(ring) *
+TYPE(ring_create)(const char *name, unsigned int count, int socket_id,
 		unsigned int flags)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
-	struct rte_ring *r;
+	struct TYPE(ring) *r;
 	struct rte_tailq_entry *te;
 	const struct rte_memzone *mz;
 	ssize_t ring_size;
@@ -1304,7 +1312,7 @@ rte_ring_create(const char *name, unsigned int count, int socket_id,
 
 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = TYPE(ring_get_memsize)(count);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -1334,7 +1342,7 @@ rte_ring_create(const char *name, unsigned int count, int socket_id,
 	if (mz != NULL) {
 		r = mz->addr;
 		/* no need to check return value here, checked the args above */
-		rte_ring_init(r, name, count, flags);
+		TYPE(ring_init)(r, name, count, flags);
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -1357,7 +1365,7 @@ rte_ring_create(const char *name, unsigned int count, int socket_id,
  *   Ring to free
  */
 static inline void
-rte_ring_free(struct rte_ring *r)
+TYPE(ring_free)(struct TYPE(ring) *r)
 {
 	struct rte_ring_list *ring_list = NULL;
 	struct rte_tailq_entry *te;
@@ -1366,11 +1374,12 @@ rte_ring_free(struct rte_ring *r)
 		return;
 
 	/*
-	 * Ring was not created with rte_ring_create,
+	 * Ring was not created with create,
 	 * therefore, there is no memzone to free.
 	 */
 	if (r->memzone == NULL) {
-		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_ring_create()");
+		RTE_LOG(ERR, RING,
+			"Cannot free ring (not created with create())\n");
 		return;
 	}
 
@@ -1419,7 +1428,7 @@ rte_ring_free(struct rte_ring *r)
  *   - -EINVAL: Invalid water mark value.
  */
 static inline int
-rte_ring_set_water_mark(struct rte_ring *r, unsigned int count)
+TYPE(ring_set_water_mark)(struct TYPE(ring) *r, unsigned int count)
 {
 	if (count >= r->prod.size)
 		return -EINVAL;
@@ -1441,7 +1450,7 @@ rte_ring_set_water_mark(struct rte_ring *r, unsigned int count)
  *   A pointer to the ring structure.
  */
 static inline void
-rte_ring_dump(FILE *f, const struct rte_ring *r)
+TYPE(ring_dump)(FILE *f, const struct TYPE(ring) *r)
 {
 #ifdef RTE_LIBRTE_RING_DEBUG
 	struct rte_ring_debug_stats sum;
@@ -1455,8 +1464,8 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
 	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
 	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
-	fprintf(f, "  used=%u\n", rte_ring_count(r));
-	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
+	fprintf(f, "  used=%u\n", TYPE(ring_count)(r));
+	fprintf(f, "  avail=%u\n", TYPE(ring_free_count)(r));
 	if (r->prod.watermark == r->prod.size)
 		fprintf(f, "  watermark=0\n");
 	else
@@ -1500,7 +1509,7 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
  *   A pointer to a file for output
  */
 static inline void
-rte_ring_list_dump(FILE *f)
+TYPE(ring_list_dump)(FILE *f)
 {
 	const struct rte_tailq_entry *te;
 	struct rte_ring_list *ring_list;
@@ -1510,7 +1519,7 @@ rte_ring_list_dump(FILE *f)
 	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
 
 	TAILQ_FOREACH(te, ring_list, next) {
-		rte_ring_dump(f, (struct rte_ring *) te->data);
+		TYPE(ring_dump)(f, (struct TYPE(ring) *) te->data);
 	}
 
 	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
@@ -1526,11 +1535,11 @@ rte_ring_list_dump(FILE *f)
  *   with rte_errno set appropriately. Possible rte_errno values include:
  *    - ENOENT - required entry not available to return.
  */
-static inline struct rte_ring *
-rte_ring_lookup(const char *name)
+static inline struct TYPE(ring) *
+TYPE(ring_lookup)(const char *name)
 {
 	struct rte_tailq_entry *te;
-	struct rte_ring *r = NULL;
+	struct TYPE(ring) *r = NULL;
 	struct rte_ring_list *ring_list;
 
 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
@@ -1538,7 +1547,7 @@ rte_ring_lookup(const char *name)
 	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
 
 	TAILQ_FOREACH(te, ring_list, next) {
-		r = (struct rte_ring *) te->data;
+		r = (struct TYPE(ring) *) te->data;
 		if (strncmp(name, r->name, RTE_RING_NAMESIZE) == 0)
 			break;
 	}
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread
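
For readers new to the two-level token-pasting idiom used here: the extra
CAT()/_CAT() indirection forces RING_TYPE_NAME to be macro-expanded before
it is pasted. A sketch of what one instantiation generates, with the
expansions shown in comments (illustrative, not part of the patch):

#define RING_TYPE struct rte_mbuf *
#define RING_TYPE_NAME rte_mbuf
#include <rte_typed_ring.h>

/*
 * Inside the header, TYPE(x) is CAT(RING_TYPE_NAME, x), so:
 *
 *   struct TYPE(ring)        -> struct rte_mbuf_ring
 *   TYPE(ring_create)        -> rte_mbuf_ring_create
 *   TYPE(ring_enqueue_burst) -> rte_mbuf_ring_enqueue_burst
 *
 * and the generated burst enqueue is effectively:
 *
 *   static inline unsigned int
 *   rte_mbuf_ring_enqueue_burst(struct rte_mbuf_ring *r,
 *           struct rte_mbuf * const *obj_table, unsigned int n);
 *
 * so an array of mbuf pointers is passed with no cast to "void **".
 */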

* [dpdk-dev] [RFC PATCH 06/11] ring: use existing power-of-2 function
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (4 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 05/11] ring: add user-specified typing to typed rings Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 07/11] ring: allow multiple typed rings in the same unit Bruce Richardson
                   ` (5 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Rather than having a special macro for checking for powers of two in
the ring code, use the existing inline function from rte_common.h.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_typed_ring.h | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h
index 03a9bd7..3f7514f 100644
--- a/lib/librte_ring/rte_typed_ring.h
+++ b/lib/librte_ring/rte_typed_ring.h
@@ -1150,9 +1150,6 @@ TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
 extern struct rte_tailq_elem rte_ring_tailq;
 
-/* true if x is a power of 2 */
-#define POWEROF2(x) ((((x)-1) & (x)) == 0)
-
 /**
  * Calculate the memory size needed for a ring
  *
@@ -1173,7 +1170,7 @@ TYPE(ring_get_memsize)(unsigned int count)
 	ssize_t sz;
 
 	/* count must be a power of 2 */
-	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
+	if ((!rte_is_power_of_2(count)) || (count > RTE_RING_SZ_MASK)) {
 		RTE_LOG(ERR, RING,
 			"Requested size is invalid, must be power of 2, and "
 			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread
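
One behavioural nuance is worth noting, assuming the rte_common.h
definition of rte_is_power_of_2() is "n && !(n & (n - 1))": the old macro
and the inline function agree for every non-zero size but differ at zero.
A sketch, with the removed macro restated locally for comparison:

#include <assert.h>
#include <rte_common.h>

#define POWEROF2(x) ((((x)-1) & (x)) == 0)	/* the macro removed above */

static void
power_of_2_semantics(void)
{
	/* identical verdicts for all non-zero sizes... */
	assert(POWEROF2(256) && rte_is_power_of_2(256));
	assert(!POWEROF2(100) && !rte_is_power_of_2(100));

	/* ...but the macro accepted 0, while the inline function rejects
	 * it, so a zero-element ring now also fails the size check in
	 * ring_get_memsize().
	 */
	assert(POWEROF2(0));
	assert(!rte_is_power_of_2(0));
}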

* [dpdk-dev] [RFC PATCH 07/11] ring: allow multiple typed rings in the same unit
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (5 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 06/11] ring: use existing power-of-2 function Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 08/11] app/pdump: remove duplicate macro definition Bruce Richardson
                   ` (4 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Allow the typed ring header file to be included multiple times in a
C file, so that several different ring types can be used in the same
unit. This is tested by adding a second ring type to the unit tests,
one which works with small (16-byte) structures rather than just
pointers.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/test/test_typed_ring.c       |  60 +++++++++++++++-
 lib/librte_ring/rte_typed_ring.h | 144 ++++++++++++++++++++-------------------
 2 files changed, 132 insertions(+), 72 deletions(-)

diff --git a/app/test/test_typed_ring.c b/app/test/test_typed_ring.c
index aaef023..b403af5 100644
--- a/app/test/test_typed_ring.c
+++ b/app/test/test_typed_ring.c
@@ -37,6 +37,19 @@
 #define RING_TYPE struct rte_mbuf *
 #define RING_TYPE_NAME rte_mbuf
 #include <rte_typed_ring.h>
+#undef RING_TYPE_NAME
+#undef RING_TYPE
+
+struct xyval {
+	uint64_t x;
+	void *y;
+};
+
+#define RING_TYPE struct xyval /* structure not pointer */
+#define RING_TYPE_NAME rte_xyval
+#include <rte_typed_ring.h>
+#undef RING_TYPE_NAME
+#undef RING_TYPE
 
 #define RING_SIZE 256
 #define BURST_SZ 32
@@ -73,6 +86,38 @@ test_mbuf_enqueue_dequeue(struct rte_mbuf_ring *r)
 	return 0;
 }
 
+static int
+test_xyval_enqueue_dequeue(struct rte_xyval_ring *r)
+{
+	struct xyval inbufs[BURST_SZ];
+	struct xyval outbufs[BURST_SZ];
+	unsigned int i, j;
+
+	for (i = 0; i < BURST_SZ; i++)
+		inbufs[i].x = rte_rand();
+
+	for (i = 0; i < ITERATIONS; i++) {
+		uint16_t in = rte_xyval_ring_enqueue_burst(r, inbufs, BURST_SZ);
+		if (in != BURST_SZ) {
+			printf("Error enqueuing xyvals\n");
+			return -1;
+		}
+		uint16_t out = rte_xyval_ring_dequeue_burst(r, outbufs, BURST_SZ);
+		if (out != BURST_SZ) {
+			printf("Error dequeuing xyvals\n");
+			return -1;
+		}
+
+		for (j = 0; j < BURST_SZ; j++)
+			if (outbufs[j].x != inbufs[j].x ||
+					outbufs[j].y != inbufs[j].y) {
+				printf("Error: dequeued val != enqueued val\n");
+				return -1;
+			}
+	}
+	return 0;
+}
+
 /**
  * test entry point
  */
@@ -87,13 +132,24 @@ test_typed_ring(void)
 		return -1;
 	}
 	rte_mbuf_ring_list_dump(stdout);
-
+	printf("mbuf ring has memory size %u\n",
+			(unsigned int)rte_mbuf_ring_get_memsize(RING_SIZE));
 	if (test_mbuf_enqueue_dequeue(r) != 0) {
 		rte_mbuf_ring_free(r);
 		return -1;
 	}
-
 	rte_mbuf_ring_free(r);
+
+	struct rte_xyval_ring *r2;
+	r2 = rte_xyval_ring_create("xyval_ring", RING_SIZE, rte_socket_id(),
+			RING_F_SP_ENQ | RING_F_SC_DEQ);
+	if (r2 == NULL) {
+		fprintf(stderr, "ln %d: Error creating xyval ring\n", __LINE__);
+		return -1;
+	}
+	if (test_xyval_enqueue_dequeue(r2) != 0) {
+		rte_xyval_ring_free(r2);
+		return -1;
+	}
+	printf("xyval ring has memory size %u\n",
+			(unsigned int)rte_xyval_ring_get_memsize(RING_SIZE));
+	rte_xyval_ring_free(r2);
 	return 0;
 }
 
diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h
index 3f7514f..79edc65 100644
--- a/lib/librte_ring/rte_typed_ring.h
+++ b/lib/librte_ring/rte_typed_ring.h
@@ -114,14 +114,6 @@ extern "C" {
 #define _CAT(a, b) a ## _ ## b
 #define CAT(a, b) _CAT(a, b)
 
-#ifndef RING_TYPE_NAME
-#error "Need RING_TYPE_NAME defined before including"
-#endif
-#ifndef RING_TYPE
-#error "Need RING_TYPE defined before including"
-#endif
-#define TYPE(x) CAT(RING_TYPE_NAME, x)
-
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
 enum rte_ring_queue_behavior {
@@ -160,63 +152,7 @@ struct rte_ring_debug_stats {
 #define RTE_RING_PAUSE_REP_COUNT 0
 #endif
 
-struct rte_memzone; /* forward declaration, so as not to require memzone.h */
-
-/**
- * An RTE ring structure.
- *
- * The producer and the consumer have a head and a tail index. The particularity
- * of these index is that they are not between 0 and size(ring). These indexes
- * are between 0 and 2^32, and we mask their value when we access the ring[]
- * field. Thanks to this assumption, we can do subtractions between 2 index
- * values in a modulo-32bit base: that's why the overflow of the indexes is not
- * a problem.
- */
-struct TYPE(ring) {
-	/*
-	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
-	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
-	 * next time the ABI changes
-	 */
-	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
-	int flags;                       /**< Flags supplied at creation. */
-	const struct rte_memzone *memzone;
-			/**< Memzone, if any, containing the ring */
-
-	/** Ring producer status. */
-	struct prod {
-		uint32_t watermark;      /**< Maximum items before EDQUOT. */
-		uint32_t sp_enqueue;     /**< True, if single producer. */
-		uint32_t size;           /**< Size of ring. */
-		uint32_t mask;           /**< Mask (size-1) of ring. */
-		volatile uint32_t head;  /**< Producer head. */
-		volatile uint32_t tail;  /**< Producer tail. */
-	} prod __rte_cache_aligned;
-
-	/** Ring consumer status. */
-	struct cons {
-		uint32_t sc_dequeue;     /**< True, if single consumer. */
-		uint32_t size;           /**< Size of the ring. */
-		uint32_t mask;           /**< Mask (size-1) of ring. */
-		volatile uint32_t head;  /**< Consumer head. */
-		volatile uint32_t tail;  /**< Consumer tail. */
-#ifdef RTE_RING_SPLIT_PROD_CONS
-	} cons __rte_cache_aligned;
-#else
-	} cons;
-#endif
-
-#ifdef RTE_LIBRTE_RING_DEBUG
-	struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
-#endif
-
-	/**
-	 * Memory space of ring starts here.
-	 * not volatile so need to be careful
-	 * about compiler re-ordering
-	 */
-	RING_TYPE ring[] __rte_cache_aligned;
-};
+TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
@@ -304,6 +240,77 @@ struct TYPE(ring) {
 	} \
 } while (0)
 
+extern struct rte_tailq_elem rte_ring_tailq;
+
+#endif /* _RTE_RING_H_ */
+
+#ifndef RING_TYPE_NAME
+#error "Need RING_TYPE_NAME defined before including"
+#endif
+#ifndef RING_TYPE
+#error "Need RING_TYPE defined before including"
+#endif
+#define TYPE(x) CAT(RING_TYPE_NAME, x)
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+/**
+ * An RTE ring structure.
+ *
+ * The producer and the consumer have a head and a tail index. The particularity
+ * of these index is that they are not between 0 and size(ring). These indexes
+ * are between 0 and 2^32, and we mask their value when we access the ring[]
+ * field. Thanks to this assumption, we can do subtractions between 2 index
+ * values in a modulo-32bit base: that's why the overflow of the indexes is not
+ * a problem.
+ */
+struct TYPE(ring) {
+	/*
+	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
+	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
+	 * next time the ABI changes
+	 */
+	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
+	int flags;                       /**< Flags supplied at creation. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the ring */
+
+	/** Ring producer status. */
+	struct {
+		uint32_t watermark;      /**< Maximum items before EDQUOT. */
+		uint32_t sp_enqueue;     /**< True, if single producer. */
+		uint32_t size;           /**< Size of ring. */
+		uint32_t mask;           /**< Mask (size-1) of ring. */
+		volatile uint32_t head;  /**< Producer head. */
+		volatile uint32_t tail;  /**< Producer tail. */
+	} prod __rte_cache_aligned;
+
+	/** Ring consumer status. */
+	struct {
+		uint32_t sc_dequeue;     /**< True, if single consumer. */
+		uint32_t size;           /**< Size of the ring. */
+		uint32_t mask;           /**< Mask (size-1) of ring. */
+		volatile uint32_t head;  /**< Consumer head. */
+		volatile uint32_t tail;  /**< Consumer tail. */
+#ifdef RTE_RING_SPLIT_PROD_CONS
+	} cons __rte_cache_aligned;
+#else
+	} cons;
+#endif
+
+#ifdef RTE_LIBRTE_RING_DEBUG
+	struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
+#endif
+
+	/**
+	 * Memory space of ring starts here.
+	 * not volatile so need to be careful
+	 * about compiler re-ordering
+	 */
+	RING_TYPE ring[] __rte_cache_aligned;
+};
+
+
 /**
  * @internal Enqueue several objects on the ring (multi-producers safe).
  *
@@ -1146,10 +1153,6 @@ TYPE(ring_dequeue_burst)(struct TYPE(ring) *r, RING_TYPE *obj_table, unsigned in
 		return TYPE(ring_mc_dequeue_burst)(r, obj_table, n);
 }
 
-TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
-
-extern struct rte_tailq_elem rte_ring_tailq;
-
 /**
  * Calculate the memory size needed for a ring
  *
@@ -1559,8 +1562,9 @@ TYPE(ring_lookup)(const char *name)
 	return r;
 }
 
+#undef TYPE
+
 #ifdef __cplusplus
 }
 #endif
 
-#endif /* _RTE_RING_H_ */
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [RFC PATCH 08/11] app/pdump: remove duplicate macro definition
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (6 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 07/11] ring: allow multiple typed rings in the same unit Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 09/11] ring: make existing rings reuse the typed ring definitions Bruce Richardson
                   ` (3 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

The ring size macro from rte_ring.h is duplicated in pdump. Remove it.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/pdump/main.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/app/pdump/main.c b/app/pdump/main.c
index f3ef181..b88090d 100644
--- a/app/pdump/main.c
+++ b/app/pdump/main.c
@@ -92,7 +92,6 @@
 #define BURST_SIZE 32
 #define NUM_VDEVS 2
 
-#define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
 /* true if x is a power of 2 */
 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
 
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [RFC PATCH 09/11] ring: make existing rings reuse the typed ring definitions
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (7 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 08/11] app/pdump: remove duplicate macro definition Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 10/11] ring: reuse typed rings management functions Bruce Richardson
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Now that the typed rings are functional, start removing the old code
from the existing rings, so that we don't have code duplication. The
first things to be removed are the duplicated structure and macro
definitions. This allows the typed ring and regular ring headers to
co-exist, so we can change the typed rings to use their own guard macro.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_ring.c       |   2 -
 lib/librte_ring/rte_ring.h       | 183 ++-------------------------------------
 lib/librte_ring/rte_typed_ring.h |   6 +-
 3 files changed, 9 insertions(+), 182 deletions(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 8ead295..b6215f6 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -89,8 +89,6 @@
 
 #include "rte_ring.h"
 
-TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
-
 struct rte_tailq_elem rte_ring_tailq = {
 	.name = RTE_TAILQ_RING_NAME,
 };
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index e359aff..4e74efd 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -91,132 +91,13 @@
 extern "C" {
 #endif
 
-#include <stdio.h>
-#include <stdint.h>
-#include <sys/queue.h>
-#include <errno.h>
-#include <rte_common.h>
-#include <rte_memory.h>
-#include <rte_lcore.h>
-#include <rte_atomic.h>
-#include <rte_branch_prediction.h>
-#include <rte_memzone.h>
-
-#define RTE_TAILQ_RING_NAME "RTE_RING"
-
-enum rte_ring_queue_behavior {
-	RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
-	RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
-};
-
-#ifdef RTE_LIBRTE_RING_DEBUG
-/**
- * A structure that stores the ring statistics (per-lcore).
- */
-struct rte_ring_debug_stats {
-	uint64_t enq_success_bulk; /**< Successful enqueues number. */
-	uint64_t enq_success_objs; /**< Objects successfully enqueued. */
-	uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
-	uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
-	uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
-	uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
-	uint64_t deq_success_bulk; /**< Successful dequeues number. */
-	uint64_t deq_success_objs; /**< Objects successfully dequeued. */
-	uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
-	uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
-} __rte_cache_aligned;
-#endif
-
-#define RTE_RING_MZ_PREFIX "RG_"
-/**< The maximum length of a ring name. */
-#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
-			   sizeof(RTE_RING_MZ_PREFIX) + 1)
-
-#ifndef RTE_RING_PAUSE_REP_COUNT
-#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield
-                                    *   if RTE_RING_PAUSE_REP not defined. */
-#endif
-
-struct rte_memzone; /* forward declaration, so as not to require memzone.h */
-
-/**
- * An RTE ring structure.
- *
- * The producer and the consumer have a head and a tail index. The particularity
- * of these index is that they are not between 0 and size(ring). These indexes
- * are between 0 and 2^32, and we mask their value when we access the ring[]
- * field. Thanks to this assumption, we can do subtractions between 2 index
- * values in a modulo-32bit base: that's why the overflow of the indexes is not
- * a problem.
- */
-struct rte_ring {
-	/*
-	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
-	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
-	 * next time the ABI changes
-	 */
-	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
-	int flags;                       /**< Flags supplied at creation. */
-	const struct rte_memzone *memzone;
-			/**< Memzone, if any, containing the rte_ring */
-
-	/** Ring producer status. */
-	struct prod {
-		uint32_t watermark;      /**< Maximum items before EDQUOT. */
-		uint32_t sp_enqueue;     /**< True, if single producer. */
-		uint32_t size;           /**< Size of ring. */
-		uint32_t mask;           /**< Mask (size-1) of ring. */
-		volatile uint32_t head;  /**< Producer head. */
-		volatile uint32_t tail;  /**< Producer tail. */
-	} prod __rte_cache_aligned;
-
-	/** Ring consumer status. */
-	struct cons {
-		uint32_t sc_dequeue;     /**< True, if single consumer. */
-		uint32_t size;           /**< Size of the ring. */
-		uint32_t mask;           /**< Mask (size-1) of ring. */
-		volatile uint32_t head;  /**< Consumer head. */
-		volatile uint32_t tail;  /**< Consumer tail. */
-#ifdef RTE_RING_SPLIT_PROD_CONS
-	} cons __rte_cache_aligned;
-#else
-	} cons;
-#endif
-
-#ifdef RTE_LIBRTE_RING_DEBUG
-	struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
-#endif
+#define RING_TYPE void *
+#define RING_TYPE_NAME rte_void
+#include "rte_typed_ring.h"
+#undef RING_TYPE
+#undef RING_TYPE_NAME
 
-	void *ring[] __rte_cache_aligned;   /**< Memory space of ring starts here.
-	                                     * not volatile so need to be careful
-	                                     * about compiler re-ordering */
-};
-
-#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
-#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
-#define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceed for burst ops */
-#define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
-
-/**
- * @internal When debug is enabled, store ring statistics.
- * @param r
- *   A pointer to the ring.
- * @param name
- *   The name of the statistics field to increment in the ring.
- * @param n
- *   The number to add to the object-oriented statistics.
- */
-#ifdef RTE_LIBRTE_RING_DEBUG
-#define __RING_STAT_ADD(r, name, n) do {                        \
-		unsigned __lcore_id = rte_lcore_id();           \
-		if (__lcore_id < RTE_MAX_LCORE) {               \
-			r->stats[__lcore_id].name##_objs += n;  \
-			r->stats[__lcore_id].name##_bulk += 1;  \
-		}                                               \
-	} while(0)
-#else
-#define __RING_STAT_ADD(r, name, n) do {} while(0)
-#endif
+#define rte_ring rte_void_ring
 
 /**
  * Calculate the memory size needed for a ring
@@ -350,58 +231,6 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
  */
 void rte_ring_dump(FILE *f, const struct rte_ring *r);
 
-/* the actual enqueue of pointers on the ring.
- * Placed here since identical code needed in both
- * single and multi producer enqueue functions */
-#define ENQUEUE_PTRS() do { \
-	const uint32_t size = r->prod.size; \
-	uint32_t idx = prod_head & mask; \
-	if (likely(idx + n < size)) { \
-		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
-			r->ring[idx] = obj_table[i]; \
-			r->ring[idx+1] = obj_table[i+1]; \
-			r->ring[idx+2] = obj_table[i+2]; \
-			r->ring[idx+3] = obj_table[i+3]; \
-		} \
-		switch (n & 0x3) { \
-			case 3: r->ring[idx++] = obj_table[i++]; \
-			case 2: r->ring[idx++] = obj_table[i++]; \
-			case 1: r->ring[idx++] = obj_table[i++]; \
-		} \
-	} else { \
-		for (i = 0; idx < size; i++, idx++)\
-			r->ring[idx] = obj_table[i]; \
-		for (idx = 0; i < n; i++, idx++) \
-			r->ring[idx] = obj_table[i]; \
-	} \
-} while(0)
-
-/* the actual copy of pointers on the ring to obj_table.
- * Placed here since identical code needed in both
- * single and multi consumer dequeue functions */
-#define DEQUEUE_PTRS() do { \
-	uint32_t idx = cons_head & mask; \
-	const uint32_t size = r->cons.size; \
-	if (likely(idx + n < size)) { \
-		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
-			obj_table[i] = r->ring[idx]; \
-			obj_table[i+1] = r->ring[idx+1]; \
-			obj_table[i+2] = r->ring[idx+2]; \
-			obj_table[i+3] = r->ring[idx+3]; \
-		} \
-		switch (n & 0x3) { \
-			case 3: obj_table[i++] = r->ring[idx++]; \
-			case 2: obj_table[i++] = r->ring[idx++]; \
-			case 1: obj_table[i++] = r->ring[idx++]; \
-		} \
-	} else { \
-		for (i = 0; idx < size; i++, idx++) \
-			obj_table[i] = r->ring[idx]; \
-		for (idx = 0; i < n; i++, idx++) \
-			obj_table[i] = r->ring[idx]; \
-	} \
-} while (0)
-
 /**
  * @internal Enqueue several objects on the ring (multi-producers safe).
  *
diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h
index 79edc65..89f6983 100644
--- a/lib/librte_ring/rte_typed_ring.h
+++ b/lib/librte_ring/rte_typed_ring.h
@@ -63,8 +63,8 @@
  *
  ***************************************************************************/
 
-#ifndef _RTE_RING_H_
-#define _RTE_RING_H_
+#ifndef _RTE_TYPED_RING_H_
+#define _RTE_TYPED_RING_H_
 
 /**
  * @file
@@ -242,7 +242,7 @@ TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
 extern struct rte_tailq_elem rte_ring_tailq;
 
-#endif /* _RTE_RING_H_ */
+#endif /* _RTE_TYPED_RING_H_ */
 
 #ifndef RING_TYPE_NAME
 #error "Need RING_TYPE_NAME defined before including"
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [RFC PATCH 10/11] ring: reuse typed rings management functions
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (8 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 09/11] ring: make existing rings reuse the typed ring definitions Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 11/11] ring: reuse typed ring enqueue and dequeue functions Bruce Richardson
  2017-01-13 14:23 ` [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Olivier Matz
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Remove code from rte_ring.c that is already present in the typed ring
implementation. Have the rte_ring functions just call the typed
(rte_void_ring) equivalents.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_ring.c | 242 ++-------------------------------------------
 1 file changed, 8 insertions(+), 234 deletions(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index b6215f6..3171549 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -94,65 +94,18 @@ struct rte_tailq_elem rte_ring_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_ring_tailq)
 
-/* true if x is a power of 2 */
-#define POWEROF2(x) ((((x)-1) & (x)) == 0)
-
 /* return the size of memory occupied by a ring */
 ssize_t
 rte_ring_get_memsize(unsigned count)
 {
-	ssize_t sz;
-
-	/* count must be a power of 2 */
-	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
-		RTE_LOG(ERR, RING,
-			"Requested size is invalid, must be power of 2, and "
-			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
-		return -EINVAL;
-	}
-
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
-	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
-	return sz;
+	return rte_void_ring_get_memsize(count);
 }
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	unsigned flags)
 {
-	int ret;
-
-	/* compilation-time checks */
-	RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
-			  RTE_CACHE_LINE_MASK) != 0);
-#ifdef RTE_RING_SPLIT_PROD_CONS
-	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
-			  RTE_CACHE_LINE_MASK) != 0);
-#endif
-	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
-			  RTE_CACHE_LINE_MASK) != 0);
-#ifdef RTE_LIBRTE_RING_DEBUG
-	RTE_BUILD_BUG_ON((sizeof(struct rte_ring_debug_stats) &
-			  RTE_CACHE_LINE_MASK) != 0);
-	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, stats) &
-			  RTE_CACHE_LINE_MASK) != 0);
-#endif
-
-	/* init the ring structure */
-	memset(r, 0, sizeof(*r));
-	ret = snprintf(r->name, sizeof(r->name), "%s", name);
-	if (ret < 0 || ret >= (int)sizeof(r->name))
-		return -ENAMETOOLONG;
-	r->flags = flags;
-	r->prod.watermark = count;
-	r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
-	r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
-	r->prod.size = r->cons.size = count;
-	r->prod.mask = r->cons.mask = count-1;
-	r->prod.head = r->cons.head = 0;
-	r->prod.tail = r->cons.tail = 0;
-
-	return 0;
+	return rte_void_ring_init(r, name, count, flags);
 }
 
 /* create the ring */
@@ -160,106 +113,14 @@ struct rte_ring *
 rte_ring_create(const char *name, unsigned count, int socket_id,
 		unsigned flags)
 {
-	char mz_name[RTE_MEMZONE_NAMESIZE];
-	struct rte_ring *r;
-	struct rte_tailq_entry *te;
-	const struct rte_memzone *mz;
-	ssize_t ring_size;
-	int mz_flags = 0;
-	struct rte_ring_list* ring_list = NULL;
-	int ret;
-
-	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
-
-	ring_size = rte_ring_get_memsize(count);
-	if (ring_size < 0) {
-		rte_errno = ring_size;
-		return NULL;
-	}
-
-	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
-		RTE_RING_MZ_PREFIX, name);
-	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
-		rte_errno = ENAMETOOLONG;
-		return NULL;
-	}
-
-	te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);
-	if (te == NULL) {
-		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
-		rte_errno = ENOMEM;
-		return NULL;
-	}
-
-	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
-
-	/* reserve a memory zone for this ring. If we can't get rte_config or
-	 * we are secondary process, the memzone_reserve function will set
-	 * rte_errno for us appropriately - hence no check in this this function */
-	mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags);
-	if (mz != NULL) {
-		r = mz->addr;
-		/* no need to check return value here, we already checked the
-		 * arguments above */
-		rte_ring_init(r, name, count, flags);
-
-		te->data = (void *) r;
-		r->memzone = mz;
-
-		TAILQ_INSERT_TAIL(ring_list, te, next);
-	} else {
-		r = NULL;
-		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
-		rte_free(te);
-	}
-	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
-
-	return r;
+	return rte_void_ring_create(name, count, socket_id, flags);
 }
 
 /* free the ring */
 void
 rte_ring_free(struct rte_ring *r)
 {
-	struct rte_ring_list *ring_list = NULL;
-	struct rte_tailq_entry *te;
-
-	if (r == NULL)
-		return;
-
-	/*
-	 * Ring was not created with rte_ring_create,
-	 * therefore, there is no memzone to free.
-	 */
-	if (r->memzone == NULL) {
-		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_ring_create()");
-		return;
-	}
-
-	if (rte_memzone_free(r->memzone) != 0) {
-		RTE_LOG(ERR, RING, "Cannot free memory\n");
-		return;
-	}
-
-	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
-	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
-
-	/* find out tailq entry */
-	TAILQ_FOREACH(te, ring_list, next) {
-		if (te->data == (void *) r)
-			break;
-	}
-
-	if (te == NULL) {
-		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
-		return;
-	}
-
-	TAILQ_REMOVE(ring_list, te, next);
-
-	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
-
-	rte_free(te);
+	rte_void_ring_free(r);
 }
 
 /*
@@ -269,113 +130,26 @@ rte_ring_free(struct rte_ring *r)
 int
 rte_ring_set_water_mark(struct rte_ring *r, unsigned count)
 {
-	if (count >= r->prod.size)
-		return -EINVAL;
-
-	/* if count is 0, disable the watermarking */
-	if (count == 0)
-		count = r->prod.size;
-
-	r->prod.watermark = count;
-	return 0;
+	return rte_void_ring_set_water_mark(r, count);
 }
 
 /* dump the status of the ring on the console */
 void
 rte_ring_dump(FILE *f, const struct rte_ring *r)
 {
-#ifdef RTE_LIBRTE_RING_DEBUG
-	struct rte_ring_debug_stats sum;
-	unsigned lcore_id;
-#endif
-
-	fprintf(f, "ring <%s>@%p\n", r->name, r);
-	fprintf(f, "  flags=%x\n", r->flags);
-	fprintf(f, "  size=%"PRIu32"\n", r->prod.size);
-	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
-	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
-	fprintf(f, "  used=%u\n", rte_ring_count(r));
-	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
-	if (r->prod.watermark == r->prod.size)
-		fprintf(f, "  watermark=0\n");
-	else
-		fprintf(f, "  watermark=%"PRIu32"\n", r->prod.watermark);
-
-	/* sum and dump statistics */
-#ifdef RTE_LIBRTE_RING_DEBUG
-	memset(&sum, 0, sizeof(sum));
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
-		sum.enq_success_bulk += r->stats[lcore_id].enq_success_bulk;
-		sum.enq_success_objs += r->stats[lcore_id].enq_success_objs;
-		sum.enq_quota_bulk += r->stats[lcore_id].enq_quota_bulk;
-		sum.enq_quota_objs += r->stats[lcore_id].enq_quota_objs;
-		sum.enq_fail_bulk += r->stats[lcore_id].enq_fail_bulk;
-		sum.enq_fail_objs += r->stats[lcore_id].enq_fail_objs;
-		sum.deq_success_bulk += r->stats[lcore_id].deq_success_bulk;
-		sum.deq_success_objs += r->stats[lcore_id].deq_success_objs;
-		sum.deq_fail_bulk += r->stats[lcore_id].deq_fail_bulk;
-		sum.deq_fail_objs += r->stats[lcore_id].deq_fail_objs;
-	}
-	fprintf(f, "  size=%"PRIu32"\n", r->prod.size);
-	fprintf(f, "  enq_success_bulk=%"PRIu64"\n", sum.enq_success_bulk);
-	fprintf(f, "  enq_success_objs=%"PRIu64"\n", sum.enq_success_objs);
-	fprintf(f, "  enq_quota_bulk=%"PRIu64"\n", sum.enq_quota_bulk);
-	fprintf(f, "  enq_quota_objs=%"PRIu64"\n", sum.enq_quota_objs);
-	fprintf(f, "  enq_fail_bulk=%"PRIu64"\n", sum.enq_fail_bulk);
-	fprintf(f, "  enq_fail_objs=%"PRIu64"\n", sum.enq_fail_objs);
-	fprintf(f, "  deq_success_bulk=%"PRIu64"\n", sum.deq_success_bulk);
-	fprintf(f, "  deq_success_objs=%"PRIu64"\n", sum.deq_success_objs);
-	fprintf(f, "  deq_fail_bulk=%"PRIu64"\n", sum.deq_fail_bulk);
-	fprintf(f, "  deq_fail_objs=%"PRIu64"\n", sum.deq_fail_objs);
-#else
-	fprintf(f, "  no statistics available\n");
-#endif
+	rte_void_ring_dump(f, r);
 }
 
 /* dump the status of all rings on the console */
 void
 rte_ring_list_dump(FILE *f)
 {
-	const struct rte_tailq_entry *te;
-	struct rte_ring_list *ring_list;
-
-	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
-
-	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
-
-	TAILQ_FOREACH(te, ring_list, next) {
-		rte_ring_dump(f, (struct rte_ring *) te->data);
-	}
-
-	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+	rte_void_ring_list_dump(f);
 }
 
 /* search a ring from its name */
 struct rte_ring *
 rte_ring_lookup(const char *name)
 {
-	struct rte_tailq_entry *te;
-	struct rte_ring *r = NULL;
-	struct rte_ring_list *ring_list;
-
-	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
-
-	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
-
-	TAILQ_FOREACH(te, ring_list, next) {
-		r = (struct rte_ring *) te->data;
-		if (strncmp(name, r->name, RTE_RING_NAMESIZE) == 0)
-			break;
-	}
-
-	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
-
-	if (te == NULL) {
-		rte_errno = ENOENT;
-		return NULL;
-	}
-
-	return r;
+	return rte_void_ring_lookup(name);
 }
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [dpdk-dev] [RFC PATCH 11/11] ring: reuse typed ring enqueue and dequeue functions
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (9 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 10/11] ring: reuse typed rings management functions Bruce Richardson
@ 2017-01-11 15:05 ` Bruce Richardson
  2017-01-13 14:23 ` [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Olivier Matz
  11 siblings, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-11 15:05 UTC (permalink / raw)
  To: olivier.matz; +Cc: dev, Bruce Richardson

Reduce the code duplicated between rte_ring.h and rte_typed_ring.h by
having rte_ring reuse the enqueue and dequeue code from rte_typed_ring.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_ring.h | 380 +--------------------------------------------
 1 file changed, 8 insertions(+), 372 deletions(-)

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 4e74efd..25c4849 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -231,366 +231,10 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
  */
 void rte_ring_dump(FILE *f, const struct rte_ring *r);
 
-/**
- * @internal Enqueue several objects on the ring (multi-producers safe).
- *
- * This function uses a "compare and set" instruction to move the
- * producer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t prod_head, prod_next;
-	uint32_t cons_tail, free_entries;
-	const unsigned max = n;
-	int success;
-	unsigned i, rep = 0;
-	uint32_t mask = r->prod.mask;
-	int ret;
-
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
-	/* move prod.head atomically */
-	do {
-		/* Reset n to the initial burst count */
-		n = max;
-
-		prod_head = r->prod.head;
-		cons_tail = r->cons.tail;
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * prod_head > cons_tail). So 'free_entries' is always between 0
-		 * and size(ring)-1. */
-		free_entries = (mask + cons_tail - prod_head);
-
-		/* check that we have enough room in ring */
-		if (unlikely(n > free_entries)) {
-			if (behavior == RTE_RING_QUEUE_FIXED) {
-				__RING_STAT_ADD(r, enq_fail, n);
-				return -ENOBUFS;
-			}
-			else {
-				/* No free entry available */
-				if (unlikely(free_entries == 0)) {
-					__RING_STAT_ADD(r, enq_fail, n);
-					return 0;
-				}
-
-				n = free_entries;
-			}
-		}
-
-		prod_next = prod_head + n;
-		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
-					      prod_next);
-	} while (unlikely(success == 0));
-
-	/* write entries in ring */
-	ENQUEUE_PTRS();
-	rte_smp_wmb();
-
-	/* if we exceed the watermark */
-	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-				(int)(n | RTE_RING_QUOT_EXCEED);
-		__RING_STAT_ADD(r, enq_quota, n);
-	}
-	else {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-		__RING_STAT_ADD(r, enq_success, n);
-	}
-
-	/*
-	 * If there are other enqueues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	while (unlikely(r->prod.tail != prod_head)) {
-		rte_pause();
-
-		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-		 * for other thread finish. It gives pre-empted thread a chance
-		 * to proceed and finish with ring dequeue operation. */
-		if (RTE_RING_PAUSE_REP_COUNT &&
-		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
-			rep = 0;
-			sched_yield();
-		}
-	}
-	r->prod.tail = prod_next;
-	return ret;
-}
-
-/**
- * @internal Enqueue several objects on a ring (NOT multi-producers safe).
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t prod_head, cons_tail;
-	uint32_t prod_next, free_entries;
-	unsigned i;
-	uint32_t mask = r->prod.mask;
-	int ret;
-
-	prod_head = r->prod.head;
-	cons_tail = r->cons.tail;
-	/* The subtraction is done between two unsigned 32bits value
-	 * (the result is always modulo 32 bits even if we have
-	 * prod_head > cons_tail). So 'free_entries' is always between 0
-	 * and size(ring)-1. */
-	free_entries = mask + cons_tail - prod_head;
-
-	/* check that we have enough room in ring */
-	if (unlikely(n > free_entries)) {
-		if (behavior == RTE_RING_QUEUE_FIXED) {
-			__RING_STAT_ADD(r, enq_fail, n);
-			return -ENOBUFS;
-		}
-		else {
-			/* No free entry available */
-			if (unlikely(free_entries == 0)) {
-				__RING_STAT_ADD(r, enq_fail, n);
-				return 0;
-			}
-
-			n = free_entries;
-		}
-	}
-
-	prod_next = prod_head + n;
-	r->prod.head = prod_next;
-
-	/* write entries in ring */
-	ENQUEUE_PTRS();
-	rte_smp_wmb();
-
-	/* if we exceed the watermark */
-	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-			(int)(n | RTE_RING_QUOT_EXCEED);
-		__RING_STAT_ADD(r, enq_quota, n);
-	}
-	else {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-		__RING_STAT_ADD(r, enq_success, n);
-	}
-
-	r->prod.tail = prod_next;
-	return ret;
-}
-
-/**
- * @internal Dequeue several objects from a ring (multi-consumers safe). When
- * the request objects are more than the available objects, only dequeue the
- * actual number of objects
- *
- * This function uses a "compare and set" instruction to move the
- * consumer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-
-static inline int __attribute__((always_inline))
-__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t cons_head, prod_tail;
-	uint32_t cons_next, entries;
-	const unsigned max = n;
-	int success;
-	unsigned i, rep = 0;
-	uint32_t mask = r->prod.mask;
-
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
-	/* move cons.head atomically */
-	do {
-		/* Restore n as it may change every loop */
-		n = max;
-
-		cons_head = r->cons.head;
-		prod_tail = r->prod.tail;
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1. */
-		entries = (prod_tail - cons_head);
-
-		/* Set the actual entries for dequeue */
-		if (n > entries) {
-			if (behavior == RTE_RING_QUEUE_FIXED) {
-				__RING_STAT_ADD(r, deq_fail, n);
-				return -ENOENT;
-			}
-			else {
-				if (unlikely(entries == 0)){
-					__RING_STAT_ADD(r, deq_fail, n);
-					return 0;
-				}
-
-				n = entries;
-			}
-		}
-
-		cons_next = cons_head + n;
-		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
-					      cons_next);
-	} while (unlikely(success == 0));
-
-	/* copy in table */
-	DEQUEUE_PTRS();
-	rte_smp_rmb();
-
-	/*
-	 * If there are other dequeues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	while (unlikely(r->cons.tail != cons_head)) {
-		rte_pause();
-
-		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-		 * for other thread finish. It gives pre-empted thread a chance
-		 * to proceed and finish with ring dequeue operation. */
-		if (RTE_RING_PAUSE_REP_COUNT &&
-		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
-			rep = 0;
-			sched_yield();
-		}
-	}
-	__RING_STAT_ADD(r, deq_success, n);
-	r->cons.tail = cons_next;
-
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
-
-/**
- * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
- * When the request objects are more than the available objects, only dequeue
- * the actual number of objects
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t cons_head, prod_tail;
-	uint32_t cons_next, entries;
-	unsigned i;
-	uint32_t mask = r->prod.mask;
-
-	cons_head = r->cons.head;
-	prod_tail = r->prod.tail;
-	/* The subtraction is done between two unsigned 32bits value
-	 * (the result is always modulo 32 bits even if we have
-	 * cons_head > prod_tail). So 'entries' is always between 0
-	 * and size(ring)-1. */
-	entries = prod_tail - cons_head;
-
-	if (n > entries) {
-		if (behavior == RTE_RING_QUEUE_FIXED) {
-			__RING_STAT_ADD(r, deq_fail, n);
-			return -ENOENT;
-		}
-		else {
-			if (unlikely(entries == 0)){
-				__RING_STAT_ADD(r, deq_fail, n);
-				return 0;
-			}
-
-			n = entries;
-		}
-	}
-
-	cons_next = cons_head + n;
-	r->cons.head = cons_next;
-
-	/* copy in table */
-	DEQUEUE_PTRS();
-	rte_smp_rmb();
-
-	__RING_STAT_ADD(r, deq_success, n);
-	r->cons.tail = cons_next;
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
+#define __rte_ring_mp_do_enqueue rte_void___ring_mp_do_enqueue
+#define __rte_ring_sp_do_enqueue rte_void___ring_sp_do_enqueue
+#define __rte_ring_mc_do_dequeue rte_void___ring_mc_do_dequeue
+#define __rte_ring_sc_do_dequeue rte_void___ring_sc_do_dequeue
 
 /**
  * Enqueue several objects on the ring (multi-producers safe).
@@ -882,9 +526,7 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline int
 rte_ring_full(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
+	return rte_void_ring_full(r);
 }
 
 /**
@@ -899,9 +541,7 @@ rte_ring_full(const struct rte_ring *r)
 static inline int
 rte_ring_empty(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return !!(cons_tail == prod_tail);
+	return rte_void_ring_empty(r);
 }
 
 /**
@@ -915,9 +555,7 @@ rte_ring_empty(const struct rte_ring *r)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (prod_tail - cons_tail) & r->prod.mask;
+	return rte_void_ring_count(r);
 }
 
 /**
@@ -931,9 +569,7 @@ rte_ring_count(const struct rte_ring *r)
 static inline unsigned
 rte_ring_free_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (cons_tail - prod_tail - 1) & r->prod.mask;
+	return rte_void_ring_free_count(r);
 }
 
 /**
-- 
2.9.3

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
  2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
                   ` (10 preceding siblings ...)
  2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 11/11] ring: reuse typed ring enqueue and dequeue functions Bruce Richardson
@ 2017-01-13 14:23 ` Olivier Matz
  2017-01-13 15:00   ` Bruce Richardson
  11 siblings, 1 reply; 18+ messages in thread
From: Olivier Matz @ 2017-01-13 14:23 UTC (permalink / raw)
  To: Bruce Richardson; +Cc: dev

Hi Bruce,

On Wed, 11 Jan 2017 15:05:14 +0000, Bruce Richardson
<bruce.richardson@intel.com> wrote:
> The rte_ring library in DPDK provides an excellent high-performance
> mechanism which can be used for passing pointers between cores and
> for other tasks such as buffering. However, it does have a number
> of limitations:
> 
> * type information of pointers is lost, as it works with void pointers
> * typecasting is needed when using enqueue/dequeue burst functions,
>   since arrays of other types cannot be automatically cast to void **
> * the data to be passed through the ring itself must be no bigger than
>   a pointer
> 
> While the first two limitations are an inconvenience, the final one is
> one that can prevent use of rte_rings in cases where their
> functionality is needed. The use-case which has inspired the patchset
> is that of eventdev. When working with rte_events, each event is a
> 16-byte structure consisting of a pointer and some metadata e.g.
> priority and type. For these events, what is passed around between
> cores is not pointers to events, but the events themselves. This
> makes existing rings unsuitable for use by applications working with
> rte_events, and also for use internally inside any software
> implementation of an eventdev.
> 
> For rings to handle events or other similarly sized structures, e.g.
> NIC descriptors, etc., we then have two options - duplicate rte_ring
> code to create new ring implementations for each of those types, or
> generalise the existing code using macros so that the data type
> handled by each rings is a compile time paramter. This patchset takes
> the latter approach, and once applied would allow us to add an
> rte_event_ring type to DPDK using a header file containing:
> 
> #define RING_TYPE struct rte_event
> #define RING_TYPE_NAME rte_event
> #include <rte_typed_ring.h>
> #undef RING_TYPE_NAME
> #undef RING_TYPE
> 
> [NOTE: the event_ring is not defined in this set, since it depends on
> the eventdev implementation not present in the main tree]
> 
> If we want to elimiate some of the typecasting on our code when
> enqueuing and dequeuing mbuf pointers, an rte_mbuf_ring type can be
> similarly created using the same number of lines of code.
> 
> The downside of this generalisation is that the code for the rings now
> has far more use of macros in it. However, I do not feel that overall
> readability suffers much from this change, the since the changes are
> pretty much just search-replace onces. There should also be no ABI
> compatibility issues with this change, since the existing rte_ring
> structures remain the same.

I didn't dive deeply into the patches, just had a quick look. I
understand the need, and even though I really don't like the "#define +
#include" way of creating a new specific ring (for readability and
grepability), it may be a solution to your problem.

I think using a similar approach to the one in sys/queue.h would be
even worse in terms of readability.


What do you think about the following approach?

- add a new elt_size in rte_ring structure

- update create/enqueue/dequeue/... functions to manage the elt size

- change:
    rte_ring_enqueue_bulk(struct rte_ring *r,
      void * const *obj_table, unsigned n)
  to:
    rte_ring_enqueue_bulk(struct rte_ring *r, void *obj_table,
      unsigned n) 

  This relaxes the type for the API in the function. In the caller,
  the type of obj_table would be:
  - (void **) in case of a ring of pointers
  - (uint8_t *) in case of a ring of uint8_t
  - (struct rte_event *) in case of a ring of rte_event
  ...

  I think (I have not tested it) it won't break compilation, since
  any type can be implicitly cast to a void *. Also, I'd say it
  is possible to avoid breaking the ABI.

- deprecate or forbid calls to:
    rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
    (and similar)

  Because with a ring of pointers, obj is the pointer, passed by value.
  For other types, we would need
    rte_ring_mp_enqueue(struct rte_ring *r, <TYPE> obj)

  Maybe we could consider using a macro here.


The drawbacks I see are:
- a dynamic elt_size may slightly decrease performance
- it still uses casts to (void *), so there is no type checking
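
To make the idea concrete, here is a minimal sketch of the
size-parameterised copy and of a per-element enqueue macro. All names
below are hypothetical, none of this is code from the patchset, and
free-space checks, barriers and multi-producer logic are omitted:

#include <stdint.h>
#include <string.h>

/* hypothetical ring carrying a run-time element size */
struct generic_ring {
	uint32_t elt_size;   /* size in bytes of one element */
	uint32_t mask;       /* ring size - 1; size is a power of 2 */
	uint32_t head;       /* producer index */
	char ring[];         /* elt_size * (mask + 1) bytes of storage */
};

static inline unsigned
generic_ring_sp_enqueue_bulk(struct generic_ring *r,
		const void *obj_table, unsigned n)
{
	uint32_t idx = r->head & r->mask;

	/* one contiguous copy shown; a real version must split the
	 * copy in two at the wrap-around point */
	memcpy(r->ring + (size_t)idx * r->elt_size,
			obj_table, (size_t)n * r->elt_size);
	r->head += n;
	return n;
}

/* single-element enqueue as a macro, so that the element is passed
 * by value whatever its type */
#define GENERIC_RING_ENQUEUE(r, obj) __extension__ ({	\
	typeof(obj) _obj = (obj);			\
	generic_ring_sp_enqueue_bulk((r), &_obj, 1);	\
})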


Regards,
Olivier

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
  2017-01-13 14:23 ` [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Olivier Matz
@ 2017-01-13 15:00   ` Bruce Richardson
  2017-01-17 13:38     ` Olivier Matz
  0 siblings, 1 reply; 18+ messages in thread
From: Bruce Richardson @ 2017-01-13 15:00 UTC (permalink / raw)
  To: Olivier Matz; +Cc: dev

On Fri, Jan 13, 2017 at 03:23:34PM +0100, Olivier Matz wrote:
> Hi Bruce,
> 
> On Wed, 11 Jan 2017 15:05:14 +0000, Bruce Richardson
> <bruce.richardson@intel.com> wrote:
> > The rte_ring library in DPDK provides an excellent high-performance
> > mechanism which can be used for passing pointers between cores and
> > for other tasks such as buffering. However, it does have a number
> > of limitations:
> > 
> > * type information of pointers is lost, as it works with void pointers
> > * typecasting is needed when using enqueue/dequeue burst functions,
> >   since arrays of other types cannot be automatically cast to void **
> > * the data to be passed through the ring itself must be no bigger than
> >   a pointer
> > 
> > While the first two limitations are an inconvenience, the final one is
> > one that can prevent use of rte_rings in cases where their
> > functionality is needed. The use-case which has inspired the patchset
> > is that of eventdev. When working with rte_events, each event is a
> > 16-byte structure consisting of a pointer and some metadata e.g.
> > priority and type. For these events, what is passed around between
> > cores is not pointers to events, but the events themselves. This
> > makes existing rings unsuitable for use by applications working with
> > rte_events, and also for use internally inside any software
> > implementation of an eventdev.
> > 
> > For rings to handle events or other similarly sized structures, e.g.
> > NIC descriptors, etc., we then have two options - duplicate rte_ring
> > code to create new ring implementations for each of those types, or
> > generalise the existing code using macros so that the data type
> > handled by each rings is a compile time paramter. This patchset takes
> > the latter approach, and once applied would allow us to add an
> > rte_event_ring type to DPDK using a header file containing:
> > 
> > #define RING_TYPE struct rte_event
> > #define RING_TYPE_NAME rte_event
> > #include <rte_typed_ring.h>
> > #undef RING_TYPE_NAME
> > #undef RING_TYPE
> > 
> > [NOTE: the event_ring is not defined in this set, since it depends on
> > the eventdev implementation not present in the main tree]
> > 
> > If we want to elimiate some of the typecasting on our code when
> > enqueuing and dequeuing mbuf pointers, an rte_mbuf_ring type can be
> > similarly created using the same number of lines of code.
> > 
> > The downside of this generalisation is that the code for the rings now
> > has far more use of macros in it. However, I do not feel that overall
> > readability suffers much from this change, the since the changes are
> > pretty much just search-replace onces. There should also be no ABI
> > compatibility issues with this change, since the existing rte_ring
> > structures remain the same.
> 
> I didn't dive deeply in the patches, just had a quick look. I
> understand the need, and even if I really don't like the "#define +
> #include" way to create a new specific ring (for readability,
> grepability), that may be a solution to your problem.
> 
> I think using a similar approach than in sys/queue.h would be even
> worse in terms of readability.
> 
> 
> What do you think about the following approach?
> 
> - add a new elt_size in rte_ring structure
> 
> - update create/enqueue/dequeue/... functions to manage the elt size
> 
> - change:
>     rte_ring_enqueue_bulk(struct rte_ring *r,
>       void * const *obj_table, unsigned n)
>   to:
>     rte_ring_enqueue_bulk(struct rte_ring *r, void *obj_table,
>       unsigned n) 
> 
>   This relaxes the type for the API in the function. In the caller,
>   the type of obj_table would be:
>   - (void **) in case of a ring of pointers
>   - (uint8_t *) in case of a ring of uint8_t
>   - (struct rte_event *) in case of a ring of rte_event
>   ...
> 
>   I think (I have not tested it) it won't break compilation since
>   any type can be implicitly casted into a void *. Also, I'd say it
>   is possible to avoid breaking the ABI.
> 
> - deprecate or forbid calls to:
>     rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
>     (and similar)
> 
>   Because with a ring of pointers, obj is the pointer, passed by value.
>   For other types, we would need
>     rte_ring_mp_enqueue(struct rte_ring *r, <TYPE> obj)
> 
>   Maybe we could consider using a macro here.
> 
> 
> The drawbacks I see are:
> - a dynamic elt_size may slightly decrease performance
> - it still uses casts to (void *), so there is no type checking
>

Hi Olivier,

Thanks for the feedback.

Yes, I thought about that parameterized-size solution too, but I did
not pursue it, primarily because I was worried about the performance
hit. It would mean that the actual copies of the data elements would
have to be done via memcpy calls - or switches based on size - rather
than assignments, as now. Given that all these calls to enqueue/dequeue
are inlined, that could really hurt performance, as the size of the
elements to be copied is unknown to the compiler at compile time - the
size is stored in the struct, and not available from the API call.
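
A rough illustration of the concern (hypothetical helpers, not code
from the patchset): with a typed ring the copy is an assignment over a
type the compiler can see, while with a run-time elt_size it becomes a
memcpy of an opaque length:

#include <stdint.h>
#include <string.h>

struct elt16 { uint64_t w[2]; };   /* stand-in for a 16-byte event */

/* typed ring: the element size is a compile-time constant, so the
 * compiler can unroll and vectorise this loop */
static inline void
copy_typed(struct elt16 *dst, const struct elt16 *src, unsigned n)
{
	unsigned i;

	for (i = 0; i < n; i++)
		dst[i] = src[i];
}

/* size-parameterised ring: elt_size is only known at run time, so
 * the copy becomes a memcpy with a non-constant length */
static inline void
copy_untyped(void *dst, const void *src, unsigned n, uint32_t elt_size)
{
	memcpy(dst, src, (size_t)n * elt_size);
}
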
I really like the compiler type-checking, being a believer in having
the compiler do as much work as possible for us, but it is a
nice-to-have rather than a mandatory requirement. :-)

Am I right in assuming that the main issue you see with the patch is
that the use of macros may lead to maintainability problems in the
code?

For me, while macros may not be the nicest solution to the problem:
* it does keep everything in rte_ring exactly as it was before - no API
  and ABI issues
* it should be completely hidden from the end user - most applications
  should never need to use the typed ring directly; apps should instead
  use the rte_ring and rte_event_ring headers
* the majority of the code is still regular C, and the macros don't
  affect readability much IMHO. Also, changes to the ring library are
  comparatively rare. [Though I have a few follow-on ideas myself!]
* it gives us the maximum support from the compiler for type checking
  and the error reporting based on it

This patchset is not for 17.02, so we have some time to consider our
options, though I would like some resolution on this early in the 17.05
timeframe, so that any solution can be reused inside any software
eventdevs we create.

Regards,
/Bruce

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
  2017-01-13 15:00   ` Bruce Richardson
@ 2017-01-17 13:38     ` Olivier Matz
  2017-01-18 11:09       ` Bruce Richardson
  2017-01-19 12:10       ` Bruce Richardson
  0 siblings, 2 replies; 18+ messages in thread
From: Olivier Matz @ 2017-01-17 13:38 UTC (permalink / raw)
  To: Bruce Richardson; +Cc: dev

Hi Bruce,

On Fri, 13 Jan 2017 15:00:54 +0000, Bruce Richardson
<bruce.richardson@intel.com> wrote:
> On Fri, Jan 13, 2017 at 03:23:34PM +0100, Olivier Matz wrote:
> > Hi Bruce,
> > 
> > On Wed, 11 Jan 2017 15:05:14 +0000, Bruce Richardson
> > <bruce.richardson@intel.com> wrote:  
> > > The rte_ring library in DPDK provides an excellent
> > > high-performance mechanism which can be used for passing pointers
> > > between cores and for other tasks such as buffering. However, it
> > > does have a number of limitations:
> > > 
> > > * type information of pointers is lost, as it works with void
> > > pointers
> > > * typecasting is needed when using enqueue/dequeue burst
> > > functions, since arrays of other types cannot be automatically
> > > cast to void **
> > > * the data to be passed through the ring itself must be no bigger
> > > than a pointer
> > > 
> > > While the first two limitations are an inconvenience, the final
> > > one is one that can prevent use of rte_rings in cases where their
> > > functionality is needed. The use-case which has inspired the
> > > patchset is that of eventdev. When working with rte_events, each
> > > event is a 16-byte structure consisting of a pointer and some
> > > metadata e.g. priority and type. For these events, what is passed
> > > around between cores is not pointers to events, but the events
> > > themselves. This makes existing rings unsuitable for use by
> > > applications working with rte_events, and also for use internally
> > > inside any software implementation of an eventdev.
> > > 
> > > For rings to handle events or other similarly sized structures,
> > > e.g. NIC descriptors, etc., we then have two options - duplicate
> > > rte_ring code to create new ring implementations for each of
> > > those types, or generalise the existing code using macros so that
> > > the data type handled by each rings is a compile time paramter.
> > > This patchset takes the latter approach, and once applied would
> > > allow us to add an rte_event_ring type to DPDK using a header
> > > file containing:
> > > 
> > > #define RING_TYPE struct rte_event
> > > #define RING_TYPE_NAME rte_event
> > > #include <rte_typed_ring.h>
> > > #undef RING_TYPE_NAME
> > > #undef RING_TYPE
> > > 
> > > [NOTE: the event_ring is not defined in this set, since it
> > > depends on the eventdev implementation not present in the main
> > > tree]
> > > 
> > > If we want to elimiate some of the typecasting on our code when
> > > enqueuing and dequeuing mbuf pointers, an rte_mbuf_ring type can
> > > be similarly created using the same number of lines of code.
> > > 
> > > The downside of this generalisation is that the code for the
> > > rings now has far more use of macros in it. However, I do not
> > > feel that overall readability suffers much from this change, the
> > > since the changes are pretty much just search-replace onces.
> > > There should also be no ABI compatibility issues with this
> > > change, since the existing rte_ring structures remain the same.  
> > 
> > I didn't dive deeply in the patches, just had a quick look. I
> > understand the need, and even if I really don't like the "#define +
> > #include" way to create a new specific ring (for readability,
> > grepability), that may be a solution to your problem.
> > 
> > I think using a similar approach than in sys/queue.h would be even
> > worse in terms of readability.
> > 
> > 
> > What do you think about the following approach?
> > 
> > - add a new elt_size in rte_ring structure
> > 
> > - update create/enqueue/dequeue/... functions to manage the elt size
> > 
> > - change:
> >     rte_ring_enqueue_bulk(struct rte_ring *r,
> >       void * const *obj_table, unsigned n)
> >   to:
> >     rte_ring_enqueue_bulk(struct rte_ring *r, void *obj_table,
> >       unsigned n) 
> > 
> >   This relaxes the type for the API in the function. In the caller,
> >   the type of obj_table would be:
> >   - (void **) in case of a ring of pointers
> >   - (uint8_t *) in case of a ring of uint8_t
> >   - (struct rte_event *) in case of a ring of rte_event
> >   ...
> > 
> >   I think (I have not tested it) it won't break compilation since
> >   any type can be implicitly casted into a void *. Also, I'd say it
> >   is possible to avoid breaking the ABI.
> > 
> > - deprecate or forbid calls to:
> >     rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
> >     (and similar)
> > 
> >   Because with a ring of pointers, obj is the pointer, passed by
> > value. For other types, we would need
> >     rte_ring_mp_enqueue(struct rte_ring *r, <TYPE> obj)
> > 
> >   Maybe we could consider using a macro here.
> > 
> > 
> > The drawbacks I see are:
> > - a dynamic elt_size may slightly decrease performance
> > - it still uses casts to (void *), so there is no type checking
> >  
> 
> Hi Olivier,
> 
> Thanks for the feedback.
> 
> Yes, I thought about that parameterized sizes solution too, but I did
> not pursue it primarily because I was worried about the performance
> hits. It would mean that the actual copies of the data elements would
> have to be done via memcpy calls - or switches based on size - rather
> than assignments, as now. Given that all these calls to
> enqueue/dequeue are inlined, that could really hurt performance, as
> the size of the elements to be copied are unknown to the compiler at
> compile time - as the size is stored in the struct, and not available
> from the API call.

Maybe it's worth checking the impact. The size check could be done only
once per bulk, so it may not cost that much.

It's also possible to have a particular case for pointer size, and
use a memcpy for other sizes.
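
Something like this, say (only a sketch; the helper name is invented
and this is not code from the patchset):

/* Dispatch once per bulk on the element size stored in the ring:
 * keep plain pointer assignments as the fast path, and fall back to
 * memcpy (from <string.h>) for other sizes. */
static inline void
__ring_copy_elts(void *dst, const void *src, unsigned int n,
		unsigned int elt_size)
{
	if (elt_size == sizeof(void *)) {
		void **d = dst;
		void * const *s = src;
		unsigned int i;

		for (i = 0; i < n; i++)
			d[i] = s[i];	/* same assignment as today */
	} else {
		memcpy(dst, src, (size_t)n * elt_size);
	}
}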


> I really like the compiler type-checking, being a believer in having
> the compiler do as much work as possible for us, but it is a
> nice-to-have rather than a mandatory requirement. :-)
> 
> Am I right in assuming that the main issue that you see with the
> patch is that the use of macros may lead to maintainability problems
> with the code?

Yes, from my experience, having unusual macros leads to lost time when
trying to understand, use, or change the code.


> For me, while macros may not be the nicest solution to the problem:
> * it does keep everything in rte_ring exactly as it was before - no
> API and ABI issues
> * it should be completely hidden from the end user - most applications
>   should never need to use the typed ring directly; apps should
>   instead use the rte_ring and rte_event_ring headers.
> * The majority of the code is still regular C, and the macros don't
>   affect readability much IMHO. Also, it's comparatively rare that
>   changes are made to the ring library. [Though I have a few
>   follow-on ideas myself!].
> * It gives us the maximum support from the compiler for type checking
>   and error reporting based on that 
> 
> This patchset is not for 17.02 so we have some time to consider our
> options, though I would like some resolution on this early in the
> 17.05 timeframe so as to reuse any solution inside any software
> eventdevs we create.


Yes, I hear your arguments. I don't have much to oppose except
readability. Hmm, the fact that the init functions become static inline
also bothers me a bit. Since all functions are static inline, it closes
the door to de-inlining functions in the future.

I think having a performance test showing whether storing the elt size
in the ring structure has a significant impact would help us reach a
consensus faster :)


Regards,
Olivier


* Re: [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
  2017-01-17 13:38     ` Olivier Matz
@ 2017-01-18 11:09       ` Bruce Richardson
  2017-01-19 12:10       ` Bruce Richardson
  1 sibling, 0 replies; 18+ messages in thread
From: Bruce Richardson @ 2017-01-18 11:09 UTC (permalink / raw)
  To: Olivier Matz; +Cc: dev

On Tue, Jan 17, 2017 at 02:38:20PM +0100, Olivier Matz wrote:
> Hi Bruce,
> 
> On Fri, 13 Jan 2017 15:00:54 +0000, Bruce Richardson
> <bruce.richardson@intel.com> wrote:
> > Hi Olivier,
> > 
> > Thanks for the feedback.
> > 
> > Yes, I thought about that parameterized sizes solution too, but I did
> > not pursue it primarily because I was worried about the performance
> > hits. It would mean that the actual copies of the data elements would
> > have to be done via memcpy calls - or switches based on size - rather
> > than assignments, as now. Given that all these calls to
> > enqueue/dequeue are inlined, that could really hurt performance, as
> > the size of the elements to be copied is unknown to the compiler at
> > compile time - the size is stored in the struct, and is not available
> > from the API call.
> 
> Maybe it's worth checking the impact. The size check could be done only
> once per bulk, so it may not cost that much.
> 
> It's also possible to have a particular case for pointer size, and
> use a memcpy for other sizes.
> 
I think if we go with this approach, just allowing sizes of 8/16/32
bytes may be the best, and we can optimize element assignments for those
particular sizes. I'd hold off on having other sizes beyond those until
such time as we have a concrete use case for it.
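
As a rough illustration of what I mean (sketch only, helper name
invented), restricting to those sizes lets every copy be plain
fixed-size assignments rather than memcpy:

/* Copy n ring elements of size 8, 16 or 32 bytes using uint64_t
 * assignments the compiler can unroll or vectorise per case. */
static inline void
__ring_copy_8_16_32(void *dst, const void *src, unsigned int n,
		unsigned int elt_size)
{
	uint64_t *d = dst;
	const uint64_t *s = src;
	unsigned int i;

	switch (elt_size) {
	case 8:
		for (i = 0; i < n; i++)
			d[i] = s[i];
		break;
	case 16:
		for (i = 0; i < 2 * n; i++)
			d[i] = s[i];
		break;
	case 32:
		for (i = 0; i < 4 * n; i++)
			d[i] = s[i];
		break;
	}
}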

> 
> > I really like the compiler type-checking, being a believer in having
> > the compiler do as much work as possible for us, but it is a
> > nice-to-have rather than a mandatory requirement. :-)
> > 
> > Am I right in assuming that the main issue that you see with the
> > patch is that the use of macros may lead to maintainability problems
> > with the code?
> 
> Yes, from my experience, having unusual macros leads to lost time when
> trying to understand, use, or change the code.
> 
> 
> > For me, while macros may not be the nicest solution to the problem:
> > * it does keep everything in rte_ring exactly as it was before - no
> > API and ABI issues
> > * it should be completely hidden from the end user - most applications
> >   should never need to use the typed ring directly; apps should
> >   instead use the rte_ring and rte_event_ring headers.
> > * The majority of the code is still regular C, and the macros don't
> >   affect readability much IMHO. Also, it's comparatively rare that
> >   changes are made to the ring library. [Though I have a few
> >   follow-on ideas myself!].
> > * It gives us the maximum support from the compiler for type checking
> >   and error reporting based on that 
> > 
> > This patchset is not for 17.02 so we have some time to consider our
> > options, though I would like some resolution on this early in the
> > 17.05 timeframe so as to reuse any solution inside any software
> > eventdevs we create.
> 
> 
> Yes, I hear your arguments. I don't have much to oppose except
> readability. Hmm, the fact that the init functions become static inline
> also bothers me a bit. Since all functions are static inline, it closes
> the door to de-inlining functions in the future.

Yes, it does close the door for now. However, I'd actually view this as
a positive since it eliminates problems of ABI compatibility. We can
freely change the internals of the ring from one release to the next, so
long as the API stays the same for compilation.

> 
> I think having a performance test showing whether storing the elt size
> in the ring structure has a significant impact would help us reach a
> consensus faster :)
> 

I agree. I'll do some prototyping and see what the perf is like with
the elt size stored in the ring structure. I'll also see what other
alternative approaches I can come up with here.

/Bruce


* Re: [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
  2017-01-17 13:38     ` Olivier Matz
  2017-01-18 11:09       ` Bruce Richardson
@ 2017-01-19 12:10       ` Bruce Richardson
  2017-01-19 12:15         ` Ferruh Yigit
  1 sibling, 1 reply; 18+ messages in thread
From: Bruce Richardson @ 2017-01-19 12:10 UTC (permalink / raw)
  To: Olivier Matz; +Cc: dev

On Tue, Jan 17, 2017 at 02:38:20PM +0100, Olivier Matz wrote:
> Hi Bruce,
> 
> Maybe it's worth checking the impact. The size check could be done only
> once per bulk, so it may not cost that much.
> 
> It's also possible to have a particular case for pointer size, and
> use a memcpy for other sizes.
> 
> 
<snip> 
> I think having a performance test showing whether storing the elt size
> in the ring structure has a significant impact would help us reach a
> consensus faster :)
> 
> 
Hi Olivier,

I did a quick prototype using a switch statement for three data element
sizes: 8, 16, and 32 bytes. The performance difference was negligible to
none. In most cases, with ring_perf_autotest on my system, there was a
small degradation of less than 1 cycle per packet, and a few cases were
slightly faster, probably due to the natural variation in results
between runs. I did not test with any memcpy calls in the datapath; all
assignments were done using uint64_t's or vectors of the appropriate
sizes.

Therefore it looks like some kind of solution without macros and using a
stored element size is possible. However, I think there is a third
alternative too. It is outlined below as option 3.

	1. Use macros as in original RFC

	2. Update rte_ring like I did for the tests described above, so
	   that create takes the size parameter, and the switch statement
	   in enqueue and dequeue looks that up at runtime.
	   This means that rte_ring becomes the type used for all
	   transfers of all sizes. Also, enqueue/dequeue functions take
	   void * or const void * obj_table parameters rather than void
	   ** and void * const * obj_table.
	   Downside: this would change the ring API and ABI, and the
	   ring maintains no type information.

	3. Update rte_ring as above but rename it to rte_common_ring,
	   and have the element size parameter passed to the enqueue and
	   dequeue functions too - allowing the compiler to optimise the
	   switch out. Then we update the existing rte_ring to use the
	   rte_common_ring calls, passing in sizeof(void *) as the
	   parameter to each common call. An event-ring type, or any
	   other ring type, can similarly be written using the common
	   ring code, presenting the appropriate type information on
	   enqueue/dequeue to the apps using them - see the sketch
	   after this list.
	   Downside: more code to maintain, and more specialised APIs.
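
To make option 3 concrete, a rough sketch (all names invented here,
not code from any actual patch):

/* Element size is passed per call, so it is a compile-time constant
 * at each inlined call site and any dispatch on size can be optimised
 * away by the compiler. */
static inline unsigned int
rte_common_ring_enqueue_bulk(struct rte_common_ring *r,
		const void *obj_table, unsigned int n,
		unsigned int elt_size);

/* The existing rte_ring API then becomes a thin, type-preserving
 * wrapper - assuming rte_ring embeds an rte_common_ring; the old
 * return-value semantics are glossed over in this sketch. */
static inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
		unsigned int n)
{
	return rte_common_ring_enqueue_bulk(&r->common, obj_table, n,
			sizeof(void *));
}

/* And an event ring presents the proper element type to apps: */
static inline unsigned int
rte_event_ring_enqueue_bulk(struct rte_event_ring *r,
		const struct rte_event *events, unsigned int n)
{
	return rte_common_ring_enqueue_bulk(&r->common, events, n,
			sizeof(struct rte_event));
}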

Personally, because I like having type-specialised code, I prefer the
third option. It also gives us the ability to change the common code
without affecting the API/ABI of the rings [which could be updated later
after a proper deprecation period, if we want].

An example of a change I have in mind for this common code would be some
rework around the watermarks support. While the watermarks support is
useful, for the event_rings we actually need more information provided
from enqueue. To that end, I would see the common_rings code changed so
that the enqueue function returns, via an additional parameter, the
amount of space left in the ring. This information is computed by the
function anyway, and can therefore be efficiently returned by the calls.
For sp_enqueue, this extra parameter would allow the app to know the
minimum number of elements which can be successfully enqueued to the
ring in a subsequent call. The existing rte_ring code can use the
returned value to calculate for itself whether the watermark is exceeded
and return a value as it does now. Other ring types can then decide for
themselves if they want to provide watermark functionality, or what
their API set would be - though it's probably best to keep the APIs
consistent.
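
Roughly, extending the sketch from option 3 above (the names and the
threshold below are still invented):

/* The enqueue additionally reports, through an optional out-parameter,
 * the space left in the ring after the call. */
static inline unsigned int
rte_event_ring_enqueue_bulk(struct rte_event_ring *r,
		const struct rte_event *events, unsigned int n,
		unsigned int *free_space);

/* Caller side: one call tells the app both how many events went in
 * and, for sp_enqueue, how many more are guaranteed to fit next time. */
static inline void
producer_send(struct rte_event_ring *r, const struct rte_event *events,
		unsigned int n)
{
	unsigned int space;
	unsigned int done = rte_event_ring_enqueue_bulk(r, events, n,
			&space);

	if (space < 64)	/* app-chosen threshold */
		apply_backpressure(done);	/* e.g. akin to a watermark check */
}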

Further thoughts?
/Bruce


* Re: [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes
  2017-01-19 12:10       ` Bruce Richardson
@ 2017-01-19 12:15         ` Ferruh Yigit
  0 siblings, 0 replies; 18+ messages in thread
From: Ferruh Yigit @ 2017-01-19 12:15 UTC (permalink / raw)
  To: Bruce Richardson, Olivier Matz; +Cc: dev

On 1/19/2017 12:10 PM, Bruce Richardson wrote:
> <snip>
> 
> Therefore it looks like some kind of solution without macros and using a
> stored element size is possible. However, I think there is a third
> alternative too. It is outlined below as option 3.
> 
> 	1. Use macros as in original RFC
> 
> 	2. Update rte_ring like I did for the tests described above, so
> 	   that create takes the size parameter, and the switch statement
> 	   in enqueue and dequeue looks that up at runtime.
> 	   This means that rte_ring becomes the type used for all
> 	   transfers of all sizes. Also, enqueue/dequeue functions take
> 	   void * or const void * obj_table parameters rather than void
> 	   ** and void * const * obj_table.
> 	   Downside: this would change the ring API and ABI, and the
> 	   ring maintains no type information.
> 
> 	3. Update rte_ring as above but rename it to rte_common_ring,
> 	   and have the element size parameter passed to the enqueue and
> 	   dequeue functions too - allowing the compiler to optimise the
> 	   switch out. Then we update the existing rte_ring to use the
> 	   rte_common_ring calls, passing in sizeof(void *) as the
> 	   parameter to each common call. An event-ring type, or any
> 	   other ring type, can similarly be written using the common
> 	   ring code, presenting the appropriate type information on
> 	   enqueue/dequeue to the apps using them.
> 	   Downside: more code to maintain, and more specialised APIs.
> 
> Personally, because I like having type-specialised code, I prefer the
> third option. It also gives us the ability to change the common code
> without affecting the API/ABI of the rings [which could be updated later
> after a proper deprecation period, if we want].

+1 for third option.

> 
> <snip>
> 


Thread overview: 18+ messages
2017-01-11 15:05 [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 01/11] ring: add new typed ring header file Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 02/11] test: add new test file for typed rings Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 03/11] ring: add ring management functions to typed ring header Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 04/11] ring: make ring tailq variable public Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 05/11] ring: add user-specified typing to typed rings Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 06/11] ring: use existing power-of-2 function Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 07/11] ring: allow multiple typed rings in the same unit Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 08/11] app/pdump: remove duplicate macro definition Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 09/11] ring: make existing rings reuse the typed ring definitions Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 10/11] ring: reuse typed rings management functions Bruce Richardson
2017-01-11 15:05 ` [dpdk-dev] [RFC PATCH 11/11] ring: reuse typed ring enqueue and dequeue functions Bruce Richardson
2017-01-13 14:23 ` [dpdk-dev] [RFC PATCH 00/11] generalise rte_ring to allow different datatypes Olivier Matz
2017-01-13 15:00   ` Bruce Richardson
2017-01-17 13:38     ` Olivier Matz
2017-01-18 11:09       ` Bruce Richardson
2017-01-19 12:10       ` Bruce Richardson
2017-01-19 12:15         ` Ferruh Yigit
