From: Bruce Richardson <bruce.richardson@intel.com>
To: olivier.matz@6wind.com
Cc: thomas.monjalon@6wind.com, keith.wiles@intel.com,
	konstantin.ananyev@intel.com, stephen@networkplumber.org,
	dev@dpdk.org, Bruce Richardson <bruce.richardson@intel.com>
Date: Tue, 7 Feb 2017 14:12:51 +0000
Message-Id: <1486476777-24768-14-git-send-email-bruce.richardson@intel.com>
In-Reply-To: <20170125121456.GA24344@bricha3-MOBL3.ger.corp.intel.com>
References: <20170125121456.GA24344@bricha3-MOBL3.ger.corp.intel.com>
Subject: [dpdk-dev] [PATCH RFCv3 13/19] ring: allow dequeue fns to return
	remaining entry count

Add an extra parameter to the ring dequeue burst/bulk functions so that
those functions can optionally return the number of objects remaining in
the ring. Applications can use this information in a number of ways; for
instance, with single-consumer queues it provides a maximum dequeue size
that is guaranteed to succeed.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/pdump/main.c                                   |  2 +-
 app/test-pipeline/runtime.c                        |  6 +-
 app/test/test_link_bonding_mode4.c                 |  3 +-
 app/test/test_pmd_ring_perf.c                      |  7 +-
 app/test/test_ring.c                               | 54 ++++++-------
 app/test/test_ring_perf.c                          | 20 +++--
 app/test/test_table_acl.c                          |  2 +-
 app/test/test_table_pipeline.c                     |  2 +-
 app/test/test_table_ports.c                        |  8 +-
 app/test/virtual_pmd.c                             |  4 +-
 drivers/crypto/null/null_crypto_pmd.c              |  2 +-
 drivers/net/bonding/rte_eth_bond_pmd.c             |  3 +-
 drivers/net/ring/rte_eth_ring.c                    |  2 +-
 examples/distributor/main.c                        |  2 +-
 examples/load_balancer/runtime.c                   |  6 +-
 .../client_server_mp/mp_client/client.c            |  3 +-
 examples/packet_ordering/main.c                    |  6 +-
 examples/qos_sched/app_thread.c                    |  6 +-
 examples/quota_watermark/qw/main.c                 |  5 +-
 examples/server_node_efd/node/node.c               |  2 +-
 lib/librte_hash/rte_cuckoo_hash.c                  |  3 +-
 lib/librte_mempool/rte_mempool_ring.c              |  4 +-
 lib/librte_port/rte_port_frag.c                    |  3 +-
 lib/librte_port/rte_port_ring.c                    |  6 +-
 lib/librte_ring/rte_ring.h                         | 90 +++++++++++-----------
 25 files changed, 137 insertions(+), 114 deletions(-)

diff --git a/app/pdump/main.c b/app/pdump/main.c
index b88090d..3b13753 100644
--- a/app/pdump/main.c
+++ b/app/pdump/main.c
@@ -496,7 +496,7 @@ pdump_rxtx(struct rte_ring *ring, uint8_t vdev_id, struct pdump_stats *stats)
 	/* first dequeue packets from ring of primary process */
 	const uint16_t nb_in_deq = rte_ring_dequeue_burst(ring,
-			(void *)rxtx_bufs, BURST_SIZE);
+			(void *)rxtx_bufs, BURST_SIZE, NULL);
 	stats->dequeue_pkts += nb_in_deq;
 
 	if (nb_in_deq) {
diff --git a/app/test-pipeline/runtime.c b/app/test-pipeline/runtime.c
index c06ff54..8970e1c 100644
--- a/app/test-pipeline/runtime.c
+++ b/app/test-pipeline/runtime.c
@@ -121,7 +121,8 @@ app_main_loop_worker(void) {
 		ret = rte_ring_sc_dequeue_bulk(
 			app.rings_rx[i],
 			(void **) worker_mbuf->array,
-			app.burst_size_worker_read);
+			app.burst_size_worker_read,
+			NULL);
 
 		if (ret == 0)
 			continue;
@@ -151,7 +152,8 @@ app_main_loop_tx(void) {
 		ret = rte_ring_sc_dequeue_bulk(
 			app.rings_tx[i],
 			(void **) &app.mbuf_tx[i].array[n_mbufs],
-			app.burst_size_tx_read);
+			app.burst_size_tx_read,
+			NULL);
 
 		if (ret == 0)
 			continue;
diff --git a/app/test/test_link_bonding_mode4.c b/app/test/test_link_bonding_mode4.c
index 8df28b4..15091b1 100644
--- a/app/test/test_link_bonding_mode4.c
+++ b/app/test/test_link_bonding_mode4.c
@@ -193,7 +193,8 @@ static uint8_t lacpdu_rx_count[RTE_MAX_ETHPORTS] = {0, };
 static int
 slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 {
-	return rte_ring_dequeue_burst(slave->tx_queue, (void **)buf, size);
+	return rte_ring_dequeue_burst(slave->tx_queue, (void **)buf,
+			size, NULL);
 }
 
 /*
diff --git a/app/test/test_pmd_ring_perf.c b/app/test/test_pmd_ring_perf.c
index 045a7f2..004882a 100644
--- a/app/test/test_pmd_ring_perf.c
+++ b/app/test/test_pmd_ring_perf.c
@@ -67,7 +67,7 @@ test_empty_dequeue(void)
 
 	const uint64_t sc_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0]);
+		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
 	const uint64_t sc_end = rte_rdtsc();
 
 	const uint64_t eth_start = rte_rdtsc();
@@ -99,7 +99,7 @@ test_single_enqueue_dequeue(void)
 	rte_compiler_barrier();
 	for (i = 0; i < iterations; i++) {
 		rte_ring_enqueue_bulk(r, &burst, 1, NULL);
-		rte_ring_dequeue_bulk(r, &burst, 1);
+		rte_ring_dequeue_bulk(r, &burst, 1, NULL);
 	}
 	const uint64_t sc_end = rte_rdtsc_precise();
 	rte_compiler_barrier();
@@ -133,7 +133,8 @@ test_bulk_enqueue_dequeue(void)
 	for (i = 0; i < iterations; i++) {
 		rte_ring_sp_enqueue_bulk(r, (void *)burst,
 				bulk_sizes[sz], NULL);
-		rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
+		rte_ring_sc_dequeue_bulk(r, (void *)burst,
+				bulk_sizes[sz], NULL);
 	}
 	const uint64_t sc_end = rte_rdtsc();
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index aa2a711..5b61ef1 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -119,7 +119,8 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
 			__func__, i, rand);
 		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src, rand,
 				NULL));
-		TEST_RING_VERIFY(rand == rte_ring_dequeue_bulk(r, dst, rand));
+		TEST_RING_VERIFY(rand == rte_ring_dequeue_bulk(r, dst,
+				rand, NULL));
 
 		/* fill the ring */
 		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src, rsz, NULL));
@@ -129,7 +130,8 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
 		TEST_RING_VERIFY(0 == rte_ring_empty(r));
 
 		/* empty the ring */
-		TEST_RING_VERIFY(rsz == rte_ring_dequeue_bulk(r, dst, rsz));
+		TEST_RING_VERIFY(rsz == rte_ring_dequeue_bulk(r, dst,
+				rsz, NULL));
 		TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
 		TEST_RING_VERIFY(0 == rte_ring_count(r));
 		TEST_RING_VERIFY(0 == rte_ring_full(r));
@@ -186,19 +188,19 @@ test_ring_basic(void)
 		goto fail;
 
 	printf("dequeue 1 obj\n");
-	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
+	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1, NULL);
 	cur_dst += 1;
 	if (ret == 0)
 		goto fail;
 
 	printf("dequeue 2 objs\n");
-	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
+	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2, NULL);
 	cur_dst += 2;
 	if (ret == 0)
 		goto fail;
 
 	printf("dequeue MAX_BULK objs\n");
-	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
+	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK, NULL);
 	cur_dst += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -232,19 +234,19 @@ test_ring_basic(void)
 		goto fail;
 
 	printf("dequeue 1 obj\n");
-	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
+	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1, NULL);
 	cur_dst += 1;
 	if (ret == 0)
 		goto fail;
 
 	printf("dequeue 2 objs\n");
-	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
+	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2, NULL);
 	cur_dst += 2;
 	if (ret == 0)
 		goto fail;
 
 	printf("dequeue MAX_BULK objs\n");
-	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
+	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK, NULL);
 	cur_dst += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -265,7 +267,7 @@ test_ring_basic(void)
 	cur_src += MAX_BULK;
 	if (ret == 0)
 		goto fail;
-	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
+	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK, NULL);
 	cur_dst += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -303,13 +305,13 @@ test_ring_basic(void)
 		printf("Cannot enqueue\n");
 		goto fail;
 	}
-	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
+	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems, NULL);
 	cur_dst += num_elems;
 	if (ret == 0) {
 		printf("Cannot dequeue\n");
 		goto fail;
 	}
-	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
+	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems, NULL);
 	cur_dst += num_elems;
 	if (ret == 0) {
 		printf("Cannot dequeue2\n");
@@ -390,19 +392,19 @@ test_ring_burst_basic(void)
 		goto fail;
 
 	printf("dequeue 1 obj\n");
-	ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
+	ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1, NULL) ;
 	cur_dst += 1;
 	if ((ret & RTE_RING_SZ_MASK) != 1)
 		goto fail;
 
 	printf("dequeue 2 objs\n");
-	ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
+	ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2, NULL);
 	cur_dst += 2;
 	if ((ret & RTE_RING_SZ_MASK) != 2)
 		goto fail;
 
 	printf("dequeue MAX_BULK objs\n");
-	ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+	ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
 	cur_dst += MAX_BULK;
 	if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
 		goto fail;
@@ -451,19 +453,19 @@ test_ring_burst_basic(void)
 
 	printf("Test dequeue without enough objects \n");
 	for (i = 0; idata->dev_private;
 	rx_count = rte_ring_dequeue_burst(dev_private->rx_queue, (void **) bufs,
-			nb_pkts);
+			nb_pkts, NULL);
 
 	/* increments ipackets count */
 	dev_private->eth_stats.ipackets += rx_count;
@@ -508,7 +508,7 @@ virtual_ethdev_get_mbufs_from_tx_queue(uint8_t port_id,
 	dev_private = vrtl_eth_dev->data->dev_private;
 
 	return rte_ring_dequeue_burst(dev_private->tx_queue, (void **)pkt_burst,
-		burst_length);
+		burst_length, NULL);
 }
 
 static uint8_t
diff --git a/drivers/crypto/null/null_crypto_pmd.c b/drivers/crypto/null/null_crypto_pmd.c
index ed5a9fc..f68ec8d 100644
--- a/drivers/crypto/null/null_crypto_pmd.c
+++ b/drivers/crypto/null/null_crypto_pmd.c
@@ -155,7 +155,7 @@ null_crypto_pmd_dequeue_burst(void *queue_pair, struct rte_crypto_op **ops,
 	unsigned nb_dequeued;
 
 	nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts,
-			(void **)ops, nb_ops);
+			(void **)ops, nb_ops, NULL);
 	qp->qp_stats.dequeued_count += nb_dequeued;
 
 	return nb_dequeued;
diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
index f3ac9e2..96638af 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -1008,7 +1008,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 		struct port *port = &mode_8023ad_ports[slaves[i]];
 
 		slave_slow_nb_pkts[i] = rte_ring_dequeue_burst(port->tx_ring,
-				slow_pkts, BOND_MODE_8023AX_SLAVE_TX_PKTS);
+				slow_pkts, BOND_MODE_8023AX_SLAVE_TX_PKTS,
+				NULL);
 		slave_nb_pkts[i] = slave_slow_nb_pkts[i];
 
 		for (j = 0; j < slave_slow_nb_pkts[i]; j++)
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index adbf478..77ef3a1 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -88,7 +88,7 @@ eth_ring_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	void **ptrs = (void *)&bufs[0];
 	struct ring_queue *r = q;
 	const uint16_t nb_rx = (uint16_t)rte_ring_dequeue_burst(r->rng,
-			ptrs, nb_bufs);
+			ptrs, nb_bufs, NULL);
 	if (r->rng->flags & RING_F_SC_DEQ)
 		r->rx_pkts.cnt += nb_rx;
 	else
diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index cfd360b..5cb6185 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -330,7 +330,7 @@ lcore_tx(struct rte_ring *in_r)
 			struct rte_mbuf *bufs[BURST_SIZE];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE, NULL);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
diff --git a/examples/load_balancer/runtime.c b/examples/load_balancer/runtime.c
index 1645994..8192c08 100644
--- a/examples/load_balancer/runtime.c
+++ b/examples/load_balancer/runtime.c
@@ -349,7 +349,8 @@ app_lcore_io_tx(
 		ret = rte_ring_sc_dequeue_bulk(
 			ring,
 			(void **) &lp->tx.mbuf_out[port].array[n_mbufs],
-			bsz_rd);
+			bsz_rd,
+			NULL);
 
 		if (unlikely(ret == 0))
 			continue;
@@ -504,7 +505,8 @@ app_lcore_worker(
 		ret = rte_ring_sc_dequeue_bulk(
 			ring_in,
 			(void **) lp->mbuf_in.array,
-			bsz_rd);
+			bsz_rd,
+			NULL);
 
 		if (unlikely(ret == 0))
 			continue;
diff --git a/examples/multi_process/client_server_mp/mp_client/client.c b/examples/multi_process/client_server_mp/mp_client/client.c
index dca9eb9..01b535c 100644
--- a/examples/multi_process/client_server_mp/mp_client/client.c
+++ b/examples/multi_process/client_server_mp/mp_client/client.c
@@ -279,7 +279,8 @@ main(int argc, char *argv[])
 		uint16_t i, rx_pkts;
 		uint8_t port;
 
-		rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts, PKT_READ_SIZE);
+		rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts,
+				PKT_READ_SIZE, NULL);
 
 		if (unlikely(rx_pkts == 0)){
 			if (need_flush)
diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c
index d268350..7719dad 100644
--- a/examples/packet_ordering/main.c
+++ b/examples/packet_ordering/main.c
@@ -462,7 +462,7 @@ worker_thread(void *args_ptr)
 		/* dequeue the mbufs from rx_to_workers ring */
 		burst_size = rte_ring_dequeue_burst(ring_in,
-				(void *)burst_buffer, MAX_PKTS_BURST);
+				(void *)burst_buffer, MAX_PKTS_BURST, NULL);
 		if (unlikely(burst_size == 0))
 			continue;
 
@@ -510,7 +510,7 @@ send_thread(struct send_thread_args *args)
 
 		/* deque the mbufs from workers_to_tx ring */
 		nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
-				(void *)mbufs, MAX_PKTS_BURST);
+				(void *)mbufs, MAX_PKTS_BURST, NULL);
 
 		if (unlikely(nb_dq_mbufs == 0))
 			continue;
@@ -595,7 +595,7 @@ tx_thread(struct rte_ring *ring_in)
 
 		/* deque the mbufs from workers_to_tx ring */
 		dqnum = rte_ring_dequeue_burst(ring_in,
-				(void *)mbufs, MAX_PKTS_BURST);
+				(void *)mbufs, MAX_PKTS_BURST, NULL);
 
 		if (unlikely(dqnum == 0))
 			continue;
diff --git a/examples/qos_sched/app_thread.c b/examples/qos_sched/app_thread.c
index 0c81a15..15f117f 100644
--- a/examples/qos_sched/app_thread.c
+++ b/examples/qos_sched/app_thread.c
@@ -179,7 +179,7 @@ app_tx_thread(struct thread_conf **confs)
 	while ((conf = confs[conf_idx])) {
 		retval = rte_ring_sc_dequeue_bulk(conf->tx_ring, (void **)mbufs,
-					burst_conf.qos_dequeue);
+					burst_conf.qos_dequeue, NULL);
 		if (likely(retval != 0)) {
 			app_send_packets(conf, mbufs,
 					burst_conf.qos_dequeue);
 
@@ -218,7 +218,7 @@ app_worker_thread(struct thread_conf **confs)
 		/* Read packet from the ring */
 		nb_pkt = rte_ring_sc_dequeue_burst(conf->rx_ring, (void **)mbufs,
-					burst_conf.ring_burst);
+					burst_conf.ring_burst, NULL);
 		if (likely(nb_pkt)) {
 			int nb_sent = rte_sched_port_enqueue(conf->sched_port, mbufs,
 					nb_pkt);
@@ -254,7 +254,7 @@ app_mixed_thread(struct thread_conf **confs)
 		/* Read packet from the ring */
 		nb_pkt = rte_ring_sc_dequeue_burst(conf->rx_ring, (void **)mbufs,
-					burst_conf.ring_burst);
+					burst_conf.ring_burst, NULL);
 		if (likely(nb_pkt)) {
 			int nb_sent = rte_sched_port_enqueue(conf->sched_port, mbufs,
 					nb_pkt);
diff --git a/examples/quota_watermark/qw/main.c b/examples/quota_watermark/qw/main.c
index 8fb7eb1..ef39053 100644
--- a/examples/quota_watermark/qw/main.c
+++ b/examples/quota_watermark/qw/main.c
@@ -243,7 +243,7 @@ pipeline_stage(__attribute__((unused)) void *args)
 		}
 
 		/* Dequeue up to quota mbuf from rx */
-		nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts, *quota);
+		nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts, *quota, NULL);
 		if (unlikely(nb_dq_pkts < 0))
 			continue;
 
@@ -297,7 +297,8 @@ send_stage(__attribute__((unused)) void *args)
 			continue;
 
 		/* Dequeue packets from tx and send them */
-		nb_dq_pkts = (uint16_t) rte_ring_dequeue_burst(tx, (void *) tx_pkts, *quota);
+		nb_dq_pkts = (uint16_t) rte_ring_dequeue_burst(tx, (void *) tx_pkts,
+				*quota, NULL);
 		rte_eth_tx_burst(dest_port_id, 0, tx_pkts, nb_dq_pkts);
 
 		/* TODO: Check if nb_dq_pkts == nb_tx_pkts? */
diff --git a/examples/server_node_efd/node/node.c b/examples/server_node_efd/node/node.c
index 9ec6a05..f780b92 100644
--- a/examples/server_node_efd/node/node.c
+++ b/examples/server_node_efd/node/node.c
@@ -392,7 +392,7 @@ main(int argc, char *argv[])
 		 */
 		while (rx_pkts > 0 &&
 				unlikely(rte_ring_dequeue_bulk(rx_ring, pkts,
-					rx_pkts) == 0))
+					rx_pkts, NULL) == 0))
 			rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring),
 					PKT_READ_SIZE);
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 6552199..645c0cf 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -536,7 +536,8 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		if (cached_free_slots->len == 0) {
 			/* Need to get another burst of free slots from global ring */
 			n_slots = rte_ring_mc_dequeue_burst(h->free_slots,
-					cached_free_slots->objs, LCORE_CACHE_SIZE);
+					cached_free_slots->objs,
+					LCORE_CACHE_SIZE, NULL);
 			if (n_slots == 0)
 				return -ENOSPC;
diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c
index 9b8fd2b..5c132bf 100644
--- a/lib/librte_mempool/rte_mempool_ring.c
+++ b/lib/librte_mempool/rte_mempool_ring.c
@@ -58,14 +58,14 @@ static int
 common_ring_mc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
 {
 	return rte_ring_mc_dequeue_bulk(mp->pool_data,
-			obj_table, n) == 0 ? -ENOBUFS : 0;
+			obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
 common_ring_sc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
 {
 	return rte_ring_sc_dequeue_bulk(mp->pool_data,
-			obj_table, n) == 0 ? -ENOBUFS : 0;
+			obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static unsigned
diff --git a/lib/librte_port/rte_port_frag.c b/lib/librte_port/rte_port_frag.c
index 0fcace9..320407e 100644
--- a/lib/librte_port/rte_port_frag.c
+++ b/lib/librte_port/rte_port_frag.c
@@ -186,7 +186,8 @@ rte_port_ring_reader_frag_rx(void *port,
 	/* If "pkts" buffer is empty, read packet burst from ring */
 	if (p->n_pkts == 0) {
 		p->n_pkts = rte_ring_sc_dequeue_burst(p->ring,
-			(void **) p->pkts, RTE_PORT_IN_BURST_SIZE_MAX);
+			(void **) p->pkts, RTE_PORT_IN_BURST_SIZE_MAX,
+			NULL);
 		RTE_PORT_RING_READER_FRAG_STATS_PKTS_IN_ADD(p, p->n_pkts);
 		if (p->n_pkts == 0)
 			return n_pkts_out;
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 9fadac7..492b0e7 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -111,7 +111,8 @@ rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
 	struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
 	uint32_t nb_rx;
 
-	nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
+	nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts,
+			n_pkts, NULL);
 	RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
 
 	return nb_rx;
@@ -124,7 +125,8 @@ rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
 	struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
 	uint32_t nb_rx;
 
-	nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
+	nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts,
+			n_pkts, NULL);
 	RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
 
 	return nb_rx;
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 2f8995c..b6123ba 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -497,7 +497,8 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
+		 unsigned int n, enum rte_ring_queue_behavior behavior,
+		 unsigned int *available)
 {
 	uint32_t cons_head, prod_tail;
 	uint32_t cons_next, entries;
@@ -506,11 +507,6 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 	unsigned int i;
 	uint32_t mask = r->mask;
 
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
 	/* move cons.head atomically */
 	do {
 		/* Restore n as it may change every loop */
@@ -525,15 +521,11 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 		entries = (prod_tail - cons_head);
 
 		/* Set the actual entries for dequeue */
-		if (n > entries) {
-			if (behavior == RTE_RING_QUEUE_FIXED)
-				return 0;
-			else {
-				if (unlikely(entries == 0))
-					return 0;
-				n = entries;
-			}
-		}
+		if (n > entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : entries;
+
+		if (unlikely(n == 0))
+			goto end;
 
 		cons_next = cons_head + n;
 		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
@@ -552,7 +544,9 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 		rte_pause();
 
 	r->cons.tail = cons_next;
-
+end:
+	if (available != NULL)
+		*available = entries - n;
 	return n;
 }
 
@@ -581,7 +575,8 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
+		 unsigned int n, enum rte_ring_queue_behavior behavior,
+		 unsigned int *available)
 {
 	uint32_t cons_head, prod_tail;
 	uint32_t cons_next, entries;
@@ -596,15 +591,11 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 	 * and size(ring)-1. */
 	entries = prod_tail - cons_head;
 
-	if (n > entries) {
-		if (behavior == RTE_RING_QUEUE_FIXED)
-			return 0;
-		else {
-			if (unlikely(entries == 0))
-				return 0;
-			n = entries;
-		}
-	}
+	if (n > entries)
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : entries;
+
+	if (unlikely(entries == 0))
+		goto end;
 
 	cons_next = cons_head + n;
 	r->cons.head = cons_next;
@@ -614,6 +605,9 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 	rte_smp_rmb();
 
 	r->cons.tail = cons_next;
+end:
+	if (available != NULL)
+		*available = entries - n;
 	return n;
 }
 
@@ -760,9 +754,11 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
 *   The number of objects dequeued, either 0 or n
 */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+			available);
 }
 
 /**
@@ -779,9 +775,11 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 *   The number of objects dequeued, either 0 or n
 */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+			available);
 }
 
 /**
@@ -801,12 +799,13 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 *   The number of objects dequeued, either 0 or n
 */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
+		unsigned int *available)
 {
 	if (r->cons.sc_dequeue)
-		return rte_ring_sc_dequeue_bulk(r, obj_table, n);
+		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
 	else
-		return rte_ring_mc_dequeue_bulk(r, obj_table, n);
+		return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
 }
 
 /**
@@ -827,7 +826,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 static inline int __attribute__((always_inline))
 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 {
-	return rte_ring_mc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
+	return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -845,7 +844,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 {
-	return rte_ring_sc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
+	return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -867,7 +866,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 {
-	return rte_ring_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
+	return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -1057,9 +1056,11 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 *   - n: Actual number of objects dequeued, 0 if ring is empty
 */
 static inline unsigned __attribute__((always_inline))
-rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_ring_mc_do_dequeue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, available);
 }
 
 /**
@@ -1077,9 +1078,11 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 *   - n: Actual number of objects dequeued, 0 if ring is empty
 */
 static inline unsigned __attribute__((always_inline))
-rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_ring_sc_do_dequeue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, available);
 }
 
 /**
@@ -1099,12 +1102,13 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 *   - Number of objects dequeued
 */
 static inline unsigned __attribute__((always_inline))
-rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
 {
 	if (r->cons.sc_dequeue)
-		return rte_ring_sc_dequeue_burst(r, obj_table, n);
+		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
 	else
-		return rte_ring_mc_dequeue_burst(r, obj_table, n);
+		return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
 }
 
 #ifdef __cplusplus
-- 
2.9.3
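
[Editor's note] A minimal sketch of how a caller might use the new
out-parameter. This is not part of the patch; the ring name 'my_ring',
the 'BURST' array size, and the two-step flow are illustrative
assumptions built on the single-consumer guarantee described in the
commit message.

	/* Dequeue a burst and learn how many entries remain in the ring.
	 * 'my_ring' is assumed to have been created with RING_F_SC_DEQ,
	 * i.e. this thread is the only consumer.
	 */
	void *objs[BURST];
	unsigned int avail = 0;
	unsigned int n = rte_ring_sc_dequeue_burst(my_ring, objs,
			BURST, &avail);

	/* With a single consumer, no other thread can remove entries in
	 * the meantime, so a follow-up fixed-size dequeue of 'avail'
	 * objects is guaranteed to succeed. Callers that do not need the
	 * count simply pass NULL, as the converted call sites above do.
	 */
	if (avail > 0 && n + avail <= BURST)
		n += rte_ring_sc_dequeue_bulk(my_ring, &objs[n],
				avail, NULL);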