From: Konstantin Ananyev <konstantin.ananyev@intel.com>
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, honnappa.nagarahalli@arm.com,
jerinj@marvell.com, drc@linux.vnet.ibm.com,
stephen@networkplumber.org,
Konstantin Ananyev <konstantin.ananyev@intel.com>
Subject: [dpdk-dev] [RFC] ring: make ring implementation non-inlined
Date: Fri, 20 Mar 2020 16:41:38 +0000
Message-ID: <20200320164138.8510-1-konstantin.ananyev@intel.com>
As was discussed here:
http://mails.dpdk.org/archives/dev/2020-February/158586.html
this RFC aims to hide the ring internals in .c files and make all
ring functions non-inlined. In theory that might help to
maintain ABI stability in the future.

This is just a POC to measure the impact of the proposed idea;
a proper implementation would definitely need some extra effort.
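
To illustrate the idea (this is not the patch itself), here is a minimal
sketch of the opaque-handle pattern: the struct layout lives only in the
.c file and callers go through out-of-line accessors. All demo_ring names
are hypothetical and not part of librte_ring, and the capacity of
size - 1 is just an assumption made for the example.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* ---- what a public header could expose (hypothetical names) ---- */

struct demo_ring; /* opaque: the layout is not visible to callers */

struct demo_ring *demo_ring_create(uint32_t size);
void demo_ring_free(struct demo_ring *r);
uint32_t demo_ring_get_size(const struct demo_ring *r);
uint32_t demo_ring_get_capacity(const struct demo_ring *r);

/* ---- what would live in the .c file ---- */

struct demo_ring {
	uint32_t size;     /* number of slots, a power of two */
	uint32_t mask;     /* size - 1, used to wrap indexes */
	uint32_t capacity; /* usable slots (assumed size - 1 here) */
};

struct demo_ring *
demo_ring_create(uint32_t size)
{
	struct demo_ring *r;

	/* the mask trick only works for a non-zero power of two */
	if (size == 0 || (size & (size - 1)) != 0)
		return NULL;

	r = calloc(1, sizeof(*r));
	if (r == NULL)
		return NULL;

	r->size = size;
	r->mask = size - 1;
	r->capacity = size - 1;
	return r;
}

void
demo_ring_free(struct demo_ring *r)
{
	free(r);
}

/* out-of-line accessors replace direct field reads such as r->size */
uint32_t
demo_ring_get_size(const struct demo_ring *r)
{
	assert(r != NULL);
	return r->size;
}

uint32_t
demo_ring_get_capacity(const struct demo_ring *r)
{
	assert(r != NULL);
	return r->capacity;
}
```

With such a layout a caller writes demo_ring_get_size(r) instead of
r->size, which is the same kind of change the patch makes in
ccp_pmd_ops.c. The price is a function call where there used to be
a field load.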
On an IA box (SKX), ring_perf_autotest shows ~20-30 cycles extra for an
enqueue+dequeue pair. On more realistic code, I suspect
the impact might be a bit higher.
For MP/MC bulk transfers the degradation seems quite small,
though for SP/SC and/or small transfers it is more than noticeable
(see exact numbers below).

From my perspective we should probably keep it inlined for now
to avoid any unanticipated performance degradation.
Though I am interested to see perf results and opinions from
other interested parties.
Intel(R) Xeon(R) Platinum 8160 CPU @ 2.10GHz
ring_perf_autotest (without patch/with patch)
### Testing single element enq/deq ###
legacy APIs: SP/SC: single: 8.75/43.23
legacy APIs: MP/MC: single: 56.18/80.44
### Testing burst enq/deq ###
legacy APIs: SP/SC: burst (size: 8): 37.36/53.37
legacy APIs: SP/SC: burst (size: 32): 93.97/117.30
legacy APIs: MP/MC: burst (size: 8): 78.23/91.45
legacy APIs: MP/MC: burst (size: 32): 131.59/152.49
### Testing bulk enq/deq ###
legacy APIs: SP/SC: bulk (size: 8): 37.29/54.48
legacy APIs: SP/SC: bulk (size: 32): 92.68/113.01
legacy APIs: MP/MC: bulk (size: 8): 78.40/93.50
legacy APIs: MP/MC: bulk (size: 32): 131.49/154.25
### Testing empty bulk deq ###
legacy APIs: SP/SC: bulk (size: 8): 4.00/16.86
legacy APIs: MP/MC: bulk (size: 8): 7.01/15.55
### Testing using two hyperthreads ###
legacy APIs: SP/SC: bulk (size: 8): 10.64/17.56
legacy APIs: MP/MC: bulk (size: 8): 15.30/16.69
legacy APIs: SP/SC: bulk (size: 32): 5.84/7.09
legacy APIs: MP/MC: bulk (size: 32): 6.34/7.54
### Testing using two physical cores ###
legacy APIs: SP/SC: bulk (size: 8): 24.34/42.40
legacy APIs: MP/MC: bulk (size: 8): 70.34/71.82
legacy APIs: SP/SC: bulk (size: 32): 12.67/14.68
legacy APIs: MP/MC: bulk (size: 32): 22.41/17.93
### Testing single element enq/deq ###
elem APIs: element size 16B: SP/SC: single: 10.65/41.96
elem APIs: element size 16B: MP/MC: single: 44.33/81.36
### Testing burst enq/deq ###
elem APIs: element size 16B: SP/SC: burst (size: 8): 39.20/58.52
elem APIs: element size 16B: SP/SC: burst (size: 32): 123.19/142.79
elem APIs: element size 16B: MP/MC: burst (size: 8): 80.72/101.36
elem APIs: element size 16B: MP/MC: burst (size: 32): 169.21/185.38
### Testing bulk enq/deq ###
elem APIs: element size 16B: SP/SC: bulk (size: 8): 41.64/58.46
elem APIs: element size 16B: SP/SC: bulk (size: 32): 122.74/142.52
elem APIs: element size 16B: MP/MC: bulk (size: 8): 80.60/103.14
elem APIs: element size 16B: MP/MC: bulk (size: 32): 169.39/186.67
### Testing empty bulk deq ###
elem APIs: element size 16B: SP/SC: bulk (size: 8): 5.01/17.17
elem APIs: element size 16B: MP/MC: bulk (size: 8): 6.01/14.80
### Testing using two hyperthreads ###
elem APIs: element size 16B: SP/SC: bulk (size: 8): 12.02/17.18
elem APIs: element size 16B: MP/MC: bulk (size: 8): 16.81/21.14
elem APIs: element size 16B: SP/SC: bulk (size: 32): 7.87/9.01
elem APIs: element size 16B: MP/MC: bulk (size: 32): 8.22/10.57
### Testing using two physical cores ###
elem APIs: element size 16B: SP/SC: bulk (size: 8): 27.00/51.94
elem APIs: element size 16B: MP/MC: bulk (size: 8): 78.24/74.48
elem APIs: element size 16B: SP/SC: bulk (size: 32): 15.41/16.14
elem APIs: element size 16B: MP/MC: bulk (size: 32): 18.72/21.64
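
For anyone who wants to turn the pairs above into absolute and relative
overhead, a small helper along these lines should do. It is only a
sketch: the struct and function names are made up for this example, and
just a few rows from the output above are copied in.

```c
#include <assert.h>

/* one row of the output above: cycles without / with the patch */
struct perf_row {
	const char *name;
	double inlined;     /* cycles, without patch */
	double non_inlined; /* cycles, with patch */
};

/* a few rows copied from the numbers above */
const struct perf_row rows[] = {
	{ "legacy SP/SC single",            8.75,  43.23 },
	{ "legacy MP/MC single",           56.18,  80.44 },
	{ "legacy SP/SC bulk (size: 32)",  92.68, 113.01 },
	{ "legacy MP/MC bulk (size: 32)", 131.49, 154.25 },
};

/* extra cycles per enqueue+dequeue pair caused by the patch */
double
extra_cycles(const struct perf_row *row)
{
	return row->non_inlined - row->inlined;
}

/* the same overhead, as a percentage of the inlined cost */
double
extra_percent(const struct perf_row *row)
{
	assert(row->inlined > 0.0);
	return 100.0 * extra_cycles(row) / row->inlined;
}
```

For the SP/SC single row this gives about 34 extra cycles, roughly four
times the inlined cost, while for MP/MC bulk (size: 32) it gives about
23 extra cycles, or around 17%.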
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
app/proc-info/main.c | 20 +-
app/test-pmd/cmdline.c | 2 +-
app/test/commands.c | 2 +-
app/test/test_pdump.c | 6 +-
app/test/test_ring_stress.h | 2 +-
config/common_base | 2 +-
drivers/crypto/ccp/ccp_pmd_ops.c | 2 +-
drivers/crypto/meson.build | 1 -
drivers/net/ring/rte_eth_ring.c | 17 +-
lib/librte_eventdev/rte_event_ring.c | 6 +-
lib/librte_eventdev/rte_event_ring.h | 17 +-
lib/librte_pdump/rte_pdump.c | 3 +-
lib/librte_port/rte_port_ring.c | 29 +-
lib/librte_ring/Makefile | 6 +-
lib/librte_ring/meson.build | 6 +-
lib/librte_ring/ring_elem.c | 919 +++++++++++++++++++++++++++
lib/librte_ring/ring_impl.c | 750 ++++++++++++++++++++++
lib/librte_ring/ring_impl.h | 64 ++
lib/librte_ring/rte_ring.c | 3 +-
lib/librte_ring/rte_ring.h | 412 ++----------
lib/librte_ring/rte_ring_elem.h | 519 ++-------------
lib/librte_ring/rte_ring_version.map | 46 ++
22 files changed, 1938 insertions(+), 896 deletions(-)
create mode 100644 lib/librte_ring/ring_elem.c
create mode 100644 lib/librte_ring/ring_impl.c
create mode 100644 lib/librte_ring/ring_impl.h
diff --git a/app/proc-info/main.c b/app/proc-info/main.c
index abeca4aab..aa8f113c1 100644
--- a/app/proc-info/main.c
+++ b/app/proc-info/main.c
@@ -1137,25 +1137,7 @@ show_ring(char *name)
if (name != NULL) {
struct rte_ring *ptr = rte_ring_lookup(name);
if (ptr != NULL) {
- printf(" - Name (%s) on socket (%d)\n"
- " - flags:\n"
- "\t -- Single Producer Enqueue (%u)\n"
- "\t -- Single Consmer Dequeue (%u)\n",
- ptr->name,
- ptr->memzone->socket_id,
- ptr->flags & RING_F_SP_ENQ,
- ptr->flags & RING_F_SC_DEQ);
- printf(" - size (%u) mask (0x%x) capacity (%u)\n",
- ptr->size,
- ptr->mask,
- ptr->capacity);
- printf(" - count (%u) free count (%u)\n",
- rte_ring_count(ptr),
- rte_ring_free_count(ptr));
- printf(" - full (%d) empty (%d)\n",
- rte_ring_full(ptr),
- rte_ring_empty(ptr));
-
+ rte_ring_dump(stdout, ptr);
STATS_BDR_STR(50, "");
return;
}
diff --git a/app/test-pmd/cmdline.c b/app/test-pmd/cmdline.c
index a037a55c6..000853067 100644
--- a/app/test-pmd/cmdline.c
+++ b/app/test-pmd/cmdline.c
@@ -9562,7 +9562,7 @@ dump_struct_sizes(void)
#define DUMP_SIZE(t) printf("sizeof(" #t ") = %u\n", (unsigned)sizeof(t));
DUMP_SIZE(struct rte_mbuf);
DUMP_SIZE(struct rte_mempool);
- DUMP_SIZE(struct rte_ring);
+ //DUMP_SIZE(struct rte_ring);
#undef DUMP_SIZE
}
diff --git a/app/test/commands.c b/app/test/commands.c
index 3bf767bf7..89f93cdb2 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -107,7 +107,7 @@ dump_struct_sizes(void)
#define DUMP_SIZE(t) printf("sizeof(" #t ") = %u\n", (unsigned)sizeof(t));
DUMP_SIZE(struct rte_mbuf);
DUMP_SIZE(struct rte_mempool);
- DUMP_SIZE(struct rte_ring);
+ //DUMP_SIZE(struct rte_ring);
#undef DUMP_SIZE
}
diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c
index ad183184c..6a1180bcb 100644
--- a/app/test/test_pdump.c
+++ b/app/test/test_pdump.c
@@ -57,8 +57,7 @@ run_pdump_client_tests(void)
if (ret < 0)
return -1;
mp->flags = 0x0000;
- ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
- RING_F_SP_ENQ | RING_F_SC_DEQ);
+ ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
if (ring_client == NULL) {
printf("rte_ring_create SR0 failed");
return -1;
@@ -71,9 +70,6 @@ run_pdump_client_tests(void)
}
rte_eth_dev_probing_finish(eth_dev);
- ring_client->prod.single = 0;
- ring_client->cons.single = 0;
-
printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
for (itr = 0; itr < NUM_ITR; itr++) {
diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
index c6f0bc9f1..27d4adfb2 100644
--- a/app/test/test_ring_stress.h
+++ b/app/test/test_ring_stress.h
@@ -350,7 +350,7 @@ mt1_init(struct rte_ring **rng, void **data, uint32_t num)
/* alloc ring */
nr = 2 * num;
sz = rte_ring_get_memsize(nr);
- r = rte_zmalloc(NULL, sz, alignof(*r));
+ r = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
if (r == NULL) {
printf("%s: alloca(%zu) for FIFO with %u elems failed",
__func__, sz, nr);
diff --git a/config/common_base b/config/common_base
index 7ca2f28b1..87ab0766d 100644
--- a/config/common_base
+++ b/config/common_base
@@ -670,7 +670,7 @@ CONFIG_RTE_LIBRTE_PMD_ZUC=n
# Compile PMD for Crypto Scheduler device
#
-CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER=y
+CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER=n
#
# Compile PMD for NULL Crypto device
diff --git a/drivers/crypto/ccp/ccp_pmd_ops.c b/drivers/crypto/ccp/ccp_pmd_ops.c
index a19e85ecb..938977dc0 100644
--- a/drivers/crypto/ccp/ccp_pmd_ops.c
+++ b/drivers/crypto/ccp/ccp_pmd_ops.c
@@ -666,7 +666,7 @@ ccp_pmd_qp_create_batch_info_ring(struct ccp_qp *qp,
r = rte_ring_lookup(qp->name);
if (r) {
- if (r->size >= ring_size) {
+ if (rte_ring_get_size(r) >= ring_size) {
CCP_LOG_INFO(
"Reusing ring %s for processed packets",
qp->name);
diff --git a/drivers/crypto/meson.build b/drivers/crypto/meson.build
index 7fa1fbe26..8c8b78db3 100644
--- a/drivers/crypto/meson.build
+++ b/drivers/crypto/meson.build
@@ -16,7 +16,6 @@ drivers = ['aesni_gcm',
'octeontx2',
'openssl',
'qat',
- 'scheduler',
'snow3g',
'virtio',
'zuc']
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 41acbc513..0eaac0dc3 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -41,6 +41,8 @@ struct ring_queue {
struct rte_ring *rng;
rte_atomic64_t rx_pkts;
rte_atomic64_t tx_pkts;
+ uint32_t rx_sync_type;
+ uint32_t tx_sync_type;
};
struct pmd_internals {
@@ -74,7 +76,7 @@ eth_ring_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
struct ring_queue *r = q;
const uint16_t nb_rx = (uint16_t)rte_ring_dequeue_burst(r->rng,
ptrs, nb_bufs, NULL);
- if (r->rng->flags & RING_F_SC_DEQ)
+ if (r->rx_sync_type == RTE_RING_SYNC_ST)
r->rx_pkts.cnt += nb_rx;
else
rte_atomic64_add(&(r->rx_pkts), nb_rx);
@@ -88,7 +90,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
struct ring_queue *r = q;
const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng,
ptrs, nb_bufs, NULL);
- if (r->rng->flags & RING_F_SP_ENQ)
+ if (r->tx_sync_type == RTE_RING_SYNC_ST)
r->tx_pkts.cnt += nb_tx;
else
rte_atomic64_add(&(r->tx_pkts), nb_tx);
@@ -306,10 +308,14 @@ do_eth_dev_ring_create(const char *name,
internals->max_tx_queues = nb_tx_queues;
for (i = 0; i < nb_rx_queues; i++) {
internals->rx_ring_queues[i].rng = rx_queues[i];
+ internals->rx_ring_queues[i].rx_sync_type =
+ rte_ring_get_cons_sync_type(rx_queues[i]);
data->rx_queues[i] = &internals->rx_ring_queues[i];
}
for (i = 0; i < nb_tx_queues; i++) {
internals->tx_ring_queues[i].rng = tx_queues[i];
+ internals->tx_ring_queues[i].tx_sync_type =
+ rte_ring_get_prod_sync_type(tx_queues[i]);
data->tx_queues[i] = &internals->tx_ring_queues[i];
}
@@ -403,8 +409,11 @@ rte_eth_from_rings(const char *name, struct rte_ring *const rx_queues[],
int
rte_eth_from_ring(struct rte_ring *r)
{
- return rte_eth_from_rings(r->name, &r, 1, &r, 1,
- r->memzone ? r->memzone->socket_id : SOCKET_ID_ANY);
+ const struct rte_memzone *mz;
+
+ mz = rte_ring_get_memzone(r);
+ return rte_eth_from_rings(rte_ring_get_name(r), &r, 1, &r, 1,
+ mz ? mz->socket_id : SOCKET_ID_ANY);
}
static int
diff --git a/lib/librte_eventdev/rte_event_ring.c b/lib/librte_eventdev/rte_event_ring.c
index d27e23901..99700a18c 100644
--- a/lib/librte_eventdev/rte_event_ring.c
+++ b/lib/librte_eventdev/rte_event_ring.c
@@ -16,12 +16,8 @@ int
rte_event_ring_init(struct rte_event_ring *r, const char *name,
unsigned int count, unsigned int flags)
{
- /* compilation-time checks */
- RTE_BUILD_BUG_ON((sizeof(struct rte_event_ring) &
- RTE_CACHE_LINE_MASK) != 0);
-
/* init the ring structure */
- return rte_ring_init(&r->r, name, count, flags);
+ return rte_ring_init(r, name, count, flags);
}
/* create the ring */
diff --git a/lib/librte_eventdev/rte_event_ring.h b/lib/librte_eventdev/rte_event_ring.h
index c0861b0ec..b31b33b3e 100644
--- a/lib/librte_eventdev/rte_event_ring.h
+++ b/lib/librte_eventdev/rte_event_ring.h
@@ -32,9 +32,8 @@
* used inside software eventdev implementations and by applications
* directly as needed.
*/
-struct rte_event_ring {
- struct rte_ring r;
-};
+
+#define rte_event_ring rte_ring
/**
* Returns the number of events in the ring
@@ -47,7 +46,7 @@ struct rte_event_ring {
static __rte_always_inline unsigned int
rte_event_ring_count(const struct rte_event_ring *r)
{
- return rte_ring_count(&r->r);
+ return rte_ring_count(r);
}
/**
@@ -62,7 +61,7 @@ rte_event_ring_count(const struct rte_event_ring *r)
static __rte_always_inline unsigned int
rte_event_ring_free_count(const struct rte_event_ring *r)
{
- return rte_ring_free_count(&r->r);
+ return rte_ring_free_count(r);
}
/**
@@ -93,7 +92,7 @@ rte_event_ring_enqueue_burst(struct rte_event_ring *r,
unsigned int num;
uint32_t space;
- num = rte_ring_enqueue_burst_elem(&r->r, events,
+ num = rte_ring_enqueue_burst_elem(r, events,
sizeof(struct rte_event), n,
&space);
@@ -129,7 +128,7 @@ rte_event_ring_dequeue_burst(struct rte_event_ring *r,
unsigned int num;
uint32_t remaining;
- num = rte_ring_dequeue_burst_elem(&r->r, events,
+ num = rte_ring_dequeue_burst_elem(r, events,
sizeof(struct rte_event), n,
&remaining);
@@ -250,7 +249,7 @@ rte_event_ring_free(struct rte_event_ring *r);
static inline unsigned int
rte_event_ring_get_size(const struct rte_event_ring *r)
{
- return rte_ring_get_size(&r->r);
+ return rte_ring_get_size(r);
}
/**
@@ -264,6 +263,6 @@ rte_event_ring_get_size(const struct rte_event_ring *r)
static inline unsigned int
rte_event_ring_get_capacity(const struct rte_event_ring *r)
{
- return rte_ring_get_capacity(&r->r);
+ return rte_ring_get_capacity(r);
}
#endif
diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
index 8a01ac510..43aa07793 100644
--- a/lib/librte_pdump/rte_pdump.c
+++ b/lib/librte_pdump/rte_pdump.c
@@ -380,7 +380,8 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp)
rte_errno = EINVAL;
return -1;
}
- if (ring->prod.single || ring->cons.single) {
+ if (rte_ring_get_prod_sync_type(ring) == RTE_RING_SYNC_ST ||
+ rte_ring_get_cons_sync_type(ring) == RTE_RING_SYNC_ST) {
PDUMP_LOG(ERR, "ring with either SP or SC settings"
" is not valid for pdump, should have MP and MC settings\n");
rte_errno = EINVAL;
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 47fcdd06a..16fdee0bd 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -40,12 +40,15 @@ rte_port_ring_reader_create_internal(void *params, int socket_id,
struct rte_port_ring_reader_params *conf =
params;
struct rte_port_ring_reader *port;
+ uint32_t ring_sync;
+
+ ring_sync = rte_ring_get_cons_sync_type(conf->ring);
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->cons.single && is_multi) ||
- (!(conf->ring->cons.single) && !is_multi)) {
+ (ring_sync == RTE_RING_SYNC_ST && is_multi) ||
+ (ring_sync != RTE_RING_SYNC_ST && !is_multi)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
}
@@ -167,13 +170,16 @@ rte_port_ring_writer_create_internal(void *params, int socket_id,
struct rte_port_ring_writer_params *conf =
params;
struct rte_port_ring_writer *port;
+ uint32_t ring_sync;
+
+ ring_sync = rte_ring_get_prod_sync_type(conf->ring);
/* Check input parameters */
if ((conf == NULL) ||
- (conf->ring == NULL) ||
- (conf->ring->prod.single && is_multi) ||
- (!(conf->ring->prod.single) && !is_multi) ||
- (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
+ (conf->ring == NULL) ||
+ (ring_sync == RTE_RING_SYNC_ST && is_multi) ||
+ (ring_sync != RTE_RING_SYNC_ST && !is_multi) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
}
@@ -436,13 +442,16 @@ rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
struct rte_port_ring_writer_nodrop_params *conf =
params;
struct rte_port_ring_writer_nodrop *port;
+ uint32_t ring_sync;
+
+ ring_sync = rte_ring_get_prod_sync_type(conf->ring);
/* Check input parameters */
if ((conf == NULL) ||
- (conf->ring == NULL) ||
- (conf->ring->prod.single && is_multi) ||
- (!(conf->ring->prod.single) && !is_multi) ||
- (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
+ (conf->ring == NULL) ||
+ (ring_sync == RTE_RING_SYNC_ST && is_multi) ||
+ (ring_sync != RTE_RING_SYNC_ST && !is_multi) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
}
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 917c560ad..8c3cd523d 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -13,11 +13,11 @@ EXPORT_MAP := rte_ring_version.map
# all source are stored in SRCS-y
SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
+SRCS-$(CONFIG_RTE_LIBRTE_RING) += ring_impl.c
+SRCS-$(CONFIG_RTE_LIBRTE_RING) += ring_elem.c
# install includes
SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
- rte_ring_elem.h \
- rte_ring_generic.h \
- rte_ring_c11_mem.h
+ rte_ring_elem.h
include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f2f3ccc88..8adbdfc9d 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -1,11 +1,9 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2017 Intel Corporation
-sources = files('rte_ring.c')
+sources = files('rte_ring.c', 'ring_impl.c', 'ring_elem.c')
headers = files('rte_ring.h',
- 'rte_ring_elem.h',
- 'rte_ring_c11_mem.h',
- 'rte_ring_generic.h')
+ 'rte_ring_elem.h')
# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
allow_experimental_apis = true
diff --git a/lib/librte_ring/ring_elem.c b/lib/librte_ring/ring_elem.c
new file mode 100644
index 000000000..3588725e7
--- /dev/null
+++ b/lib/librte_ring/ring_elem.c
@@ -0,0 +1,919 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2019 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+/**
+ * @file
+ * RTE Ring with user defined element size
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "ring_impl.h"
+
+static __rte_always_inline void
+__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
+ uint32_t idx, const void *obj_table, uint32_t n)
+{
+ unsigned int i;
+ uint32_t *ring = (uint32_t *)&r[1];
+ const uint32_t *obj = (const uint32_t *)obj_table;
+ if (likely(idx + n < size)) {
+ for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+ ring[idx] = obj[i];
+ ring[idx + 1] = obj[i + 1];
+ ring[idx + 2] = obj[i + 2];
+ ring[idx + 3] = obj[i + 3];
+ ring[idx + 4] = obj[i + 4];
+ ring[idx + 5] = obj[i + 5];
+ ring[idx + 6] = obj[i + 6];
+ ring[idx + 7] = obj[i + 7];
+ }
+ switch (n & 0x7) {
+ case 7:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 6:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 5:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 4:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 3:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 2:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 1:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ }
+ } else {
+ for (i = 0; idx < size; i++, idx++)
+ ring[idx] = obj[i];
+ /* Start at the beginning */
+ for (idx = 0; i < n; i++, idx++)
+ ring[idx] = obj[i];
+ }
+}
+
+static __rte_always_inline void
+__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
+ const void *obj_table, uint32_t n)
+{
+ unsigned int i;
+ const uint32_t size = r->size;
+ uint32_t idx = prod_head & r->mask;
+ uint64_t *ring = (uint64_t *)&r[1];
+ const uint64_t *obj = (const uint64_t *)obj_table;
+ if (likely(idx + n < size)) {
+ for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+ ring[idx] = obj[i];
+ ring[idx + 1] = obj[i + 1];
+ ring[idx + 2] = obj[i + 2];
+ ring[idx + 3] = obj[i + 3];
+ }
+ switch (n & 0x3) {
+ case 3:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 2:
+ ring[idx++] = obj[i++]; /* fallthrough */
+ case 1:
+ ring[idx++] = obj[i++];
+ }
+ } else {
+ for (i = 0; idx < size; i++, idx++)
+ ring[idx] = obj[i];
+ /* Start at the beginning */
+ for (idx = 0; i < n; i++, idx++)
+ ring[idx] = obj[i];
+ }
+}
+
+static __rte_always_inline void
+__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
+ const void *obj_table, uint32_t n)
+{
+ unsigned int i;
+ const uint32_t size = r->size;
+ uint32_t idx = prod_head & r->mask;
+ rte_int128_t *ring = (rte_int128_t *)&r[1];
+ const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+ if (likely(idx + n < size)) {
+ for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+ memcpy((void *)(ring + idx),
+ (const void *)(obj + i), 32);
+ switch (n & 0x1) {
+ case 1:
+ memcpy((void *)(ring + idx),
+ (const void *)(obj + i), 16);
+ }
+ } else {
+ for (i = 0; idx < size; i++, idx++)
+ memcpy((void *)(ring + idx),
+ (const void *)(obj + i), 16);
+ /* Start at the beginning */
+ for (idx = 0; i < n; i++, idx++)
+ memcpy((void *)(ring + idx),
+ (const void *)(obj + i), 16);
+ }
+}
+
+/* the actual enqueue of elements on the ring.
+ * Placed here since identical code needed in both
+ * single and multi producer enqueue functions.
+ */
+static __rte_always_inline void
+__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
+ const void *obj_table, uint32_t esize, uint32_t num)
+{
+ /* 8B and 16B copies implemented individually to retain
+ * the current performance.
+ */
+ if (esize == 8)
+ __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
+ else if (esize == 16)
+ __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
+ else {
+ uint32_t idx, scale, nr_idx, nr_num, nr_size;
+
+ /* Normalize to uint32_t */
+ scale = esize / sizeof(uint32_t);
+ nr_num = num * scale;
+ idx = prod_head & r->mask;
+ nr_idx = idx * scale;
+ nr_size = r->size * scale;
+ __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
+ obj_table, nr_num);
+ }
+}
+
+static __rte_always_inline void
+__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
+ uint32_t idx, void *obj_table, uint32_t n)
+{
+ unsigned int i;
+ uint32_t *ring = (uint32_t *)&r[1];
+ uint32_t *obj = (uint32_t *)obj_table;
+ if (likely(idx + n < size)) {
+ for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+ obj[i] = ring[idx];
+ obj[i + 1] = ring[idx + 1];
+ obj[i + 2] = ring[idx + 2];
+ obj[i + 3] = ring[idx + 3];
+ obj[i + 4] = ring[idx + 4];
+ obj[i + 5] = ring[idx + 5];
+ obj[i + 6] = ring[idx + 6];
+ obj[i + 7] = ring[idx + 7];
+ }
+ switch (n & 0x7) {
+ case 7:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 6:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 5:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 4:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 3:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 2:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 1:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ }
+ } else {
+ for (i = 0; idx < size; i++, idx++)
+ obj[i] = ring[idx];
+ /* Start at the beginning */
+ for (idx = 0; i < n; i++, idx++)
+ obj[i] = ring[idx];
+ }
+}
+
+static __rte_always_inline void
+__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
+ void *obj_table, uint32_t n)
+{
+ unsigned int i;
+ const uint32_t size = r->size;
+ uint32_t idx = prod_head & r->mask;
+ uint64_t *ring = (uint64_t *)&r[1];
+ uint64_t *obj = (uint64_t *)obj_table;
+ if (likely(idx + n < size)) {
+ for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+ obj[i] = ring[idx];
+ obj[i + 1] = ring[idx + 1];
+ obj[i + 2] = ring[idx + 2];
+ obj[i + 3] = ring[idx + 3];
+ }
+ switch (n & 0x3) {
+ case 3:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 2:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ case 1:
+ obj[i++] = ring[idx++]; /* fallthrough */
+ }
+ } else {
+ for (i = 0; idx < size; i++, idx++)
+ obj[i] = ring[idx];
+ /* Start at the beginning */
+ for (idx = 0; i < n; i++, idx++)
+ obj[i] = ring[idx];
+ }
+}
+
+static __rte_always_inline void
+__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
+ void *obj_table, uint32_t n)
+{
+ unsigned int i;
+ const uint32_t size = r->size;
+ uint32_t idx = prod_head & r->mask;
+ rte_int128_t *ring = (rte_int128_t *)&r[1];
+ rte_int128_t *obj = (rte_int128_t *)obj_table;
+ if (likely(idx + n < size)) {
+ for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+ memcpy((void *)(obj + i), (void *)(ring + idx), 32);
+ switch (n & 0x1) {
+ case 1:
+ memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+ }
+ } else {
+ for (i = 0; idx < size; i++, idx++)
+ memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+ /* Start at the beginning */
+ for (idx = 0; i < n; i++, idx++)
+ memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+ }
+}
+
+/* the actual dequeue of elements from the ring.
+ * Placed here since identical code needed in both
+ * single and multi consumer dequeue functions.
+ */
+static __rte_always_inline void
+__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
+ void *obj_table, uint32_t esize, uint32_t num)
+{
+ /* 8B and 16B copies implemented individually to retain
+ * the current performance.
+ */
+ if (esize == 8)
+ __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
+ else if (esize == 16)
+ __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
+ else {
+ uint32_t idx, scale, nr_idx, nr_num, nr_size;
+
+ /* Normalize to uint32_t */
+ scale = esize / sizeof(uint32_t);
+ nr_num = num * scale;
+ idx = cons_head & r->mask;
+ nr_idx = idx * scale;
+ nr_size = r->size * scale;
+ __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
+ obj_table, nr_num);
+ }
+}
+
+/* Between load and load. there might be cpu reorder in weak model
+ * (powerpc/arm).
+ * There are 2 choices for the users
+ * 1.use rmb() memory barrier
+ * 2.use one-direction load_acquire/store_release barrier,defined by
+ * CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * It depends on performance test results.
+ * By default, move common functions to rte_ring_generic.h
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n,
+ enum rte_ring_queue_behavior behavior, unsigned int is_sp,
+ unsigned int *free_space)
+{
+ uint32_t prod_head, prod_next;
+ uint32_t free_entries;
+
+ n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+ &prod_head, &prod_next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
+
+ update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ * Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n,
+ enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+ unsigned int *available)
+{
+ uint32_t cons_head, cons_next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
+ &cons_head, &cons_next, &entries);
+ if (n == 0)
+ goto end;
+
+ __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
+
+ update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+unsigned int
+rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+unsigned int
+rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+unsigned int
+rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+}
+
+/**
+ * Enqueue one object on a ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+int
+rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+ return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+ -ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+int
+rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+ return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+ -ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+int
+rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+ return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+ -ENOBUFS;
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+unsigned int
+rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table,
+ * must be strictly positive.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+unsigned int
+rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC, available);
+}
+
+/**
+ * Dequeue several objects from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+unsigned int
+rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_FIXED, r->cons.single, available);
+}
+
+/**
+ * Dequeue one object from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to the object that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @return
+ * - 0: Success; objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ * dequeued.
+ */
+int
+rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
+ unsigned int esize)
+{
+ return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+ -ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to the object that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @return
+ * - 0: Success; objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ * dequeued.
+ */
+int
+rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
+ unsigned int esize)
+{
+ return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+ -ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to the object that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @return
+ * - 0: Success, objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ * dequeued.
+ */
+int
+rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
+{
+ return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+ -ENOENT;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+unsigned
+rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+unsigned
+rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+unsigned
+rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When more
+ * objects are requested than are available, only the available objects
+ * are dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+unsigned
+rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When more
+ * objects are requested than are available, only the available objects
+ * are dequeued.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+unsigned
+rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of objects that will be filled.
+ * @param esize
+ * The size of ring element, in bytes. It must be a multiple of 4.
+ * This must be the same value used while creating the ring. Otherwise
+ * the results are undefined.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Number of objects dequeued
+ */
+unsigned int
+rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+ unsigned int esize, unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons.single, available);
+}
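
Note on the wrappers above: the bulk variants pass RTE_RING_QUEUE_FIXED
(all-or-nothing, returns 0 or n) and the burst variants pass
RTE_RING_QUEUE_VARIABLE (returns however many objects fit or are present).
The following is a minimal single-threaded sketch of that difference, not
the DPDK implementation: toy_ring, toy_enqueue, toy_dequeue and the TOY_*
names are hypothetical, elements are plain uint32_t, and there is no
atomic head/tail handling at all.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical toy ring for illustration only; not struct rte_ring. */
enum toy_behavior { TOY_FIXED, TOY_VARIABLE };

#define TOY_SIZE 8u                 /* slot count, must be a power of two */
#define TOY_MASK (TOY_SIZE - 1u)    /* also the usable capacity (size - 1) */

struct toy_ring {
	uint32_t head;              /* free-running producer index */
	uint32_t tail;              /* free-running consumer index */
	uint32_t slots[TOY_SIZE];
};

static unsigned int
toy_enqueue(struct toy_ring *r, const uint32_t *objs, unsigned int n,
	    enum toy_behavior behavior, unsigned int *free_space)
{
	/* unsigned arithmetic stays correct when the indexes wrap */
	uint32_t free_entries = TOY_MASK + r->tail - r->head;
	unsigned int i;

	assert((TOY_SIZE & TOY_MASK) == 0);

	/* FIXED: refuse the whole request; VARIABLE: shrink it to fit */
	if (n > free_entries)
		n = (behavior == TOY_FIXED) ? 0 : free_entries;

	for (i = 0; i < n; i++)
		r->slots[(r->head + i) & TOY_MASK] = objs[i];
	r->head += n;

	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

static unsigned int
toy_dequeue(struct toy_ring *r, uint32_t *objs, unsigned int n,
	    enum toy_behavior behavior, unsigned int *available)
{
	uint32_t entries = r->head - r->tail;
	unsigned int i;

	/* FIXED: refuse the whole request; VARIABLE: take what is there */
	if (n > entries)
		n = (behavior == TOY_FIXED) ? 0 : entries;

	for (i = 0; i < n; i++)
		objs[i] = r->slots[(r->tail + i) & TOY_MASK];
	r->tail += n;

	if (available != NULL)
		*available = entries - n;
	return n;
}
```

With one free slot left, a TOY_FIXED request for three objects returns 0
and leaves the ring untouched, while the same request with TOY_VARIABLE
enqueues one object. In both cases the out-parameter reports what remains
after the call, matching the free_space/available description above.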
diff --git a/lib/librte_ring/ring_impl.c b/lib/librte_ring/ring_impl.c
new file mode 100644
index 000000000..ee85c515f
--- /dev/null
+++ b/lib/librte_ring/ring_impl.c
@@ -0,0 +1,750 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include "ring_impl.h"
+
+/* the actual enqueue of pointers on the ring.
+ * Placed here since identical code needed in both
+ * single and multi producer enqueue functions */
+#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
+ unsigned int i; \
+ const uint32_t size = (r)->size; \
+ uint32_t idx = prod_head & (r)->mask; \
+ obj_type *ring = (obj_type *)ring_start; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
+ ring[idx] = obj_table[i]; \
+ ring[idx+1] = obj_table[i+1]; \
+ ring[idx+2] = obj_table[i+2]; \
+ ring[idx+3] = obj_table[i+3]; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ ring[idx++] = obj_table[i++]; /* fallthrough */ \
+ case 2: \
+ ring[idx++] = obj_table[i++]; /* fallthrough */ \
+ case 1: \
+ ring[idx++] = obj_table[i++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++)\
+ ring[idx] = obj_table[i]; \
+ for (idx = 0; i < n; i++, idx++) \
+ ring[idx] = obj_table[i]; \
+ } \
+} while (0)
+
+/* the actual copy of pointers on the ring to obj_table.
+ * Placed here since identical code needed in both
+ * single and multi consumer dequeue functions */
+#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
+ unsigned int i; \
+ uint32_t idx = cons_head & (r)->mask; \
+ const uint32_t size = (r)->size; \
+ obj_type *ring = (obj_type *)ring_start; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
+ obj_table[i] = ring[idx]; \
+ obj_table[i+1] = ring[idx+1]; \
+ obj_table[i+2] = ring[idx+2]; \
+ obj_table[i+3] = ring[idx+3]; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ obj_table[i++] = ring[idx++]; /* fallthrough */ \
+ case 2: \
+ obj_table[i++] = ring[idx++]; /* fallthrough */ \
+ case 1: \
+ obj_table[i++] = ring[idx++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) \
+ obj_table[i] = ring[idx]; \
+ for (idx = 0; i < n; i++, idx++) \
+ obj_table[i] = ring[idx]; \
+ } \
+} while (0)
+
+/* Between two loads, the CPU may reorder accesses on weakly ordered
+ * architectures (powerpc/arm).
+ * Users have two choices:
+ * 1. use the rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers, enabled by
+ * CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * Which is better depends on performance test results.
+ * By default, the common functions come from rte_ring_generic.h
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sp, unsigned int *free_space)
+{
+ uint32_t prod_head, prod_next;
+ uint32_t free_entries;
+
+ n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+ &prod_head, &prod_next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+ update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ * Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sc, unsigned int *available)
+{
+ uint32_t cons_head, cons_next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
+ &cons_head, &cons_next, &entries);
+ if (n == 0)
+ goto end;
+
+ DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+ update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+unsigned int
+rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+unsigned int
+rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+unsigned int
+rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ r->prod.single, free_space);
+}
+
+/**
+ * Enqueue one object on a ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+int
+rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
+{
+ return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+int
+rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
+{
+ return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+int
+rte_ring_enqueue(struct rte_ring *r, void *obj)
+{
+ return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+unsigned int
+rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
+ unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table,
+ * must be strictly positive.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+unsigned int
+rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
+ unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ __IS_SC, available);
+}
+
+/**
+ * Dequeue several objects from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+unsigned int
+rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
+ unsigned int *available)
+{
+ return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ r->cons.single, available);
+}
+
+/**
+ * Dequeue one object from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to a void * pointer (object) that will be filled.
+ * @return
+ * - 0: Success; objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ * dequeued.
+ */
+int
+rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
+{
+ return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to a void * pointer (object) that will be filled.
+ * @return
+ * - 0: Success; objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ * dequeued.
+ */
+int
+rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
+{
+ return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to a void * pointer (object) that will be filled.
+ * @return
+ * - 0: Success, objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ * dequeued.
+ */
+int
+rte_ring_dequeue(struct rte_ring *r, void **obj_p)
+{
+ return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
+}
+
+/**
+ * Return the number of entries in a ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The number of entries in the ring.
+ */
+unsigned
+rte_ring_count(const struct rte_ring *r)
+{
+ uint32_t prod_tail = r->prod.tail;
+ uint32_t cons_tail = r->cons.tail;
+ uint32_t count = (prod_tail - cons_tail) & r->mask;
+ return (count > r->capacity) ? r->capacity : count;
+}
+
+/**
+ * Return the number of free entries in a ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The number of free entries in the ring.
+ */
+unsigned
+rte_ring_free_count(const struct rte_ring *r)
+{
+ return r->capacity - rte_ring_count(r);
+}
+
+/**
+ * Test if a ring is full.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * - 1: The ring is full.
+ * - 0: The ring is not full.
+ */
+int
+rte_ring_full(const struct rte_ring *r)
+{
+ return rte_ring_free_count(r) == 0;
+}
+
+/**
+ * Test if a ring is empty.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * - 1: The ring is empty.
+ * - 0: The ring is not empty.
+ */
+int
+rte_ring_empty(const struct rte_ring *r)
+{
+ return rte_ring_count(r) == 0;
+}
+
+const char *
+rte_ring_get_name(const struct rte_ring *r)
+{
+ return r->name;
+}
+
+const struct rte_memzone *
+rte_ring_get_memzone(const struct rte_ring *r)
+{
+ return r->memzone;
+}
+
+/**
+ * Return the size of the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The size of the data store used by the ring.
+ * NOTE: this is not the same as the usable space in the ring. To query that
+ * use ``rte_ring_get_capacity()``.
+ */
+unsigned int
+rte_ring_get_size(const struct rte_ring *r)
+{
+ return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The usable size of the ring.
+ */
+unsigned int
+rte_ring_get_capacity(const struct rte_ring *r)
+{
+ return r->capacity;
+}
+
+uint32_t
+rte_ring_get_prod_sync_type(const struct rte_ring *r)
+{
+ return (r->prod.single == __IS_SP) ?
+ RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
+}
+
+uint32_t
+rte_ring_get_cons_sync_type(const struct rte_ring *r)
+{
+ return (r->cons.single == __IS_SC) ?
+ RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
+}
+
+/**
+ * Dump the status of all rings on the console
+ *
+ * @param f
+ * A pointer to a file for output
+ */
+void rte_ring_list_dump(FILE *f);
+
+/**
+ * Search for a ring by its name
+ *
+ * @param name
+ * The name of the ring.
+ * @return
+ * The pointer to the ring matching the name, or NULL if not found,
+ * with rte_errno set appropriately. Possible rte_errno values include:
+ * - ENOENT - required entry not available to return.
+ */
+struct rte_ring *rte_ring_lookup(const char *name);
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+unsigned
+rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * If non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+unsigned
+rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * If non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+unsigned
+rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
+{
+ return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
+ r->prod.single, free_space);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When more
+ * objects are requested than are available, only the available objects are
+ * dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+unsigned
+rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
+ unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When more
+ * objects are requested than are available, only the available objects are
+ * dequeued.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+unsigned
+rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
+ unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Number of objects dequeued
+ */
+unsigned
+rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
+ unsigned int n, unsigned int *available)
+{
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons.single, available);
+}
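Note (not part of the patch): the burst functions above pass RTE_RING_QUEUE_VARIABLE, while the bulk variants pass RTE_RING_QUEUE_FIXED. The difference can be sketched as below. This is a minimal standalone illustration, assuming the clamping rule implied by the doc comments ("0 or n only" for FIXED, "as many items as possible" for VARIABLE). The names toy_behavior and toy_admit are hypothetical and do not exist in DPDK.

```c
/* Hypothetical stand-in for enum rte_ring_queue_behavior. */
enum toy_behavior {
	TOY_QUEUE_FIXED,    /* bulk: all n objects or none */
	TOY_QUEUE_VARIABLE  /* burst: as many as currently fit */
};

/*
 * Hypothetical helper: how many of the n requested objects would be
 * accepted when only free_entries slots are available.
 */
static inline unsigned int
toy_admit(unsigned int n, unsigned int free_entries, enum toy_behavior b)
{
	if (n > free_entries)
		return (b == TOY_QUEUE_FIXED) ? 0 : free_entries;
	return n;
}
```

With 5 free slots and a request for 8 objects, the FIXED (bulk) case accepts nothing, while the VARIABLE (burst) case accepts 5.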
diff --git a/lib/librte_ring/ring_impl.h b/lib/librte_ring/ring_impl.h
new file mode 100644
index 000000000..4ed375511
--- /dev/null
+++ b/lib/librte_ring/ring_impl.h
@@ -0,0 +1,64 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RING_IMPL_H_
+#define _RING_IMPL_H_
+
+#include <rte_ring.h>
+#include <rte_ring_elem.h>
+
+/* @internal defines for passing to the enqueue dequeue worker functions */
+#define __IS_SP 1
+#define __IS_MP 0
+#define __IS_SC 1
+#define __IS_MC 0
+
+/* structure to hold a pair of head/tail values and other metadata */
+struct rte_ring_headtail {
+ volatile uint32_t head; /**< Prod/consumer head. */
+ volatile uint32_t tail; /**< Prod/consumer tail. */
+ uint32_t single; /**< True if single prod/cons */
+};
+
+/**
+ * An RTE ring structure.
+ *
+ * The producer and the consumer each have a head and a tail index. The
+ * particularity of these indexes is that they are not between 0 and
+ * size(ring). They range over the full 32-bit space (0 to 2^32 - 1), and we
+ * mask their value when we access the ring[] field. Because of this, we can
+ * subtract two index values modulo 2^32, which is why overflow of the
+ * indexes is not a problem.
+ */
+struct rte_ring {
+ /*
+ * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
+ * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
+ * next time the ABI changes
+ */
+ char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
+ int flags; /**< Flags supplied at creation. */
+ const struct rte_memzone *memzone;
+ /**< Memzone, if any, containing the rte_ring */
+ uint32_t size; /**< Size of ring. */
+ uint32_t mask; /**< Mask (size-1) of ring. */
+ uint32_t capacity; /**< Usable size of ring */
+
+ char pad0 __rte_cache_aligned; /**< empty cache line */
+
+ /** Ring producer status. */
+ struct rte_ring_headtail prod __rte_cache_aligned;
+ char pad1 __rte_cache_aligned; /**< empty cache line */
+
+ /** Ring consumer status. */
+ struct rte_ring_headtail cons __rte_cache_aligned;
+ char pad2 __rte_cache_aligned; /**< empty cache line */
+};
+
+#endif /* _RING_IMPL_H_ */
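Note (not part of the patch): the free-running index scheme described in the struct rte_ring comment can be sketched in isolation as follows. This is only an illustration, assuming a power-of-two ring size. The helpers toy_count and toy_slot are hypothetical names; the rte_ring_count() moved by this patch additionally masks the difference and clamps it to the ring capacity.

```c
#include <stdint.h>

/*
 * Hypothetical helper: number of used entries, given free-running 32-bit
 * producer and consumer tail indexes. Unsigned subtraction is performed
 * modulo 2^32, so the result stays correct even after the producer index
 * has wrapped past UINT32_MAX.
 */
static inline uint32_t
toy_count(uint32_t prod_tail, uint32_t cons_tail)
{
	return prod_tail - cons_tail;
}

/*
 * Hypothetical helper: slot in the ring[] array for a free-running index.
 * mask is assumed to be (size - 1), with size a power of two.
 */
static inline uint32_t
toy_slot(uint32_t idx, uint32_t mask)
{
	return idx & mask;
}
```

For example, with a consumer tail of 0xFFFFFFFE and a producer tail that has wrapped around to 5, the subtraction still yields 7 used entries.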
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 77e5de099..ec9392585 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -32,8 +32,7 @@
#include <rte_spinlock.h>
#include <rte_tailq.h>
-#include "rte_ring.h"
-#include "rte_ring_elem.h"
+#include "ring_impl.h"
TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 18fc5d845..a1442c360 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -56,53 +56,17 @@ enum rte_ring_queue_behavior {
RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from ring */
};
+/** prod/cons sync types */
+enum {
+ RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */
+ RTE_RING_SYNC_ST, /**< single thread only */
+};
+
#define RTE_RING_MZ_PREFIX "RG_"
/** The maximum length of a ring name. */
#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
sizeof(RTE_RING_MZ_PREFIX) + 1)
-/* structure to hold a pair of head/tail values and other metadata */
-struct rte_ring_headtail {
- volatile uint32_t head; /**< Prod/consumer head. */
- volatile uint32_t tail; /**< Prod/consumer tail. */
- uint32_t single; /**< True if single prod/cons */
-};
-
-/**
- * An RTE ring structure.
- *
- * The producer and the consumer have a head and a tail index. The particularity
- * of these index is that they are not between 0 and size(ring). These indexes
- * are between 0 and 2^32, and we mask their value when we access the ring[]
- * field. Thanks to this assumption, we can do subtractions between 2 index
- * values in a modulo-32bit base: that's why the overflow of the indexes is not
- * a problem.
- */
-struct rte_ring {
- /*
- * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
- * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
- * next time the ABI changes
- */
- char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
- int flags; /**< Flags supplied at creation. */
- const struct rte_memzone *memzone;
- /**< Memzone, if any, containing the rte_ring */
- uint32_t size; /**< Size of ring. */
- uint32_t mask; /**< Mask (size-1) of ring. */
- uint32_t capacity; /**< Usable size of ring */
-
- char pad0 __rte_cache_aligned; /**< empty cache line */
-
- /** Ring producer status. */
- struct rte_ring_headtail prod __rte_cache_aligned;
- char pad1 __rte_cache_aligned; /**< empty cache line */
-
- /** Ring consumer status. */
- struct rte_ring_headtail cons __rte_cache_aligned;
- char pad2 __rte_cache_aligned; /**< empty cache line */
-};
-
#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
/**
@@ -116,11 +80,7 @@ struct rte_ring {
#define RING_F_EXACT_SZ 0x0004
#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
-/* @internal defines for passing to the enqueue dequeue worker functions */
-#define __IS_SP 1
-#define __IS_MP 0
-#define __IS_SC 1
-#define __IS_MC 0
+struct rte_ring;
/**
* Calculate the memory size needed for a ring
@@ -235,168 +195,6 @@ void rte_ring_free(struct rte_ring *r);
*/
void rte_ring_dump(FILE *f, const struct rte_ring *r);
-/* the actual enqueue of pointers on the ring.
- * Placed here since identical code needed in both
- * single and multi producer enqueue functions */
-#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
- unsigned int i; \
- const uint32_t size = (r)->size; \
- uint32_t idx = prod_head & (r)->mask; \
- obj_type *ring = (obj_type *)ring_start; \
- if (likely(idx + n < size)) { \
- for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
- ring[idx] = obj_table[i]; \
- ring[idx+1] = obj_table[i+1]; \
- ring[idx+2] = obj_table[i+2]; \
- ring[idx+3] = obj_table[i+3]; \
- } \
- switch (n & 0x3) { \
- case 3: \
- ring[idx++] = obj_table[i++]; /* fallthrough */ \
- case 2: \
- ring[idx++] = obj_table[i++]; /* fallthrough */ \
- case 1: \
- ring[idx++] = obj_table[i++]; \
- } \
- } else { \
- for (i = 0; idx < size; i++, idx++)\
- ring[idx] = obj_table[i]; \
- for (idx = 0; i < n; i++, idx++) \
- ring[idx] = obj_table[i]; \
- } \
-} while (0)
-
-/* the actual copy of pointers on the ring to obj_table.
- * Placed here since identical code needed in both
- * single and multi consumer dequeue functions */
-#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
- unsigned int i; \
- uint32_t idx = cons_head & (r)->mask; \
- const uint32_t size = (r)->size; \
- obj_type *ring = (obj_type *)ring_start; \
- if (likely(idx + n < size)) { \
- for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
- obj_table[i] = ring[idx]; \
- obj_table[i+1] = ring[idx+1]; \
- obj_table[i+2] = ring[idx+2]; \
- obj_table[i+3] = ring[idx+3]; \
- } \
- switch (n & 0x3) { \
- case 3: \
- obj_table[i++] = ring[idx++]; /* fallthrough */ \
- case 2: \
- obj_table[i++] = ring[idx++]; /* fallthrough */ \
- case 1: \
- obj_table[i++] = ring[idx++]; \
- } \
- } else { \
- for (i = 0; idx < size; i++, idx++) \
- obj_table[i] = ring[idx]; \
- for (idx = 0; i < n; i++, idx++) \
- obj_table[i] = ring[idx]; \
- } \
-} while (0)
-
-/* Between load and load. there might be cpu reorder in weak model
- * (powerpc/arm).
- * There are 2 choices for the users
- * 1.use rmb() memory barrier
- * 2.use one-direction load_acquire/store_release barrier,defined by
- * CONFIG_RTE_USE_C11_MEM_MODEL=y
- * It depends on performance test results.
- * By default, move common functions to rte_ring_generic.h
- */
-#ifdef RTE_USE_C11_MEM_MODEL
-#include "rte_ring_c11_mem.h"
-#else
-#include "rte_ring_generic.h"
-#endif
-
-/**
- * @internal Enqueue several objects on the ring
- *
- * @param r
- * A pointer to the ring structure.
- * @param obj_table
- * A pointer to a table of void * pointers (objects).
- * @param n
- * The number of objects to add in the ring from the obj_table.
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
- * @param is_sp
- * Indicates whether to use single producer or multi-producer head update
- * @param free_space
- * returns the amount of space after the enqueue operation has finished
- * @return
- * Actual number of objects enqueued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- unsigned int is_sp, unsigned int *free_space)
-{
- uint32_t prod_head, prod_next;
- uint32_t free_entries;
-
- n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
- &prod_head, &prod_next, &free_entries);
- if (n == 0)
- goto end;
-
- ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
-
- update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
-end:
- if (free_space != NULL)
- *free_space = free_entries - n;
- return n;
-}
-
-/**
- * @internal Dequeue several objects from the ring
- *
- * @param r
- * A pointer to the ring structure.
- * @param obj_table
- * A pointer to a table of void * pointers (objects).
- * @param n
- * The number of objects to pull from the ring.
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param is_sc
- * Indicates whether to use single consumer or multi-consumer head update
- * @param available
- * returns the number of remaining ring entries after the dequeue has finished
- * @return
- * - Actual number of objects dequeued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- unsigned int is_sc, unsigned int *available)
-{
- uint32_t cons_head, cons_next;
- uint32_t entries;
-
- n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
- &cons_head, &cons_next, &entries);
- if (n == 0)
- goto end;
-
- DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
-
- update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
-
-end:
- if (available != NULL)
- *available = entries - n;
- return n;
-}
-
/**
* Enqueue several objects on the ring (multi-producers safe).
*
@@ -415,13 +213,9 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
* @return
* The number of objects enqueued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MP, free_space);
-}
+ unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring (NOT multi-producers safe).
@@ -438,13 +232,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
* @return
* The number of objects enqueued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SP, free_space);
-}
+ unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring.
@@ -465,13 +255,9 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
* @return
* The number of objects enqueued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->prod.single, free_space);
-}
+ unsigned int n, unsigned int *free_space);
/**
* Enqueue one object on a ring (multi-producers safe).
@@ -487,11 +273,8 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
* - 0: Success; objects enqueued.
* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
*/
-static __rte_always_inline int
-rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
-{
- return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
-}
+int
+rte_ring_mp_enqueue(struct rte_ring *r, void *obj);
/**
* Enqueue one object on a ring (NOT multi-producers safe).
@@ -504,11 +287,8 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
* - 0: Success; objects enqueued.
* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
*/
-static __rte_always_inline int
-rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
-{
- return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
-}
+int
+rte_ring_sp_enqueue(struct rte_ring *r, void *obj);
/**
* Enqueue one object on a ring.
@@ -525,11 +305,8 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
* - 0: Success; objects enqueued.
* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
*/
-static __rte_always_inline int
-rte_ring_enqueue(struct rte_ring *r, void *obj)
-{
- return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
-}
+int
+rte_ring_enqueue(struct rte_ring *r, void *obj);
/**
* Dequeue several objects from a ring (multi-consumers safe).
@@ -549,13 +326,9 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
* @return
* The number of objects dequeued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
- unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MC, available);
-}
+ unsigned int n, unsigned int *available);
/**
* Dequeue several objects from a ring (NOT multi-consumers safe).
@@ -573,13 +346,9 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
* @return
* The number of objects dequeued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
- unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SC, available);
-}
+ unsigned int n, unsigned int *available);
/**
* Dequeue several objects from a ring.
@@ -600,13 +369,9 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
* @return
* The number of objects dequeued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
- unsigned int *available)
-{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->cons.single, available);
-}
+ unsigned int *available);
/**
* Dequeue one object from a ring (multi-consumers safe).
@@ -623,11 +388,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
* - -ENOENT: Not enough entries in the ring to dequeue; no object is
* dequeued.
*/
-static __rte_always_inline int
-rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
-{
- return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
-}
+int
+rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p);
/**
* Dequeue one object from a ring (NOT multi-consumers safe).
@@ -641,11 +403,8 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
* - -ENOENT: Not enough entries in the ring to dequeue, no object is
* dequeued.
*/
-static __rte_always_inline int
-rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
-{
- return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
-}
+int
+rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p);
/**
* Dequeue one object from a ring.
@@ -663,11 +422,8 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
* - -ENOENT: Not enough entries in the ring to dequeue, no object is
* dequeued.
*/
-static __rte_always_inline int
-rte_ring_dequeue(struct rte_ring *r, void **obj_p)
-{
- return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
-}
+int
+rte_ring_dequeue(struct rte_ring *r, void **obj_p);
/**
* Flush a ring.
@@ -694,14 +450,8 @@ rte_ring_reset(struct rte_ring *r);
* @return
* The number of entries in the ring.
*/
-static inline unsigned
-rte_ring_count(const struct rte_ring *r)
-{
- uint32_t prod_tail = r->prod.tail;
- uint32_t cons_tail = r->cons.tail;
- uint32_t count = (prod_tail - cons_tail) & r->mask;
- return (count > r->capacity) ? r->capacity : count;
-}
+unsigned
+rte_ring_count(const struct rte_ring *r);
/**
* Return the number of free entries in a ring.
@@ -711,11 +461,8 @@ rte_ring_count(const struct rte_ring *r)
* @return
* The number of free entries in the ring.
*/
-static inline unsigned
-rte_ring_free_count(const struct rte_ring *r)
-{
- return r->capacity - rte_ring_count(r);
-}
+unsigned
+rte_ring_free_count(const struct rte_ring *r);
/**
* Test if a ring is full.
@@ -726,11 +473,8 @@ rte_ring_free_count(const struct rte_ring *r)
* - 1: The ring is full.
* - 0: The ring is not full.
*/
-static inline int
-rte_ring_full(const struct rte_ring *r)
-{
- return rte_ring_free_count(r) == 0;
-}
+int
+rte_ring_full(const struct rte_ring *r);
/**
* Test if a ring is empty.
@@ -741,11 +485,14 @@ rte_ring_full(const struct rte_ring *r)
* - 1: The ring is empty.
* - 0: The ring is not empty.
*/
-static inline int
-rte_ring_empty(const struct rte_ring *r)
-{
- return rte_ring_count(r) == 0;
-}
+int
+rte_ring_empty(const struct rte_ring *r);
+
+const char *
+rte_ring_get_name(const struct rte_ring *r);
+
+const struct rte_memzone *
+rte_ring_get_memzone(const struct rte_ring *r);
/**
* Return the size of the ring.
@@ -757,11 +504,8 @@ rte_ring_empty(const struct rte_ring *r)
* NOTE: this is not the same as the usable space in the ring. To query that
* use ``rte_ring_get_capacity()``.
*/
-static inline unsigned int
-rte_ring_get_size(const struct rte_ring *r)
-{
- return r->size;
-}
+unsigned int
+rte_ring_get_size(const struct rte_ring *r);
/**
* Return the number of elements which can be stored in the ring.
@@ -771,11 +515,14 @@ rte_ring_get_size(const struct rte_ring *r)
* @return
* The usable size of the ring.
*/
-static inline unsigned int
-rte_ring_get_capacity(const struct rte_ring *r)
-{
- return r->capacity;
-}
+unsigned int
+rte_ring_get_capacity(const struct rte_ring *r);
+
+uint32_t
+rte_ring_get_prod_sync_type(const struct rte_ring *r);
+
+uint32_t
+rte_ring_get_cons_sync_type(const struct rte_ring *r);
/**
* Dump the status of all rings on the console
@@ -815,13 +562,9 @@ struct rte_ring *rte_ring_lookup(const char *name);
* @return
* - n: Actual number of objects enqueued.
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
- unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
-}
+ unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring (NOT multi-producers safe).
@@ -838,13 +581,9 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
* @return
* - n: Actual number of objects enqueued.
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
- unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
-}
+ unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring.
@@ -865,13 +604,9 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
* @return
* - n: Actual number of objects enqueued.
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
- unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
- r->prod.single, free_space);
-}
+ unsigned int n, unsigned int *free_space);
/**
* Dequeue several objects from a ring (multi-consumers safe). When the request
@@ -893,13 +628,9 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
* @return
* - n: Actual number of objects dequeued, 0 if ring is empty
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
- unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
-}
+ unsigned int n, unsigned int *available);
/**
* Dequeue several objects from a ring (NOT multi-consumers safe).When the
@@ -918,13 +649,9 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
* @return
* - n: Actual number of objects dequeued, 0 if ring is empty
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
- unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
-}
+ unsigned int n, unsigned int *available);
/**
* Dequeue multiple objects from a ring up to a maximum number.
@@ -945,14 +672,9 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
* @return
* - Number of objects dequeued
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
- unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
-}
+ unsigned int n, unsigned int *available);
#ifdef __cplusplus
}
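Note (not part of the patch): the header changes above replace the struct definition with a forward declaration (struct rte_ring;) plus out-of-line accessors. The general opaque-type pattern can be sketched as below. This is a minimal standalone illustration, not the DPDK implementation: struct toy_ring and the toy_ring_* functions are hypothetical, and malloc() stands in for memzone-based allocation.

```c
#include <stdint.h>
#include <stdlib.h>

/* ---- what a public header would expose (hypothetical toy API) ---- */
struct toy_ring; /* opaque: callers never see the layout */
struct toy_ring *toy_ring_create(uint32_t capacity);
void toy_ring_free(struct toy_ring *r);
uint32_t toy_ring_get_capacity(const struct toy_ring *r);

/* ---- what the private .c file would contain ---- */
struct toy_ring {
	uint32_t capacity;
	/* fields may be added or reordered without recompiling callers */
};

struct toy_ring *
toy_ring_create(uint32_t capacity)
{
	struct toy_ring *r = malloc(sizeof(*r));

	if (r != NULL)
		r->capacity = capacity;
	return r;
}

void
toy_ring_free(struct toy_ring *r)
{
	free(r);
}

/* Accessor: a real function call instead of an inlined field read. */
uint32_t
toy_ring_get_capacity(const struct toy_ring *r)
{
	return r->capacity;
}
```

The trade-off is the one measured in the cover letter: callers no longer depend on the structure layout, but every accessor and enqueue/dequeue becomes a function call rather than inlined code.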
diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
index 3976757ed..2a6b00370 100644
--- a/lib/librte_ring/rte_ring_elem.h
+++ b/lib/librte_ring/rte_ring_elem.h
@@ -109,380 +109,6 @@ __rte_experimental
struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
unsigned int count, int socket_id, unsigned int flags);
-static __rte_always_inline void
-__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
- uint32_t idx, const void *obj_table, uint32_t n)
-{
- unsigned int i;
- uint32_t *ring = (uint32_t *)&r[1];
- const uint32_t *obj = (const uint32_t *)obj_table;
- if (likely(idx + n < size)) {
- for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
- ring[idx] = obj[i];
- ring[idx + 1] = obj[i + 1];
- ring[idx + 2] = obj[i + 2];
- ring[idx + 3] = obj[i + 3];
- ring[idx + 4] = obj[i + 4];
- ring[idx + 5] = obj[i + 5];
- ring[idx + 6] = obj[i + 6];
- ring[idx + 7] = obj[i + 7];
- }
- switch (n & 0x7) {
- case 7:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 6:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 5:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 4:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 3:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 2:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 1:
- ring[idx++] = obj[i++]; /* fallthrough */
- }
- } else {
- for (i = 0; idx < size; i++, idx++)
- ring[idx] = obj[i];
- /* Start at the beginning */
- for (idx = 0; i < n; i++, idx++)
- ring[idx] = obj[i];
- }
-}
-
-static __rte_always_inline void
-__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
- const void *obj_table, uint32_t n)
-{
- unsigned int i;
- const uint32_t size = r->size;
- uint32_t idx = prod_head & r->mask;
- uint64_t *ring = (uint64_t *)&r[1];
- const uint64_t *obj = (const uint64_t *)obj_table;
- if (likely(idx + n < size)) {
- for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
- ring[idx] = obj[i];
- ring[idx + 1] = obj[i + 1];
- ring[idx + 2] = obj[i + 2];
- ring[idx + 3] = obj[i + 3];
- }
- switch (n & 0x3) {
- case 3:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 2:
- ring[idx++] = obj[i++]; /* fallthrough */
- case 1:
- ring[idx++] = obj[i++];
- }
- } else {
- for (i = 0; idx < size; i++, idx++)
- ring[idx] = obj[i];
- /* Start at the beginning */
- for (idx = 0; i < n; i++, idx++)
- ring[idx] = obj[i];
- }
-}
-
-static __rte_always_inline void
-__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
- const void *obj_table, uint32_t n)
-{
- unsigned int i;
- const uint32_t size = r->size;
- uint32_t idx = prod_head & r->mask;
- rte_int128_t *ring = (rte_int128_t *)&r[1];
- const rte_int128_t *obj = (const rte_int128_t *)obj_table;
- if (likely(idx + n < size)) {
- for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
- memcpy((void *)(ring + idx),
- (const void *)(obj + i), 32);
- switch (n & 0x1) {
- case 1:
- memcpy((void *)(ring + idx),
- (const void *)(obj + i), 16);
- }
- } else {
- for (i = 0; idx < size; i++, idx++)
- memcpy((void *)(ring + idx),
- (const void *)(obj + i), 16);
- /* Start at the beginning */
- for (idx = 0; i < n; i++, idx++)
- memcpy((void *)(ring + idx),
- (const void *)(obj + i), 16);
- }
-}
-
-/* the actual enqueue of elements on the ring.
- * Placed here since identical code needed in both
- * single and multi producer enqueue functions.
- */
-static __rte_always_inline void
-__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
- const void *obj_table, uint32_t esize, uint32_t num)
-{
- /* 8B and 16B copies implemented individually to retain
- * the current performance.
- */
- if (esize == 8)
- __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
- else if (esize == 16)
- __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
- else {
- uint32_t idx, scale, nr_idx, nr_num, nr_size;
-
- /* Normalize to uint32_t */
- scale = esize / sizeof(uint32_t);
- nr_num = num * scale;
- idx = prod_head & r->mask;
- nr_idx = idx * scale;
- nr_size = r->size * scale;
- __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
- obj_table, nr_num);
- }
-}
-
-static __rte_always_inline void
-__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
- uint32_t idx, void *obj_table, uint32_t n)
-{
- unsigned int i;
- uint32_t *ring = (uint32_t *)&r[1];
- uint32_t *obj = (uint32_t *)obj_table;
- if (likely(idx + n < size)) {
- for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
- obj[i] = ring[idx];
- obj[i + 1] = ring[idx + 1];
- obj[i + 2] = ring[idx + 2];
- obj[i + 3] = ring[idx + 3];
- obj[i + 4] = ring[idx + 4];
- obj[i + 5] = ring[idx + 5];
- obj[i + 6] = ring[idx + 6];
- obj[i + 7] = ring[idx + 7];
- }
- switch (n & 0x7) {
- case 7:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 6:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 5:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 4:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 3:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 2:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 1:
- obj[i++] = ring[idx++]; /* fallthrough */
- }
- } else {
- for (i = 0; idx < size; i++, idx++)
- obj[i] = ring[idx];
- /* Start at the beginning */
- for (idx = 0; i < n; i++, idx++)
- obj[i] = ring[idx];
- }
-}
-
-static __rte_always_inline void
-__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
- void *obj_table, uint32_t n)
-{
- unsigned int i;
- const uint32_t size = r->size;
- uint32_t idx = prod_head & r->mask;
- uint64_t *ring = (uint64_t *)&r[1];
- uint64_t *obj = (uint64_t *)obj_table;
- if (likely(idx + n < size)) {
- for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
- obj[i] = ring[idx];
- obj[i + 1] = ring[idx + 1];
- obj[i + 2] = ring[idx + 2];
- obj[i + 3] = ring[idx + 3];
- }
- switch (n & 0x3) {
- case 3:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 2:
- obj[i++] = ring[idx++]; /* fallthrough */
- case 1:
- obj[i++] = ring[idx++]; /* fallthrough */
- }
- } else {
- for (i = 0; idx < size; i++, idx++)
- obj[i] = ring[idx];
- /* Start at the beginning */
- for (idx = 0; i < n; i++, idx++)
- obj[i] = ring[idx];
- }
-}
-
-static __rte_always_inline void
-__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
- void *obj_table, uint32_t n)
-{
- unsigned int i;
- const uint32_t size = r->size;
- uint32_t idx = prod_head & r->mask;
- rte_int128_t *ring = (rte_int128_t *)&r[1];
- rte_int128_t *obj = (rte_int128_t *)obj_table;
- if (likely(idx + n < size)) {
- for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
- memcpy((void *)(obj + i), (void *)(ring + idx), 32);
- switch (n & 0x1) {
- case 1:
- memcpy((void *)(obj + i), (void *)(ring + idx), 16);
- }
- } else {
- for (i = 0; idx < size; i++, idx++)
- memcpy((void *)(obj + i), (void *)(ring + idx), 16);
- /* Start at the beginning */
- for (idx = 0; i < n; i++, idx++)
- memcpy((void *)(obj + i), (void *)(ring + idx), 16);
- }
-}
-
-/* the actual dequeue of elements from the ring.
- * Placed here since identical code needed in both
- * single and multi producer enqueue functions.
- */
-static __rte_always_inline void
-__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
- void *obj_table, uint32_t esize, uint32_t num)
-{
- /* 8B and 16B copies implemented individually to retain
- * the current performance.
- */
- if (esize == 8)
- __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
- else if (esize == 16)
- __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
- else {
- uint32_t idx, scale, nr_idx, nr_num, nr_size;
-
- /* Normalize to uint32_t */
- scale = esize / sizeof(uint32_t);
- nr_num = num * scale;
- idx = cons_head & r->mask;
- nr_idx = idx * scale;
- nr_size = r->size * scale;
- __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
- obj_table, nr_num);
- }
-}
-
-/* Between load and load. there might be cpu reorder in weak model
- * (powerpc/arm).
- * There are 2 choices for the users
- * 1.use rmb() memory barrier
- * 2.use one-direction load_acquire/store_release barrier,defined by
- * CONFIG_RTE_USE_C11_MEM_MODEL=y
- * It depends on performance test results.
- * By default, move common functions to rte_ring_generic.h
- */
-#ifdef RTE_USE_C11_MEM_MODEL
-#include "rte_ring_c11_mem.h"
-#else
-#include "rte_ring_generic.h"
-#endif
-
-/**
- * @internal Enqueue several objects on the ring
- *
- * @param r
- * A pointer to the ring structure.
- * @param obj_table
- * A pointer to a table of objects.
- * @param esize
- * The size of ring element, in bytes. It must be a multiple of 4.
- * This must be the same value used while creating the ring. Otherwise
- * the results are undefined.
- * @param n
- * The number of objects to add in the ring from the obj_table.
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
- * @param is_sp
- * Indicates whether to use single producer or multi-producer head update
- * @param free_space
- * returns the amount of space after the enqueue operation has finished
- * @return
- * Actual number of objects enqueued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n,
- enum rte_ring_queue_behavior behavior, unsigned int is_sp,
- unsigned int *free_space)
-{
- uint32_t prod_head, prod_next;
- uint32_t free_entries;
-
- n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
- &prod_head, &prod_next, &free_entries);
- if (n == 0)
- goto end;
-
- __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
-
- update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
-end:
- if (free_space != NULL)
- *free_space = free_entries - n;
- return n;
-}
-
-/**
- * @internal Dequeue several objects from the ring
- *
- * @param r
- * A pointer to the ring structure.
- * @param obj_table
- * A pointer to a table of objects.
- * @param esize
- * The size of ring element, in bytes. It must be a multiple of 4.
- * This must be the same value used while creating the ring. Otherwise
- * the results are undefined.
- * @param n
- * The number of objects to pull from the ring.
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param is_sc
- * Indicates whether to use single consumer or multi-consumer head update
- * @param available
- * returns the number of remaining ring entries after the dequeue has finished
- * @return
- * - Actual number of objects dequeued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n,
- enum rte_ring_queue_behavior behavior, unsigned int is_sc,
- unsigned int *available)
-{
- uint32_t cons_head, cons_next;
- uint32_t entries;
-
- n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
- &cons_head, &cons_next, &entries);
- if (n == 0)
- goto end;
-
- __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
-
- update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
-
-end:
- if (available != NULL)
- *available = entries - n;
- return n;
-}
-
/**
* Enqueue several objects on the ring (multi-producers safe).
*
@@ -505,13 +131,9 @@ __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
* @return
* The number of objects enqueued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
-}
+ unsigned int esize, unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring
@@ -534,13 +156,9 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
* @return
* The number of objects enqueued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
-}
+ unsigned int esize, unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring.
@@ -565,13 +183,9 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
* @return
* The number of objects enqueued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
-}
+ unsigned int esize, unsigned int n, unsigned int *free_space);
/**
* Enqueue one object on a ring (multi-producers safe).
@@ -591,12 +205,8 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
* - 0: Success; objects enqueued.
* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
*/
-static __rte_always_inline int
-rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
-{
- return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
- -ENOBUFS;
-}
+int
+rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize);
/**
* Enqueue one object on a ring
@@ -615,12 +225,8 @@ rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
* - 0: Success; objects enqueued.
* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
*/
-static __rte_always_inline int
-rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
-{
- return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
- -ENOBUFS;
-}
+int
+rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize);
/**
* Enqueue one object on a ring.
@@ -641,12 +247,8 @@ rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
* - 0: Success; objects enqueued.
* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
*/
-static __rte_always_inline int
-rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
-{
- return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
- -ENOBUFS;
-}
+int
+rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize);
/**
* Dequeue several objects from a ring (multi-consumers safe).
@@ -670,13 +272,9 @@ rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
* @return
* The number of objects dequeued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_MC, available);
-}
+ unsigned int esize, unsigned int n, unsigned int *available);
/**
* Dequeue several objects from a ring (NOT multi-consumers safe).
@@ -698,13 +296,9 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
* @return
* The number of objects dequeued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_SC, available);
-}
+ unsigned int esize, unsigned int n, unsigned int *available);
/**
* Dequeue several objects from a ring.
@@ -729,13 +323,9 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
* @return
* The number of objects dequeued, either 0 or n
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, r->cons.single, available);
-}
+ unsigned int esize, unsigned int n, unsigned int *available);
/**
* Dequeue one object from a ring (multi-consumers safe).
@@ -756,13 +346,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
* - -ENOENT: Not enough entries in the ring to dequeue; no object is
* dequeued.
*/
-static __rte_always_inline int
+int
rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
- unsigned int esize)
-{
- return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
- -ENOENT;
-}
+ unsigned int esize);
/**
* Dequeue one object from a ring (NOT multi-consumers safe).
@@ -780,13 +366,9 @@ rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
* - -ENOENT: Not enough entries in the ring to dequeue, no object is
* dequeued.
*/
-static __rte_always_inline int
+int
rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
- unsigned int esize)
-{
- return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
- -ENOENT;
-}
+ unsigned int esize);
/**
* Dequeue one object from a ring.
@@ -808,12 +390,8 @@ rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
* - -ENOENT: Not enough entries in the ring to dequeue, no object is
* dequeued.
*/
-static __rte_always_inline int
-rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
-{
- return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
- -ENOENT;
-}
+int
+rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize);
/**
* Enqueue several objects on the ring (multi-producers safe).
@@ -837,13 +415,9 @@ rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
* @return
* - n: Actual number of objects enqueued.
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
-}
+ unsigned int esize, unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring
@@ -866,13 +440,9 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
* @return
* - n: Actual number of objects enqueued.
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
-}
+ unsigned int esize, unsigned int n, unsigned int *free_space);
/**
* Enqueue several objects on a ring.
@@ -897,13 +467,9 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
* @return
* - n: Actual number of objects enqueued.
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *free_space)
-{
- return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
-}
+ unsigned int esize, unsigned int n, unsigned int *free_space);
/**
* Dequeue several objects from a ring (multi-consumers safe). When the request
@@ -929,13 +495,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
* @return
* - n: Actual number of objects dequeued, 0 if ring is empty
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
-}
+ unsigned int esize, unsigned int n, unsigned int *available);
/**
* Dequeue several objects from a ring (NOT multi-consumers safe).When the
@@ -958,13 +520,9 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
* @return
* - n: Actual number of objects dequeued, 0 if ring is empty
*/
-static __rte_always_inline unsigned
+unsigned
rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
-}
+ unsigned int esize, unsigned int n, unsigned int *available);
/**
* Dequeue multiple objects from a ring up to a maximum number.
@@ -989,14 +547,9 @@ rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
* @return
* - Number of objects dequeued
*/
-static __rte_always_inline unsigned int
+unsigned int
rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
- unsigned int esize, unsigned int n, unsigned int *available)
-{
- return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
-}
+ unsigned int esize, unsigned int n, unsigned int *available);
#ifdef __cplusplus
}
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index e88c143cf..6cceacf95 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -8,6 +8,52 @@ DPDK_20.0 {
rte_ring_init;
rte_ring_list_dump;
rte_ring_lookup;
+ rte_ring_count;
+ rte_ring_dequeue;
+ rte_ring_dequeue_bulk;
+ rte_ring_dequeue_bulk_elem;
+ rte_ring_dequeue_burst;
+ rte_ring_dequeue_burst_elem;
+ rte_ring_dequeue_elem;
+ rte_ring_empty;
+ rte_ring_enqueue;
+ rte_ring_enqueue_bulk;
+ rte_ring_enqueue_bulk_elem;
+ rte_ring_enqueue_burst;
+ rte_ring_enqueue_burst_elem;
+ rte_ring_enqueue_elem;
+ rte_ring_free_count;
+ rte_ring_full;
+ rte_ring_get_capacity;
+ rte_ring_get_cons_sync_type;
+ rte_ring_get_memzone;
+ rte_ring_get_name;
+ rte_ring_get_prod_sync_type;
+ rte_ring_get_size;
+ rte_ring_mc_dequeue;
+ rte_ring_mc_dequeue_bulk;
+ rte_ring_mc_dequeue_bulk_elem;
+ rte_ring_mc_dequeue_burst;
+ rte_ring_mc_dequeue_burst_elem;
+ rte_ring_mc_dequeue_elem;
+ rte_ring_mp_enqueue;
+ rte_ring_mp_enqueue_bulk;
+ rte_ring_mp_enqueue_bulk_elem;
+ rte_ring_mp_enqueue_burst;
+ rte_ring_mp_enqueue_burst_elem;
+ rte_ring_mp_enqueue_elem;
+ rte_ring_sc_dequeue;
+ rte_ring_sc_dequeue_bulk;
+ rte_ring_sc_dequeue_bulk_elem;
+ rte_ring_sc_dequeue_burst;
+ rte_ring_sc_dequeue_burst_elem;
+ rte_ring_sc_dequeue_elem;
+ rte_ring_sp_enqueue;
+ rte_ring_sp_enqueue_bulk;
+ rte_ring_sp_enqueue_bulk_elem;
+ rte_ring_sp_enqueue_burst;
+ rte_ring_sp_enqueue_burst_elem;
+ rte_ring_sp_enqueue_elem;
local: *;
};
--
2.17.1
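
For readers following the copy path removed from the header above, here is
a minimal standalone sketch of the same idea: normalize the element size to
32-bit words, then copy out of the ring storage, wrapping to the start when
the request crosses the end. It is an illustration only, not the code from
this patch. The names toy_ring, toy_copy_out_32 and toy_dequeue_elems, and
the fixed 64-word storage, are assumptions made up for the example, and the
8-way unrolling of the original is replaced by a plain memcpy.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical toy ring, not the real struct rte_ring.
 * 'size' is the element count and must be a power of two (mask = size - 1).
 * 'esize' is the element size in bytes and must be a multiple of 4. */
struct toy_ring {
	uint32_t size;
	uint32_t mask;
	uint32_t esize;
	uint32_t slots[64];	/* storage viewed as 32-bit words */
};

/* Copy n 32-bit words starting at word index idx, where 'size' is the
 * storage length in words. Wraps to index 0 at the end of storage. */
static void
toy_copy_out_32(const struct toy_ring *r, uint32_t size, uint32_t idx,
		uint32_t *obj, uint32_t n)
{
	uint32_t i;

	assert(n <= size && idx < size);
	if (idx + n < size) {
		/* Contiguous case: no wrap-around needed. */
		memcpy(obj, &r->slots[idx], n * sizeof(uint32_t));
	} else {
		/* Copy up to the end of storage ... */
		for (i = 0; idx < size; i++, idx++)
			obj[i] = r->slots[idx];
		/* ... then start again at the beginning. */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = r->slots[idx];
	}
}

/* Dequeue-side copy for 'num' elements starting at cons_head.
 * All quantities are scaled from elements to 32-bit words first. */
static void
toy_dequeue_elems(const struct toy_ring *r, uint32_t cons_head,
		void *obj_table, uint32_t num)
{
	uint32_t scale = r->esize / sizeof(uint32_t);
	uint32_t idx = cons_head & r->mask;

	assert(r->size * scale <= 64);
	toy_copy_out_32(r, r->size * scale, idx * scale,
			obj_table, num * scale);
}
```

With size = 8 and esize = 8, a request for 3 elements at cons_head = 14
maps to word index 12 with 6 words to copy out of 16, so the copy takes the
wrapping branch: four words from the end of storage, then two from the start.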
Thread overview: 12+ messages
2020-03-20 16:41 Konstantin Ananyev [this message]
2020-03-20 17:54 ` Stephen Hemminger
2020-03-21 1:03 ` Ananyev, Konstantin
2020-03-25 21:09 ` Jerin Jacob
2020-03-26 0:28 ` Ananyev, Konstantin
2020-03-26 8:04 ` Morten Brørup
2020-03-31 23:25 ` Thomas Monjalon
2020-06-30 23:15 ` Honnappa Nagarahalli
2020-07-01 7:27 ` Morten Brørup
2020-07-01 12:21 ` Ananyev, Konstantin
2020-07-01 14:11 ` Honnappa Nagarahalli
2020-07-01 14:31 ` David Marchand