From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 60AE5A0583; Fri, 20 Mar 2020 17:41:49 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 2D5D62BB9; Fri, 20 Mar 2020 17:41:48 +0100 (CET) Received: from mga06.intel.com (mga06.intel.com [134.134.136.31]) by dpdk.org (Postfix) with ESMTP id C38B33B5 for ; Fri, 20 Mar 2020 17:41:45 +0100 (CET) IronPort-SDR: DLmc3Q8rF+nIPB5M8C8Sg8pkHeWA8l9vsvxQEwZ98NCJW6YhSa6ByVXQBPFYZpOOn07JSasbJO WgX0RsLkmEVw== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga004.fm.intel.com ([10.253.24.48]) by orsmga104.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Mar 2020 09:41:43 -0700 IronPort-SDR: /pyFA0YHFWHl16CsPsBkZ5FpSdkzevbYlFOZdc3bxHOc8EwxCkpbuLigQP8PatlF1L3s3zrjJn dEDFMixopvjQ== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,285,1580803200"; d="scan'208";a="269146235" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga004.fm.intel.com with ESMTP; 20 Mar 2020 09:41:40 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, honnappa.nagarahalli@arm.com, jerinj@marvell.com, drc@linux.vnet.ibm.com, stephen@networkplumber.org, Konstantin Ananyev Date: Fri, 20 Mar 2020 16:41:38 +0000 Message-Id: <20200320164138.8510-1-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 Subject: [dpdk-dev] [RFC] ring: make ring implementation non-inlined X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" As was discussed here: http://mails.dpdk.org/archives/dev/2020-February/158586.html this RFC aimed to hide ring internals into .c and make all ring functions non-inlined. In theory that might help to maintain ABI stability in future. This is just a POC to measure the impact of proposed idea, proper implementation would definetly need some extra effort. On IA box (SKX) ring_perf_autotest shows ~20-30 cycles extra for enqueue+dequeue pair. On some more realistic code, I suspect the impact it might be a bit higher. For MP/MC bulk transfers degradation seems quite small, though for SP/SC and/or small transfers it is more then noticable (see exact numbers below). >From my perspective we'd probably keep it inlined for now to avoid any non-anticipated perfomance degradations. Though intersted to see perf results and opinions from other interested parties. Intel(R) Xeon(R) Platinum 8160 CPU @ 2.10GHz ring_perf_autotest (without patch/with patch) ### Testing single element enq/deq ### legacy APIs: SP/SC: single: 8.75/43.23 legacy APIs: MP/MC: single: 56.18/80.44 ### Testing burst enq/deq ### legacy APIs: SP/SC: burst (size: 8): 37.36/53.37 legacy APIs: SP/SC: burst (size: 32): 93.97/117.30 legacy APIs: MP/MC: burst (size: 8): 78.23/91.45 legacy APIs: MP/MC: burst (size: 32): 131.59/152.49 ### Testing bulk enq/deq ### legacy APIs: SP/SC: bulk (size: 8): 37.29/54.48 legacy APIs: SP/SC: bulk (size: 32): 92.68/113.01 legacy APIs: MP/MC: bulk (size: 8): 78.40/93.50 legacy APIs: MP/MC: bulk (size: 32): 131.49/154.25 ### Testing empty bulk deq ### legacy APIs: SP/SC: bulk (size: 8): 4.00/16.86 legacy APIs: MP/MC: bulk (size: 8): 7.01/15.55 ### Testing using two hyperthreads ### legacy APIs: SP/SC: bulk (size: 8): 10.64/17.56 legacy APIs: MP/MC: bulk (size: 8): 15.30/16.69 legacy APIs: SP/SC: bulk (size: 32): 5.84/7.09 legacy APIs: MP/MC: bulk (size: 32): 6.34/7.54 ### Testing using two physical cores ### legacy APIs: SP/SC: bulk (size: 8): 24.34/42.40 legacy APIs: MP/MC: bulk (size: 8): 70.34/71.82 legacy APIs: SP/SC: bulk (size: 32): 12.67/14.68 legacy APIs: MP/MC: bulk (size: 32): 22.41/17.93 ### Testing single element enq/deq ### elem APIs: element size 16B: SP/SC: single: 10.65/41.96 elem APIs: element size 16B: MP/MC: single: 44.33/81.36 ### Testing burst enq/deq ### elem APIs: element size 16B: SP/SC: burst (size: 8): 39.20/58.52 elem APIs: element size 16B: SP/SC: burst (size: 32): 123.19/142.79 elem APIs: element size 16B: MP/MC: burst (size: 8): 80.72/101.36 elem APIs: element size 16B: MP/MC: burst (size: 32): 169.21/185.38 ### Testing bulk enq/deq ### elem APIs: element size 16B: SP/SC: bulk (size: 8): 41.64/58.46 elem APIs: element size 16B: SP/SC: bulk (size: 32): 122.74/142.52 elem APIs: element size 16B: MP/MC: bulk (size: 8): 80.60/103.14 elem APIs: element size 16B: MP/MC: bulk (size: 32): 169.39/186.67 ### Testing empty bulk deq ### elem APIs: element size 16B: SP/SC: bulk (size: 8): 5.01/17.17 elem APIs: element size 16B: MP/MC: bulk (size: 8): 6.01/14.80 ### Testing using two hyperthreads ### elem APIs: element size 16B: SP/SC: bulk (size: 8): 12.02/17.18 elem APIs: element size 16B: MP/MC: bulk (size: 8): 16.81/21.14 elem APIs: element size 16B: SP/SC: bulk (size: 32): 7.87/9.01 elem APIs: element size 16B: MP/MC: bulk (size: 32): 8.22/10.57 ### Testing using two physical cores ### elem APIs: element size 16B: SP/SC: bulk (size: 8): 27.00/51.94 elem APIs: element size 16B: MP/MC: bulk (size: 8): 78.24/74.48 elem APIs: element size 16B: SP/SC: bulk (size: 32): 15.41/16.14 elem APIs: element size 16B: MP/MC: bulk (size: 32): 18.72/21.64 Signed-off-by: Konstantin Ananyev --- app/proc-info/main.c | 20 +- app/test-pmd/cmdline.c | 2 +- app/test/commands.c | 2 +- app/test/test_pdump.c | 6 +- app/test/test_ring_stress.h | 2 +- config/common_base | 2 +- drivers/crypto/ccp/ccp_pmd_ops.c | 2 +- drivers/crypto/meson.build | 1 - drivers/net/ring/rte_eth_ring.c | 17 +- lib/librte_eventdev/rte_event_ring.c | 6 +- lib/librte_eventdev/rte_event_ring.h | 17 +- lib/librte_pdump/rte_pdump.c | 3 +- lib/librte_port/rte_port_ring.c | 29 +- lib/librte_ring/Makefile | 6 +- lib/librte_ring/meson.build | 6 +- lib/librte_ring/ring_elem.c | 919 +++++++++++++++++++++++++++ lib/librte_ring/ring_impl.c | 750 ++++++++++++++++++++++ lib/librte_ring/ring_impl.h | 64 ++ lib/librte_ring/rte_ring.c | 3 +- lib/librte_ring/rte_ring.h | 412 ++---------- lib/librte_ring/rte_ring_elem.h | 519 ++------------- lib/librte_ring/rte_ring_version.map | 46 ++ 22 files changed, 1938 insertions(+), 896 deletions(-) create mode 100644 lib/librte_ring/ring_elem.c create mode 100644 lib/librte_ring/ring_impl.c create mode 100644 lib/librte_ring/ring_impl.h diff --git a/app/proc-info/main.c b/app/proc-info/main.c index abeca4aab..aa8f113c1 100644 --- a/app/proc-info/main.c +++ b/app/proc-info/main.c @@ -1137,25 +1137,7 @@ show_ring(char *name) if (name != NULL) { struct rte_ring *ptr = rte_ring_lookup(name); if (ptr != NULL) { - printf(" - Name (%s) on socket (%d)\n" - " - flags:\n" - "\t -- Single Producer Enqueue (%u)\n" - "\t -- Single Consmer Dequeue (%u)\n", - ptr->name, - ptr->memzone->socket_id, - ptr->flags & RING_F_SP_ENQ, - ptr->flags & RING_F_SC_DEQ); - printf(" - size (%u) mask (0x%x) capacity (%u)\n", - ptr->size, - ptr->mask, - ptr->capacity); - printf(" - count (%u) free count (%u)\n", - rte_ring_count(ptr), - rte_ring_free_count(ptr)); - printf(" - full (%d) empty (%d)\n", - rte_ring_full(ptr), - rte_ring_empty(ptr)); - + rte_ring_dump(stdout, ptr); STATS_BDR_STR(50, ""); return; } diff --git a/app/test-pmd/cmdline.c b/app/test-pmd/cmdline.c index a037a55c6..000853067 100644 --- a/app/test-pmd/cmdline.c +++ b/app/test-pmd/cmdline.c @@ -9562,7 +9562,7 @@ dump_struct_sizes(void) #define DUMP_SIZE(t) printf("sizeof(" #t ") = %u\n", (unsigned)sizeof(t)); DUMP_SIZE(struct rte_mbuf); DUMP_SIZE(struct rte_mempool); - DUMP_SIZE(struct rte_ring); + //DUMP_SIZE(struct rte_ring); #undef DUMP_SIZE } diff --git a/app/test/commands.c b/app/test/commands.c index 3bf767bf7..89f93cdb2 100644 --- a/app/test/commands.c +++ b/app/test/commands.c @@ -107,7 +107,7 @@ dump_struct_sizes(void) #define DUMP_SIZE(t) printf("sizeof(" #t ") = %u\n", (unsigned)sizeof(t)); DUMP_SIZE(struct rte_mbuf); DUMP_SIZE(struct rte_mempool); - DUMP_SIZE(struct rte_ring); + //DUMP_SIZE(struct rte_ring); #undef DUMP_SIZE } diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index ad183184c..6a1180bcb 100644 --- a/app/test/test_pdump.c +++ b/app/test/test_pdump.c @@ -57,8 +57,7 @@ run_pdump_client_tests(void) if (ret < 0) return -1; mp->flags = 0x0000; - ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), - RING_F_SP_ENQ | RING_F_SC_DEQ); + ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0); if (ring_client == NULL) { printf("rte_ring_create SR0 failed"); return -1; @@ -71,9 +70,6 @@ run_pdump_client_tests(void) } rte_eth_dev_probing_finish(eth_dev); - ring_client->prod.single = 0; - ring_client->cons.single = 0; - printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n"); for (itr = 0; itr < NUM_ITR; itr++) { diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h index c6f0bc9f1..27d4adfb2 100644 --- a/app/test/test_ring_stress.h +++ b/app/test/test_ring_stress.h @@ -350,7 +350,7 @@ mt1_init(struct rte_ring **rng, void **data, uint32_t num) /* alloc ring */ nr = 2 * num; sz = rte_ring_get_memsize(nr); - r = rte_zmalloc(NULL, sz, alignof(*r)); + r = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE); if (r == NULL) { printf("%s: alloca(%zu) for FIFO with %u elems failed", __func__, sz, nr); diff --git a/config/common_base b/config/common_base index 7ca2f28b1..87ab0766d 100644 --- a/config/common_base +++ b/config/common_base @@ -670,7 +670,7 @@ CONFIG_RTE_LIBRTE_PMD_ZUC=n # Compile PMD for Crypto Scheduler device # -CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER=y +CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER=n # # Compile PMD for NULL Crypto device diff --git a/drivers/crypto/ccp/ccp_pmd_ops.c b/drivers/crypto/ccp/ccp_pmd_ops.c index a19e85ecb..938977dc0 100644 --- a/drivers/crypto/ccp/ccp_pmd_ops.c +++ b/drivers/crypto/ccp/ccp_pmd_ops.c @@ -666,7 +666,7 @@ ccp_pmd_qp_create_batch_info_ring(struct ccp_qp *qp, r = rte_ring_lookup(qp->name); if (r) { - if (r->size >= ring_size) { + if (rte_ring_get_size(r) >= ring_size) { CCP_LOG_INFO( "Reusing ring %s for processed packets", qp->name); diff --git a/drivers/crypto/meson.build b/drivers/crypto/meson.build index 7fa1fbe26..8c8b78db3 100644 --- a/drivers/crypto/meson.build +++ b/drivers/crypto/meson.build @@ -16,7 +16,6 @@ drivers = ['aesni_gcm', 'octeontx2', 'openssl', 'qat', - 'scheduler', 'snow3g', 'virtio', 'zuc'] diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c index 41acbc513..0eaac0dc3 100644 --- a/drivers/net/ring/rte_eth_ring.c +++ b/drivers/net/ring/rte_eth_ring.c @@ -41,6 +41,8 @@ struct ring_queue { struct rte_ring *rng; rte_atomic64_t rx_pkts; rte_atomic64_t tx_pkts; + uint32_t rx_sync_type; + uint32_t tx_sync_type; }; struct pmd_internals { @@ -74,7 +76,7 @@ eth_ring_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs) struct ring_queue *r = q; const uint16_t nb_rx = (uint16_t)rte_ring_dequeue_burst(r->rng, ptrs, nb_bufs, NULL); - if (r->rng->flags & RING_F_SC_DEQ) + if (r->rx_sync_type == RTE_RING_SYNC_ST) r->rx_pkts.cnt += nb_rx; else rte_atomic64_add(&(r->rx_pkts), nb_rx); @@ -88,7 +90,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs) struct ring_queue *r = q; const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng, ptrs, nb_bufs, NULL); - if (r->rng->flags & RING_F_SP_ENQ) + if (r->tx_sync_type == RTE_RING_SYNC_ST) r->tx_pkts.cnt += nb_tx; else rte_atomic64_add(&(r->tx_pkts), nb_tx); @@ -306,10 +308,14 @@ do_eth_dev_ring_create(const char *name, internals->max_tx_queues = nb_tx_queues; for (i = 0; i < nb_rx_queues; i++) { internals->rx_ring_queues[i].rng = rx_queues[i]; + internals->rx_ring_queues[i].rx_sync_type = + rte_ring_get_cons_sync_type(rx_queues[i]); data->rx_queues[i] = &internals->rx_ring_queues[i]; } for (i = 0; i < nb_tx_queues; i++) { internals->tx_ring_queues[i].rng = tx_queues[i]; + internals->tx_ring_queues[i].tx_sync_type = + rte_ring_get_prod_sync_type(rx_queues[i]); data->tx_queues[i] = &internals->tx_ring_queues[i]; } @@ -403,8 +409,11 @@ rte_eth_from_rings(const char *name, struct rte_ring *const rx_queues[], int rte_eth_from_ring(struct rte_ring *r) { - return rte_eth_from_rings(r->name, &r, 1, &r, 1, - r->memzone ? r->memzone->socket_id : SOCKET_ID_ANY); + const struct rte_memzone *mz; + + mz = rte_ring_get_memzone(r); + return rte_eth_from_rings(rte_ring_get_name(r), &r, 1, &r, 1, + mz ? mz->socket_id : SOCKET_ID_ANY); } static int diff --git a/lib/librte_eventdev/rte_event_ring.c b/lib/librte_eventdev/rte_event_ring.c index d27e23901..99700a18c 100644 --- a/lib/librte_eventdev/rte_event_ring.c +++ b/lib/librte_eventdev/rte_event_ring.c @@ -16,12 +16,8 @@ int rte_event_ring_init(struct rte_event_ring *r, const char *name, unsigned int count, unsigned int flags) { - /* compilation-time checks */ - RTE_BUILD_BUG_ON((sizeof(struct rte_event_ring) & - RTE_CACHE_LINE_MASK) != 0); - /* init the ring structure */ - return rte_ring_init(&r->r, name, count, flags); + return rte_ring_init(r, name, count, flags); } /* create the ring */ diff --git a/lib/librte_eventdev/rte_event_ring.h b/lib/librte_eventdev/rte_event_ring.h index c0861b0ec..b31b33b3e 100644 --- a/lib/librte_eventdev/rte_event_ring.h +++ b/lib/librte_eventdev/rte_event_ring.h @@ -32,9 +32,8 @@ * used inside software eventdev implementations and by applications * directly as needed. */ -struct rte_event_ring { - struct rte_ring r; -}; + +#define rte_event_ring rte_ring /** * Returns the number of events in the ring @@ -47,7 +46,7 @@ struct rte_event_ring { static __rte_always_inline unsigned int rte_event_ring_count(const struct rte_event_ring *r) { - return rte_ring_count(&r->r); + return rte_ring_count(r); } /** @@ -62,7 +61,7 @@ rte_event_ring_count(const struct rte_event_ring *r) static __rte_always_inline unsigned int rte_event_ring_free_count(const struct rte_event_ring *r) { - return rte_ring_free_count(&r->r); + return rte_ring_free_count(r); } /** @@ -93,7 +92,7 @@ rte_event_ring_enqueue_burst(struct rte_event_ring *r, unsigned int num; uint32_t space; - num = rte_ring_enqueue_burst_elem(&r->r, events, + num = rte_ring_enqueue_burst_elem(r, events, sizeof(struct rte_event), n, &space); @@ -129,7 +128,7 @@ rte_event_ring_dequeue_burst(struct rte_event_ring *r, unsigned int num; uint32_t remaining; - num = rte_ring_dequeue_burst_elem(&r->r, events, + num = rte_ring_dequeue_burst_elem(r, events, sizeof(struct rte_event), n, &remaining); @@ -250,7 +249,7 @@ rte_event_ring_free(struct rte_event_ring *r); static inline unsigned int rte_event_ring_get_size(const struct rte_event_ring *r) { - return rte_ring_get_size(&r->r); + return rte_ring_get_size(r); } /** @@ -264,6 +263,6 @@ rte_event_ring_get_size(const struct rte_event_ring *r) static inline unsigned int rte_event_ring_get_capacity(const struct rte_event_ring *r) { - return rte_ring_get_capacity(&r->r); + return rte_ring_get_capacity(r); } #endif diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c index 8a01ac510..43aa07793 100644 --- a/lib/librte_pdump/rte_pdump.c +++ b/lib/librte_pdump/rte_pdump.c @@ -380,7 +380,8 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp) rte_errno = EINVAL; return -1; } - if (ring->prod.single || ring->cons.single) { + if (rte_ring_get_prod_sync_type(ring) == RTE_RING_SYNC_ST || + rte_ring_get_cons_sync_type(ring) == RTE_RING_SYNC_ST) { PDUMP_LOG(ERR, "ring with either SP or SC settings" " is not valid for pdump, should have MP and MC settings\n"); rte_errno = EINVAL; diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c index 47fcdd06a..16fdee0bd 100644 --- a/lib/librte_port/rte_port_ring.c +++ b/lib/librte_port/rte_port_ring.c @@ -40,12 +40,15 @@ rte_port_ring_reader_create_internal(void *params, int socket_id, struct rte_port_ring_reader_params *conf = params; struct rte_port_ring_reader *port; + uint32_t ring_sync; + + ring_sync = rte_ring_get_cons_sync_type(conf->ring); /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->cons.single && is_multi) || - (!(conf->ring->cons.single) && !is_multi)) { + (ring_sync == RTE_RING_SYNC_ST && is_multi) || + (ring_sync != RTE_RING_SYNC_ST && !is_multi)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; } @@ -167,13 +170,16 @@ rte_port_ring_writer_create_internal(void *params, int socket_id, struct rte_port_ring_writer_params *conf = params; struct rte_port_ring_writer *port; + uint32_t ring_sync; + + ring_sync = rte_ring_get_prod_sync_type(conf->ring); /* Check input parameters */ if ((conf == NULL) || - (conf->ring == NULL) || - (conf->ring->prod.single && is_multi) || - (!(conf->ring->prod.single) && !is_multi) || - (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { + (conf->ring == NULL) || + (ring_sync == RTE_RING_SYNC_ST && is_multi) || + (ring_sync != RTE_RING_SYNC_ST && !is_multi) || + (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; } @@ -436,13 +442,16 @@ rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id, struct rte_port_ring_writer_nodrop_params *conf = params; struct rte_port_ring_writer_nodrop *port; + uint32_t ring_sync; + + ring_sync = rte_ring_get_prod_sync_type(conf->ring); /* Check input parameters */ if ((conf == NULL) || - (conf->ring == NULL) || - (conf->ring->prod.single && is_multi) || - (!(conf->ring->prod.single) && !is_multi) || - (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { + (conf->ring == NULL) || + (ring_sync == RTE_RING_SYNC_ST && is_multi) || + (ring_sync != RTE_RING_SYNC_ST && !is_multi) || + (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; } diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 917c560ad..8c3cd523d 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -13,11 +13,11 @@ EXPORT_MAP := rte_ring_version.map # all source are stored in SRCS-y SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c +SRCS-$(CONFIG_RTE_LIBRTE_RING) += ring_impl.c +SRCS-$(CONFIG_RTE_LIBRTE_RING) += ring_elem.c # install includes SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ - rte_ring_elem.h \ - rte_ring_generic.h \ - rte_ring_c11_mem.h + rte_ring_elem.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index f2f3ccc88..8adbdfc9d 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -1,11 +1,9 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel Corporation -sources = files('rte_ring.c') +sources = files('rte_ring.c', 'ring_impl.c', 'ring_elem.c') headers = files('rte_ring.h', - 'rte_ring_elem.h', - 'rte_ring_c11_mem.h', - 'rte_ring_generic.h') + 'rte_ring_elem.h') # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental allow_experimental_apis = true diff --git a/lib/librte_ring/ring_elem.c b/lib/librte_ring/ring_elem.c new file mode 100644 index 000000000..3588725e7 --- /dev/null +++ b/lib/librte_ring/ring_elem.c @@ -0,0 +1,919 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2019 Arm Limited + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +/** + * @file + * RTE Ring with user defined element size + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ring_impl.h" + +static __rte_always_inline void +__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size, + uint32_t idx, const void *obj_table, uint32_t n) +{ + unsigned int i; + uint32_t *ring = (uint32_t *)&r[1]; + const uint32_t *obj = (const uint32_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { + ring[idx] = obj[i]; + ring[idx + 1] = obj[i + 1]; + ring[idx + 2] = obj[i + 2]; + ring[idx + 3] = obj[i + 3]; + ring[idx + 4] = obj[i + 4]; + ring[idx + 5] = obj[i + 5]; + ring[idx + 6] = obj[i + 6]; + ring[idx + 7] = obj[i + 7]; + } + switch (n & 0x7) { + case 7: + ring[idx++] = obj[i++]; /* fallthrough */ + case 6: + ring[idx++] = obj[i++]; /* fallthrough */ + case 5: + ring[idx++] = obj[i++]; /* fallthrough */ + case 4: + ring[idx++] = obj[i++]; /* fallthrough */ + case 3: + ring[idx++] = obj[i++]; /* fallthrough */ + case 2: + ring[idx++] = obj[i++]; /* fallthrough */ + case 1: + ring[idx++] = obj[i++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + ring[idx] = obj[i]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + ring[idx] = obj[i]; + } +} + +static __rte_always_inline void +__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, + const void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + uint64_t *ring = (uint64_t *)&r[1]; + const uint64_t *obj = (const uint64_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { + ring[idx] = obj[i]; + ring[idx + 1] = obj[i + 1]; + ring[idx + 2] = obj[i + 2]; + ring[idx + 3] = obj[i + 3]; + } + switch (n & 0x3) { + case 3: + ring[idx++] = obj[i++]; /* fallthrough */ + case 2: + ring[idx++] = obj[i++]; /* fallthrough */ + case 1: + ring[idx++] = obj[i++]; + } + } else { + for (i = 0; idx < size; i++, idx++) + ring[idx] = obj[i]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + ring[idx] = obj[i]; + } +} + +static __rte_always_inline void +__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, + const void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + rte_int128_t *ring = (rte_int128_t *)&r[1]; + const rte_int128_t *obj = (const rte_int128_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) + memcpy((void *)(ring + idx), + (const void *)(obj + i), 32); + switch (n & 0x1) { + case 1: + memcpy((void *)(ring + idx), + (const void *)(obj + i), 16); + } + } else { + for (i = 0; idx < size; i++, idx++) + memcpy((void *)(ring + idx), + (const void *)(obj + i), 16); + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + memcpy((void *)(ring + idx), + (const void *)(obj + i), 16); + } +} + +/* the actual enqueue of elements on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions. + */ +static __rte_always_inline void +__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head, + const void *obj_table, uint32_t esize, uint32_t num) +{ + /* 8B and 16B copies implemented individually to retain + * the current performance. + */ + if (esize == 8) + __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num); + else if (esize == 16) + __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num); + else { + uint32_t idx, scale, nr_idx, nr_num, nr_size; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nr_num = num * scale; + idx = prod_head & r->mask; + nr_idx = idx * scale; + nr_size = r->size * scale; + __rte_ring_enqueue_elems_32(r, nr_size, nr_idx, + obj_table, nr_num); + } +} + +static __rte_always_inline void +__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size, + uint32_t idx, void *obj_table, uint32_t n) +{ + unsigned int i; + uint32_t *ring = (uint32_t *)&r[1]; + uint32_t *obj = (uint32_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { + obj[i] = ring[idx]; + obj[i + 1] = ring[idx + 1]; + obj[i + 2] = ring[idx + 2]; + obj[i + 3] = ring[idx + 3]; + obj[i + 4] = ring[idx + 4]; + obj[i + 5] = ring[idx + 5]; + obj[i + 6] = ring[idx + 6]; + obj[i + 7] = ring[idx + 7]; + } + switch (n & 0x7) { + case 7: + obj[i++] = ring[idx++]; /* fallthrough */ + case 6: + obj[i++] = ring[idx++]; /* fallthrough */ + case 5: + obj[i++] = ring[idx++]; /* fallthrough */ + case 4: + obj[i++] = ring[idx++]; /* fallthrough */ + case 3: + obj[i++] = ring[idx++]; /* fallthrough */ + case 2: + obj[i++] = ring[idx++]; /* fallthrough */ + case 1: + obj[i++] = ring[idx++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + obj[i] = ring[idx]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + obj[i] = ring[idx]; + } +} + +static __rte_always_inline void +__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, + void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + uint64_t *ring = (uint64_t *)&r[1]; + uint64_t *obj = (uint64_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { + obj[i] = ring[idx]; + obj[i + 1] = ring[idx + 1]; + obj[i + 2] = ring[idx + 2]; + obj[i + 3] = ring[idx + 3]; + } + switch (n & 0x3) { + case 3: + obj[i++] = ring[idx++]; /* fallthrough */ + case 2: + obj[i++] = ring[idx++]; /* fallthrough */ + case 1: + obj[i++] = ring[idx++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + obj[i] = ring[idx]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + obj[i] = ring[idx]; + } +} + +static __rte_always_inline void +__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head, + void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + rte_int128_t *ring = (rte_int128_t *)&r[1]; + rte_int128_t *obj = (rte_int128_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) + memcpy((void *)(obj + i), (void *)(ring + idx), 32); + switch (n & 0x1) { + case 1: + memcpy((void *)(obj + i), (void *)(ring + idx), 16); + } + } else { + for (i = 0; idx < size; i++, idx++) + memcpy((void *)(obj + i), (void *)(ring + idx), 16); + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + memcpy((void *)(obj + i), (void *)(ring + idx), 16); + } +} + +/* the actual dequeue of elements from the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions. + */ +static __rte_always_inline void +__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head, + void *obj_table, uint32_t esize, uint32_t num) +{ + /* 8B and 16B copies implemented individually to retain + * the current performance. + */ + if (esize == 8) + __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num); + else if (esize == 16) + __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num); + else { + uint32_t idx, scale, nr_idx, nr_num, nr_size; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nr_num = num * scale; + idx = cons_head & r->mask; + nr_idx = idx * scale; + nr_size = r->size * scale; + __rte_ring_dequeue_elems_32(r, nr_size, nr_idx, + obj_table, nr_num); + } +} + +/* Between load and load. there might be cpu reorder in weak model + * (powerpc/arm). + * There are 2 choices for the users + * 1.use rmb() memory barrier + * 2.use one-direction load_acquire/store_release barrier,defined by + * CONFIG_RTE_USE_C11_MEM_MODEL=y + * It depends on performance test results. + * By default, move common functions to rte_ring_generic.h + */ +#ifdef RTE_USE_C11_MEM_MODEL +#include "rte_ring_c11_mem.h" +#else +#include "rte_ring_generic.h" +#endif + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, + enum rte_ring_queue_behavior behavior, unsigned int is_sp, + unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n); + + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, + enum rte_ring_queue_behavior behavior, unsigned int is_sc, + unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n); + + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +unsigned int +rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +unsigned int +rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +unsigned int +rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, r->prod.single, free_space); +} + +/** + * Enqueue one object on a ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +int +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Enqueue one object on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +int +rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Enqueue one object on a ring. + * + * This function calls the multi-producer or the single-producer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +int +rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +unsigned int +rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table, + * must be strictly positive. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +unsigned int +rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_SC, available); +} + +/** + * Dequeue several objects from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +unsigned int +rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, r->cons.single, available); +} + +/** + * Dequeue one object from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to the object that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + */ +int +rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p, + unsigned int esize) +{ + return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : + -ENOENT; +} + +/** + * Dequeue one object from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to the object that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +int +rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p, + unsigned int esize) +{ + return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : + -ENOENT; +} + +/** + * Dequeue one object from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to the object that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success, objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +int +rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize) +{ + return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : + -ENOENT; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +unsigned +rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +unsigned +rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +unsigned +rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +unsigned +rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +unsigned +rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_SC, available); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - Number of objects dequeued + */ +unsigned int +rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); +} diff --git a/lib/librte_ring/ring_impl.c b/lib/librte_ring/ring_impl.c new file mode 100644 index 000000000..ee85c515f --- /dev/null +++ b/lib/librte_ring/ring_impl.c @@ -0,0 +1,750 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "ring_impl.h" + +/* the actual enqueue of pointers on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions */ +#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ + unsigned int i; \ + const uint32_t size = (r)->size; \ + uint32_t idx = prod_head & (r)->mask; \ + obj_type *ring = (obj_type *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ + ring[idx] = obj_table[i]; \ + ring[idx+1] = obj_table[i+1]; \ + ring[idx+2] = obj_table[i+2]; \ + ring[idx+3] = obj_table[i+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + ring[idx++] = obj_table[i++]; /* fallthrough */ \ + case 2: \ + ring[idx++] = obj_table[i++]; /* fallthrough */ \ + case 1: \ + ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++)\ + ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + ring[idx] = obj_table[i]; \ + } \ +} while (0) + +/* the actual copy of pointers on the ring to obj_table. + * Placed here since identical code needed in both + * single and multi consumer dequeue functions */ +#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \ + unsigned int i; \ + uint32_t idx = cons_head & (r)->mask; \ + const uint32_t size = (r)->size; \ + obj_type *ring = (obj_type *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\ + obj_table[i] = ring[idx]; \ + obj_table[i+1] = ring[idx+1]; \ + obj_table[i+2] = ring[idx+2]; \ + obj_table[i+3] = ring[idx+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = ring[idx++]; /* fallthrough */ \ + case 2: \ + obj_table[i++] = ring[idx++]; /* fallthrough */ \ + case 1: \ + obj_table[i++] = ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = ring[idx]; \ + } \ +} while (0) + +/* Between load and load. there might be cpu reorder in weak model + * (powerpc/arm). + * There are 2 choices for the users + * 1.use rmb() memory barrier + * 2.use one-direction load_acquire/store_release barrier,defined by + * CONFIG_RTE_USE_C11_MEM_MODEL=y + * It depends on performance test results. + * By default, move common functions to rte_ring_generic.h + */ +#ifdef RTE_USE_C11_MEM_MODEL +#include "rte_ring_c11_mem.h" +#else +#include "rte_ring_generic.h" +#endif + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sp, unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); + + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sc, unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); + + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +unsigned int +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +unsigned int +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +unsigned int +rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + r->prod.single, free_space); +} + +/** + * Enqueue one object on a ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +int +rte_ring_mp_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Enqueue one object on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +int +rte_ring_sp_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Enqueue one object on a ring. + * + * This function calls the multi-producer or the single-producer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +int +rte_ring_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +unsigned int +rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table, + * must be strictly positive. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +unsigned int +rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_SC, available); +} + +/** + * Dequeue several objects from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +unsigned int +rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, + unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + r->cons.single, available); +} + +/** + * Dequeue one object from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + */ +int +rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Dequeue one object from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +int +rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Dequeue one object from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success, objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +int +rte_ring_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Return the number of entries in a ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The number of entries in the ring. + */ +unsigned +rte_ring_count(const struct rte_ring *r) +{ + uint32_t prod_tail = r->prod.tail; + uint32_t cons_tail = r->cons.tail; + uint32_t count = (prod_tail - cons_tail) & r->mask; + return (count > r->capacity) ? r->capacity : count; +} + +/** + * Return the number of free entries in a ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The number of free entries in the ring. + */ +unsigned +rte_ring_free_count(const struct rte_ring *r) +{ + return r->capacity - rte_ring_count(r); +} + +/** + * Test if a ring is full. + * + * @param r + * A pointer to the ring structure. + * @return + * - 1: The ring is full. + * - 0: The ring is not full. + */ +int +rte_ring_full(const struct rte_ring *r) +{ + return rte_ring_free_count(r) == 0; +} + +/** + * Test if a ring is empty. + * + * @param r + * A pointer to the ring structure. + * @return + * - 1: The ring is empty. + * - 0: The ring is not empty. + */ +int +rte_ring_empty(const struct rte_ring *r) +{ + return rte_ring_count(r) == 0; +} + +const char * +rte_ring_get_name(const struct rte_ring *r) +{ + return r->name; +} + +const struct rte_memzone * +rte_ring_get_memzone(const struct rte_ring *r) +{ + return r->memzone; +} + +/** + * Return the size of the ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The size of the data store used by the ring. + * NOTE: this is not the same as the usable space in the ring. To query that + * use ``rte_ring_get_capacity()``. + */ +unsigned int +rte_ring_get_size(const struct rte_ring *r) +{ + return r->size; +} + +/** + * Return the number of elements which can be stored in the ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The usable size of the ring. + */ +unsigned int +rte_ring_get_capacity(const struct rte_ring *r) +{ + return r->capacity; +} + +uint32_t +rte_ring_get_prod_sync_type(const struct rte_ring *r) +{ + return (r->prod.single == __IS_SP) ? + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; +} + +uint32_t +rte_ring_get_cons_sync_type(const struct rte_ring *r) +{ + return (r->cons.single == __IS_SC) ? + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; +} + +/** + * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output + */ +void rte_ring_list_dump(FILE *f); + +/** + * Search a ring from its name + * + * @param name + * The name of the ring. + * @return + * The pointer to the ring matching the name, or NULL if not found, + * with rte_errno set appropriately. Possible rte_errno values include: + * - ENOENT - required entry not available to return. + */ +struct rte_ring *rte_ring_lookup(const char *name); + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +unsigned +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +unsigned +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +unsigned +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, + r->prod.single, free_space); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +unsigned +rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +unsigned +rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_SC, available); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - Number of objects dequeued + */ +unsigned +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); +} diff --git a/lib/librte_ring/ring_impl.h b/lib/librte_ring/ring_impl.h new file mode 100644 index 000000000..4ed375511 --- /dev/null +++ b/lib/librte_ring/ring_impl.h @@ -0,0 +1,64 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RING_IMPL_H_ +#define _RING_IMPL_H_ + +#include +#include + +/* @internal defines for passing to the enqueue dequeue worker functions */ +#define __IS_SP 1 +#define __IS_MP 0 +#define __IS_SC 1 +#define __IS_MC 0 + +/* structure to hold a pair of head/tail values and other metadata */ +struct rte_ring_headtail { + volatile uint32_t head; /**< Prod/consumer head. */ + volatile uint32_t tail; /**< Prod/consumer tail. */ + uint32_t single; /**< True if single prod/cons */ +}; + +/** + * An RTE ring structure. + * + * The producer and the consumer have a head and a tail index. The particularity + * of these index is that they are not between 0 and size(ring). These indexes + * are between 0 and 2^32, and we mask their value when we access the ring[] + * field. Thanks to this assumption, we can do subtractions between 2 index + * values in a modulo-32bit base: that's why the overflow of the indexes is not + * a problem. + */ +struct rte_ring { + /* + * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI + * compatibility requirements, it could be changed to RTE_RING_NAMESIZE + * next time the ABI changes + */ + char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */ + int flags; /**< Flags supplied at creation. */ + const struct rte_memzone *memzone; + /**< Memzone, if any, containing the rte_ring */ + uint32_t size; /**< Size of ring. */ + uint32_t mask; /**< Mask (size-1) of ring. */ + uint32_t capacity; /**< Usable size of ring */ + + char pad0 __rte_cache_aligned; /**< empty cache line */ + + /** Ring producer status. */ + struct rte_ring_headtail prod __rte_cache_aligned; + char pad1 __rte_cache_aligned; /**< empty cache line */ + + /** Ring consumer status. */ + struct rte_ring_headtail cons __rte_cache_aligned; + char pad2 __rte_cache_aligned; /**< empty cache line */ +}; + +#endif /* _RING_IMPL_H_ */ diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 77e5de099..ec9392585 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -32,8 +32,7 @@ #include #include -#include "rte_ring.h" -#include "rte_ring_elem.h" +#include "ring_impl.h" TAILQ_HEAD(rte_ring_list, rte_tailq_entry); diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 18fc5d845..a1442c360 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -56,53 +56,17 @@ enum rte_ring_queue_behavior { RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from ring */ }; +/** prod/cons sync types */ +enum { + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ + RTE_RING_SYNC_ST, /**< single thread only */ +}; + #define RTE_RING_MZ_PREFIX "RG_" /** The maximum length of a ring name. */ #define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ sizeof(RTE_RING_MZ_PREFIX) + 1) -/* structure to hold a pair of head/tail values and other metadata */ -struct rte_ring_headtail { - volatile uint32_t head; /**< Prod/consumer head. */ - volatile uint32_t tail; /**< Prod/consumer tail. */ - uint32_t single; /**< True if single prod/cons */ -}; - -/** - * An RTE ring structure. - * - * The producer and the consumer have a head and a tail index. The particularity - * of these index is that they are not between 0 and size(ring). These indexes - * are between 0 and 2^32, and we mask their value when we access the ring[] - * field. Thanks to this assumption, we can do subtractions between 2 index - * values in a modulo-32bit base: that's why the overflow of the indexes is not - * a problem. - */ -struct rte_ring { - /* - * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI - * compatibility requirements, it could be changed to RTE_RING_NAMESIZE - * next time the ABI changes - */ - char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */ - int flags; /**< Flags supplied at creation. */ - const struct rte_memzone *memzone; - /**< Memzone, if any, containing the rte_ring */ - uint32_t size; /**< Size of ring. */ - uint32_t mask; /**< Mask (size-1) of ring. */ - uint32_t capacity; /**< Usable size of ring */ - - char pad0 __rte_cache_aligned; /**< empty cache line */ - - /** Ring producer status. */ - struct rte_ring_headtail prod __rte_cache_aligned; - char pad1 __rte_cache_aligned; /**< empty cache line */ - - /** Ring consumer status. */ - struct rte_ring_headtail cons __rte_cache_aligned; - char pad2 __rte_cache_aligned; /**< empty cache line */ -}; - #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ /** @@ -116,11 +80,7 @@ struct rte_ring { #define RING_F_EXACT_SZ 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ -/* @internal defines for passing to the enqueue dequeue worker functions */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define __IS_MC 0 +struct rte_ring; /** * Calculate the memory size needed for a ring @@ -235,168 +195,6 @@ void rte_ring_free(struct rte_ring *r); */ void rte_ring_dump(FILE *f, const struct rte_ring *r); -/* the actual enqueue of pointers on the ring. - * Placed here since identical code needed in both - * single and multi producer enqueue functions */ -#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ - unsigned int i; \ - const uint32_t size = (r)->size; \ - uint32_t idx = prod_head & (r)->mask; \ - obj_type *ring = (obj_type *)ring_start; \ - if (likely(idx + n < size)) { \ - for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ - ring[idx] = obj_table[i]; \ - ring[idx+1] = obj_table[i+1]; \ - ring[idx+2] = obj_table[i+2]; \ - ring[idx+3] = obj_table[i+3]; \ - } \ - switch (n & 0x3) { \ - case 3: \ - ring[idx++] = obj_table[i++]; /* fallthrough */ \ - case 2: \ - ring[idx++] = obj_table[i++]; /* fallthrough */ \ - case 1: \ - ring[idx++] = obj_table[i++]; \ - } \ - } else { \ - for (i = 0; idx < size; i++, idx++)\ - ring[idx] = obj_table[i]; \ - for (idx = 0; i < n; i++, idx++) \ - ring[idx] = obj_table[i]; \ - } \ -} while (0) - -/* the actual copy of pointers on the ring to obj_table. - * Placed here since identical code needed in both - * single and multi consumer dequeue functions */ -#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \ - unsigned int i; \ - uint32_t idx = cons_head & (r)->mask; \ - const uint32_t size = (r)->size; \ - obj_type *ring = (obj_type *)ring_start; \ - if (likely(idx + n < size)) { \ - for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\ - obj_table[i] = ring[idx]; \ - obj_table[i+1] = ring[idx+1]; \ - obj_table[i+2] = ring[idx+2]; \ - obj_table[i+3] = ring[idx+3]; \ - } \ - switch (n & 0x3) { \ - case 3: \ - obj_table[i++] = ring[idx++]; /* fallthrough */ \ - case 2: \ - obj_table[i++] = ring[idx++]; /* fallthrough */ \ - case 1: \ - obj_table[i++] = ring[idx++]; \ - } \ - } else { \ - for (i = 0; idx < size; i++, idx++) \ - obj_table[i] = ring[idx]; \ - for (idx = 0; i < n; i++, idx++) \ - obj_table[i] = ring[idx]; \ - } \ -} while (0) - -/* Between load and load. there might be cpu reorder in weak model - * (powerpc/arm). - * There are 2 choices for the users - * 1.use rmb() memory barrier - * 2.use one-direction load_acquire/store_release barrier,defined by - * CONFIG_RTE_USE_C11_MEM_MODEL=y - * It depends on performance test results. - * By default, move common functions to rte_ring_generic.h - */ -#ifdef RTE_USE_C11_MEM_MODEL -#include "rte_ring_c11_mem.h" -#else -#include "rte_ring_generic.h" -#endif - -/** - * @internal Enqueue several objects on the ring - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to add in the ring from the obj_table. - * @param behavior - * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring - * @param is_sp - * Indicates whether to use single producer or multi-producer head update - * @param free_space - * returns the amount of space after the enqueue operation has finished - * @return - * Actual number of objects enqueued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, - unsigned int n, enum rte_ring_queue_behavior behavior, - unsigned int is_sp, unsigned int *free_space) -{ - uint32_t prod_head, prod_next; - uint32_t free_entries; - - n = __rte_ring_move_prod_head(r, is_sp, n, behavior, - &prod_head, &prod_next, &free_entries); - if (n == 0) - goto end; - - ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); - - update_tail(&r->prod, prod_head, prod_next, is_sp, 1); -end: - if (free_space != NULL) - *free_space = free_entries - n; - return n; -} - -/** - * @internal Dequeue several objects from the ring - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to pull from the ring. - * @param behavior - * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring - * @param is_sc - * Indicates whether to use single consumer or multi-consumer head update - * @param available - * returns the number of remaining ring entries after the dequeue has finished - * @return - * - Actual number of objects dequeued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, - unsigned int n, enum rte_ring_queue_behavior behavior, - unsigned int is_sc, unsigned int *available) -{ - uint32_t cons_head, cons_next; - uint32_t entries; - - n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, - &cons_head, &cons_next, &entries); - if (n == 0) - goto end; - - DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); - - update_tail(&r->cons, cons_head, cons_next, is_sc, 0); - -end: - if (available != NULL) - *available = entries - n; - return n; -} - /** * Enqueue several objects on the ring (multi-producers safe). * @@ -415,13 +213,9 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, * @return * The number of objects enqueued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MP, free_space); -} + unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring (NOT multi-producers safe). @@ -438,13 +232,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * @return * The number of objects enqueued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SP, free_space); -} + unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring. @@ -465,13 +255,9 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * @return * The number of objects enqueued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.single, free_space); -} + unsigned int n, unsigned int *free_space); /** * Enqueue one object on a ring (multi-producers safe). @@ -487,11 +273,8 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * - 0: Success; objects enqueued. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static __rte_always_inline int -rte_ring_mp_enqueue(struct rte_ring *r, void *obj) -{ - return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; -} +int +rte_ring_mp_enqueue(struct rte_ring *r, void *obj); /** * Enqueue one object on a ring (NOT multi-producers safe). @@ -504,11 +287,8 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj) * - 0: Success; objects enqueued. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static __rte_always_inline int -rte_ring_sp_enqueue(struct rte_ring *r, void *obj) -{ - return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; -} +int +rte_ring_sp_enqueue(struct rte_ring *r, void *obj); /** * Enqueue one object on a ring. @@ -525,11 +305,8 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj) * - 0: Success; objects enqueued. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static __rte_always_inline int -rte_ring_enqueue(struct rte_ring *r, void *obj) -{ - return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; -} +int +rte_ring_enqueue(struct rte_ring *r, void *obj); /** * Dequeue several objects from a ring (multi-consumers safe). @@ -549,13 +326,9 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) * @return * The number of objects dequeued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, - unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MC, available); -} + unsigned int n, unsigned int *available); /** * Dequeue several objects from a ring (NOT multi-consumers safe). @@ -573,13 +346,9 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, * @return * The number of objects dequeued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, - unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SC, available); -} + unsigned int n, unsigned int *available); /** * Dequeue several objects from a ring. @@ -600,13 +369,9 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, * @return * The number of objects dequeued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, - unsigned int *available) -{ - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.single, available); -} + unsigned int *available); /** * Dequeue one object from a ring (multi-consumers safe). @@ -623,11 +388,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static __rte_always_inline int -rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) -{ - return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; -} +int +rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p); /** * Dequeue one object from a ring (NOT multi-consumers safe). @@ -641,11 +403,8 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static __rte_always_inline int -rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) -{ - return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; -} +int +rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p); /** * Dequeue one object from a ring. @@ -663,11 +422,8 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static __rte_always_inline int -rte_ring_dequeue(struct rte_ring *r, void **obj_p) -{ - return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; -} +int +rte_ring_dequeue(struct rte_ring *r, void **obj_p); /** * Flush a ring. @@ -694,14 +450,8 @@ rte_ring_reset(struct rte_ring *r); * @return * The number of entries in the ring. */ -static inline unsigned -rte_ring_count(const struct rte_ring *r) -{ - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail; - uint32_t count = (prod_tail - cons_tail) & r->mask; - return (count > r->capacity) ? r->capacity : count; -} +unsigned +rte_ring_count(const struct rte_ring *r); /** * Return the number of free entries in a ring. @@ -711,11 +461,8 @@ rte_ring_count(const struct rte_ring *r) * @return * The number of free entries in the ring. */ -static inline unsigned -rte_ring_free_count(const struct rte_ring *r) -{ - return r->capacity - rte_ring_count(r); -} +unsigned +rte_ring_free_count(const struct rte_ring *r); /** * Test if a ring is full. @@ -726,11 +473,8 @@ rte_ring_free_count(const struct rte_ring *r) * - 1: The ring is full. * - 0: The ring is not full. */ -static inline int -rte_ring_full(const struct rte_ring *r) -{ - return rte_ring_free_count(r) == 0; -} +int +rte_ring_full(const struct rte_ring *r); /** * Test if a ring is empty. @@ -741,11 +485,14 @@ rte_ring_full(const struct rte_ring *r) * - 1: The ring is empty. * - 0: The ring is not empty. */ -static inline int -rte_ring_empty(const struct rte_ring *r) -{ - return rte_ring_count(r) == 0; -} +int +rte_ring_empty(const struct rte_ring *r); + +const char * +rte_ring_get_name(const struct rte_ring *r); + +const struct rte_memzone * +rte_ring_get_memzone(const struct rte_ring *r); /** * Return the size of the ring. @@ -757,11 +504,8 @@ rte_ring_empty(const struct rte_ring *r) * NOTE: this is not the same as the usable space in the ring. To query that * use ``rte_ring_get_capacity()``. */ -static inline unsigned int -rte_ring_get_size(const struct rte_ring *r) -{ - return r->size; -} +unsigned int +rte_ring_get_size(const struct rte_ring *r); /** * Return the number of elements which can be stored in the ring. @@ -771,11 +515,14 @@ rte_ring_get_size(const struct rte_ring *r) * @return * The usable size of the ring. */ -static inline unsigned int -rte_ring_get_capacity(const struct rte_ring *r) -{ - return r->capacity; -} +unsigned int +rte_ring_get_capacity(const struct rte_ring *r); + +uint32_t +rte_ring_get_prod_sync_type(const struct rte_ring *r); + +uint32_t +rte_ring_get_cons_sync_type(const struct rte_ring *r); /** * Dump the status of all rings on the console @@ -815,13 +562,9 @@ struct rte_ring *rte_ring_lookup(const char *name); * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +unsigned rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, - unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); -} + unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring (NOT multi-producers safe). @@ -838,13 +581,9 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +unsigned rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, - unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); -} + unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring. @@ -865,13 +604,9 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +unsigned rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, - unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.single, free_space); -} + unsigned int n, unsigned int *free_space); /** * Dequeue several objects from a ring (multi-consumers safe). When the request @@ -893,13 +628,9 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static __rte_always_inline unsigned +unsigned rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, - unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); -} + unsigned int n, unsigned int *available); /** * Dequeue several objects from a ring (NOT multi-consumers safe).When the @@ -918,13 +649,9 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static __rte_always_inline unsigned +unsigned rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, - unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); -} + unsigned int n, unsigned int *available); /** * Dequeue multiple objects from a ring up to a maximum number. @@ -945,14 +672,9 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, * @return * - Number of objects dequeued */ -static __rte_always_inline unsigned +unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, - unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); -} + unsigned int n, unsigned int *available); #ifdef __cplusplus } diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 3976757ed..2a6b00370 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -109,380 +109,6 @@ __rte_experimental struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count, int socket_id, unsigned int flags); -static __rte_always_inline void -__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size, - uint32_t idx, const void *obj_table, uint32_t n) -{ - unsigned int i; - uint32_t *ring = (uint32_t *)&r[1]; - const uint32_t *obj = (const uint32_t *)obj_table; - if (likely(idx + n < size)) { - for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { - ring[idx] = obj[i]; - ring[idx + 1] = obj[i + 1]; - ring[idx + 2] = obj[i + 2]; - ring[idx + 3] = obj[i + 3]; - ring[idx + 4] = obj[i + 4]; - ring[idx + 5] = obj[i + 5]; - ring[idx + 6] = obj[i + 6]; - ring[idx + 7] = obj[i + 7]; - } - switch (n & 0x7) { - case 7: - ring[idx++] = obj[i++]; /* fallthrough */ - case 6: - ring[idx++] = obj[i++]; /* fallthrough */ - case 5: - ring[idx++] = obj[i++]; /* fallthrough */ - case 4: - ring[idx++] = obj[i++]; /* fallthrough */ - case 3: - ring[idx++] = obj[i++]; /* fallthrough */ - case 2: - ring[idx++] = obj[i++]; /* fallthrough */ - case 1: - ring[idx++] = obj[i++]; /* fallthrough */ - } - } else { - for (i = 0; idx < size; i++, idx++) - ring[idx] = obj[i]; - /* Start at the beginning */ - for (idx = 0; i < n; i++, idx++) - ring[idx] = obj[i]; - } -} - -static __rte_always_inline void -__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, - const void *obj_table, uint32_t n) -{ - unsigned int i; - const uint32_t size = r->size; - uint32_t idx = prod_head & r->mask; - uint64_t *ring = (uint64_t *)&r[1]; - const uint64_t *obj = (const uint64_t *)obj_table; - if (likely(idx + n < size)) { - for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { - ring[idx] = obj[i]; - ring[idx + 1] = obj[i + 1]; - ring[idx + 2] = obj[i + 2]; - ring[idx + 3] = obj[i + 3]; - } - switch (n & 0x3) { - case 3: - ring[idx++] = obj[i++]; /* fallthrough */ - case 2: - ring[idx++] = obj[i++]; /* fallthrough */ - case 1: - ring[idx++] = obj[i++]; - } - } else { - for (i = 0; idx < size; i++, idx++) - ring[idx] = obj[i]; - /* Start at the beginning */ - for (idx = 0; i < n; i++, idx++) - ring[idx] = obj[i]; - } -} - -static __rte_always_inline void -__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, - const void *obj_table, uint32_t n) -{ - unsigned int i; - const uint32_t size = r->size; - uint32_t idx = prod_head & r->mask; - rte_int128_t *ring = (rte_int128_t *)&r[1]; - const rte_int128_t *obj = (const rte_int128_t *)obj_table; - if (likely(idx + n < size)) { - for (i = 0; i < (n & ~0x1); i += 2, idx += 2) - memcpy((void *)(ring + idx), - (const void *)(obj + i), 32); - switch (n & 0x1) { - case 1: - memcpy((void *)(ring + idx), - (const void *)(obj + i), 16); - } - } else { - for (i = 0; idx < size; i++, idx++) - memcpy((void *)(ring + idx), - (const void *)(obj + i), 16); - /* Start at the beginning */ - for (idx = 0; i < n; i++, idx++) - memcpy((void *)(ring + idx), - (const void *)(obj + i), 16); - } -} - -/* the actual enqueue of elements on the ring. - * Placed here since identical code needed in both - * single and multi producer enqueue functions. - */ -static __rte_always_inline void -__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head, - const void *obj_table, uint32_t esize, uint32_t num) -{ - /* 8B and 16B copies implemented individually to retain - * the current performance. - */ - if (esize == 8) - __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num); - else if (esize == 16) - __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num); - else { - uint32_t idx, scale, nr_idx, nr_num, nr_size; - - /* Normalize to uint32_t */ - scale = esize / sizeof(uint32_t); - nr_num = num * scale; - idx = prod_head & r->mask; - nr_idx = idx * scale; - nr_size = r->size * scale; - __rte_ring_enqueue_elems_32(r, nr_size, nr_idx, - obj_table, nr_num); - } -} - -static __rte_always_inline void -__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size, - uint32_t idx, void *obj_table, uint32_t n) -{ - unsigned int i; - uint32_t *ring = (uint32_t *)&r[1]; - uint32_t *obj = (uint32_t *)obj_table; - if (likely(idx + n < size)) { - for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { - obj[i] = ring[idx]; - obj[i + 1] = ring[idx + 1]; - obj[i + 2] = ring[idx + 2]; - obj[i + 3] = ring[idx + 3]; - obj[i + 4] = ring[idx + 4]; - obj[i + 5] = ring[idx + 5]; - obj[i + 6] = ring[idx + 6]; - obj[i + 7] = ring[idx + 7]; - } - switch (n & 0x7) { - case 7: - obj[i++] = ring[idx++]; /* fallthrough */ - case 6: - obj[i++] = ring[idx++]; /* fallthrough */ - case 5: - obj[i++] = ring[idx++]; /* fallthrough */ - case 4: - obj[i++] = ring[idx++]; /* fallthrough */ - case 3: - obj[i++] = ring[idx++]; /* fallthrough */ - case 2: - obj[i++] = ring[idx++]; /* fallthrough */ - case 1: - obj[i++] = ring[idx++]; /* fallthrough */ - } - } else { - for (i = 0; idx < size; i++, idx++) - obj[i] = ring[idx]; - /* Start at the beginning */ - for (idx = 0; i < n; i++, idx++) - obj[i] = ring[idx]; - } -} - -static __rte_always_inline void -__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, - void *obj_table, uint32_t n) -{ - unsigned int i; - const uint32_t size = r->size; - uint32_t idx = prod_head & r->mask; - uint64_t *ring = (uint64_t *)&r[1]; - uint64_t *obj = (uint64_t *)obj_table; - if (likely(idx + n < size)) { - for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { - obj[i] = ring[idx]; - obj[i + 1] = ring[idx + 1]; - obj[i + 2] = ring[idx + 2]; - obj[i + 3] = ring[idx + 3]; - } - switch (n & 0x3) { - case 3: - obj[i++] = ring[idx++]; /* fallthrough */ - case 2: - obj[i++] = ring[idx++]; /* fallthrough */ - case 1: - obj[i++] = ring[idx++]; /* fallthrough */ - } - } else { - for (i = 0; idx < size; i++, idx++) - obj[i] = ring[idx]; - /* Start at the beginning */ - for (idx = 0; i < n; i++, idx++) - obj[i] = ring[idx]; - } -} - -static __rte_always_inline void -__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head, - void *obj_table, uint32_t n) -{ - unsigned int i; - const uint32_t size = r->size; - uint32_t idx = prod_head & r->mask; - rte_int128_t *ring = (rte_int128_t *)&r[1]; - rte_int128_t *obj = (rte_int128_t *)obj_table; - if (likely(idx + n < size)) { - for (i = 0; i < (n & ~0x1); i += 2, idx += 2) - memcpy((void *)(obj + i), (void *)(ring + idx), 32); - switch (n & 0x1) { - case 1: - memcpy((void *)(obj + i), (void *)(ring + idx), 16); - } - } else { - for (i = 0; idx < size; i++, idx++) - memcpy((void *)(obj + i), (void *)(ring + idx), 16); - /* Start at the beginning */ - for (idx = 0; i < n; i++, idx++) - memcpy((void *)(obj + i), (void *)(ring + idx), 16); - } -} - -/* the actual dequeue of elements from the ring. - * Placed here since identical code needed in both - * single and multi producer enqueue functions. - */ -static __rte_always_inline void -__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head, - void *obj_table, uint32_t esize, uint32_t num) -{ - /* 8B and 16B copies implemented individually to retain - * the current performance. - */ - if (esize == 8) - __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num); - else if (esize == 16) - __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num); - else { - uint32_t idx, scale, nr_idx, nr_num, nr_size; - - /* Normalize to uint32_t */ - scale = esize / sizeof(uint32_t); - nr_num = num * scale; - idx = cons_head & r->mask; - nr_idx = idx * scale; - nr_size = r->size * scale; - __rte_ring_dequeue_elems_32(r, nr_size, nr_idx, - obj_table, nr_num); - } -} - -/* Between load and load. there might be cpu reorder in weak model - * (powerpc/arm). - * There are 2 choices for the users - * 1.use rmb() memory barrier - * 2.use one-direction load_acquire/store_release barrier,defined by - * CONFIG_RTE_USE_C11_MEM_MODEL=y - * It depends on performance test results. - * By default, move common functions to rte_ring_generic.h - */ -#ifdef RTE_USE_C11_MEM_MODEL -#include "rte_ring_c11_mem.h" -#else -#include "rte_ring_generic.h" -#endif - -/** - * @internal Enqueue several objects on the ring - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of objects. - * @param esize - * The size of ring element, in bytes. It must be a multiple of 4. - * This must be the same value used while creating the ring. Otherwise - * the results are undefined. - * @param n - * The number of objects to add in the ring from the obj_table. - * @param behavior - * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring - * @param is_sp - * Indicates whether to use single producer or multi-producer head update - * @param free_space - * returns the amount of space after the enqueue operation has finished - * @return - * Actual number of objects enqueued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, - enum rte_ring_queue_behavior behavior, unsigned int is_sp, - unsigned int *free_space) -{ - uint32_t prod_head, prod_next; - uint32_t free_entries; - - n = __rte_ring_move_prod_head(r, is_sp, n, behavior, - &prod_head, &prod_next, &free_entries); - if (n == 0) - goto end; - - __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n); - - update_tail(&r->prod, prod_head, prod_next, is_sp, 1); -end: - if (free_space != NULL) - *free_space = free_entries - n; - return n; -} - -/** - * @internal Dequeue several objects from the ring - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of objects. - * @param esize - * The size of ring element, in bytes. It must be a multiple of 4. - * This must be the same value used while creating the ring. Otherwise - * the results are undefined. - * @param n - * The number of objects to pull from the ring. - * @param behavior - * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring - * @param is_sc - * Indicates whether to use single consumer or multi-consumer head update - * @param available - * returns the number of remaining ring entries after the dequeue has finished - * @return - * - Actual number of objects dequeued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, - enum rte_ring_queue_behavior behavior, unsigned int is_sc, - unsigned int *available) -{ - uint32_t cons_head, cons_next; - uint32_t entries; - - n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, - &cons_head, &cons_next, &entries); - if (n == 0) - goto end; - - __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n); - - update_tail(&r->cons, cons_head, cons_next, is_sc, 0); - -end: - if (available != NULL) - *available = entries - n; - return n; -} - /** * Enqueue several objects on the ring (multi-producers safe). * @@ -505,13 +131,9 @@ __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, * @return * The number of objects enqueued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MP, free_space); -} + unsigned int esize, unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring @@ -534,13 +156,9 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, * @return * The number of objects enqueued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SP, free_space); -} + unsigned int esize, unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring. @@ -565,13 +183,9 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, * @return * The number of objects enqueued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->prod.single, free_space); -} + unsigned int esize, unsigned int n, unsigned int *free_space); /** * Enqueue one object on a ring (multi-producers safe). @@ -591,12 +205,8 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, * - 0: Success; objects enqueued. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static __rte_always_inline int -rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) -{ - return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : - -ENOBUFS; -} +int +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize); /** * Enqueue one object on a ring @@ -615,12 +225,8 @@ rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) * - 0: Success; objects enqueued. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static __rte_always_inline int -rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) -{ - return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : - -ENOBUFS; -} +int +rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize); /** * Enqueue one object on a ring. @@ -641,12 +247,8 @@ rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) * - 0: Success; objects enqueued. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static __rte_always_inline int -rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) -{ - return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : - -ENOBUFS; -} +int +rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize); /** * Dequeue several objects from a ring (multi-consumers safe). @@ -670,13 +272,9 @@ rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) * @return * The number of objects dequeued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MC, available); -} + unsigned int esize, unsigned int n, unsigned int *available); /** * Dequeue several objects from a ring (NOT multi-consumers safe). @@ -698,13 +296,9 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, * @return * The number of objects dequeued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SC, available); -} + unsigned int esize, unsigned int n, unsigned int *available); /** * Dequeue several objects from a ring. @@ -729,13 +323,9 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, * @return * The number of objects dequeued, either 0 or n */ -static __rte_always_inline unsigned int +unsigned int rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->cons.single, available); -} + unsigned int esize, unsigned int n, unsigned int *available); /** * Dequeue one object from a ring (multi-consumers safe). @@ -756,13 +346,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static __rte_always_inline int +int rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p, - unsigned int esize) -{ - return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : - -ENOENT; -} + unsigned int esize); /** * Dequeue one object from a ring (NOT multi-consumers safe). @@ -780,13 +366,9 @@ rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p, * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static __rte_always_inline int +int rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p, - unsigned int esize) -{ - return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : - -ENOENT; -} + unsigned int esize); /** * Dequeue one object from a ring. @@ -808,12 +390,8 @@ rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p, * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static __rte_always_inline int -rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize) -{ - return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : - -ENOENT; -} +int +rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize); /** * Enqueue several objects on the ring (multi-producers safe). @@ -837,13 +415,9 @@ rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize) * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +unsigned rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); -} + unsigned int esize, unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring @@ -866,13 +440,9 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +unsigned rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); -} + unsigned int esize, unsigned int n, unsigned int *free_space); /** * Enqueue several objects on a ring. @@ -897,13 +467,9 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +unsigned rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, - unsigned int esize, unsigned int n, unsigned int *free_space) -{ - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); -} + unsigned int esize, unsigned int n, unsigned int *free_space); /** * Dequeue several objects from a ring (multi-consumers safe). When the request @@ -929,13 +495,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static __rte_always_inline unsigned +unsigned rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); -} + unsigned int esize, unsigned int n, unsigned int *available); /** * Dequeue several objects from a ring (NOT multi-consumers safe).When the @@ -958,13 +520,9 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static __rte_always_inline unsigned +unsigned rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); -} + unsigned int esize, unsigned int n, unsigned int *available); /** * Dequeue multiple objects from a ring up to a maximum number. @@ -989,14 +547,9 @@ rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, * @return * - Number of objects dequeued */ -static __rte_always_inline unsigned int +unsigned int rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, - unsigned int esize, unsigned int n, unsigned int *available) -{ - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); -} + unsigned int esize, unsigned int n, unsigned int *available); #ifdef __cplusplus } diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map index e88c143cf..6cceacf95 100644 --- a/lib/librte_ring/rte_ring_version.map +++ b/lib/librte_ring/rte_ring_version.map @@ -8,6 +8,52 @@ DPDK_20.0 { rte_ring_init; rte_ring_list_dump; rte_ring_lookup; + rte_ring_count; + rte_ring_dequeue; + rte_ring_dequeue_bulk; + rte_ring_dequeue_bulk_elem; + rte_ring_dequeue_burst; + rte_ring_dequeue_burst_elem; + rte_ring_dequeue_elem; + rte_ring_empty; + rte_ring_enqueue; + rte_ring_enqueue_bulk; + rte_ring_enqueue_bulk_elem; + rte_ring_enqueue_burst; + rte_ring_enqueue_burst_elem; + rte_ring_enqueue_elem; + rte_ring_free_count; + rte_ring_full; + rte_ring_get_capacity; + rte_ring_get_cons_sync_type; + rte_ring_get_memzone; + rte_ring_get_name; + rte_ring_get_prod_sync_type; + rte_ring_get_size; + rte_ring_mc_dequeue; + rte_ring_mc_dequeue_bulk; + rte_ring_mc_dequeue_bulk_elem; + rte_ring_mc_dequeue_burst; + rte_ring_mc_dequeue_burst_elem; + rte_ring_mc_dequeue_elem; + rte_ring_mp_enqueue; + rte_ring_mp_enqueue_bulk; + rte_ring_mp_enqueue_bulk_elem; + rte_ring_mp_enqueue_burst; + rte_ring_mp_enqueue_burst_elem; + rte_ring_mp_enqueue_elem; + rte_ring_sc_dequeue; + rte_ring_sc_dequeue_bulk; + rte_ring_sc_dequeue_bulk_elem; + rte_ring_sc_dequeue_burst; + rte_ring_sc_dequeue_burst_elem; + rte_ring_sc_dequeue_elem; + rte_ring_sp_enqueue; + rte_ring_sp_enqueue_bulk; + rte_ring_sp_enqueue_bulk_elem; + rte_ring_sp_enqueue_burst; + rte_ring_sp_enqueue_burst_elem; + rte_ring_sp_enqueue_elem; local: *; }; -- 2.17.1