From mboxrd@z Thu Jan  1 00:00:00 1970
From: Bruce Richardson <bruce.richardson@intel.com>
To: olivier.matz@6wind.com
Cc: thomas.monjalon@6wind.com, keith.wiles@intel.com,
 konstantin.ananyev@intel.com, stephen@networkplumber.org, dev@dpdk.org,
 Bruce Richardson <bruce.richardson@intel.com>
Date: Tue, 7 Feb 2017 14:12:49 +0000
Message-Id: <1486476777-24768-12-git-send-email-bruce.richardson@intel.com>
X-Mailer: git-send-email 1.7.0.7
In-Reply-To: <20170125121456.GA24344@bricha3-MOBL3.ger.corp.intel.com>
References: <20170125121456.GA24344@bricha3-MOBL3.ger.corp.intel.com>
Subject: [dpdk-dev] [PATCH RFCv3 11/19] ring: allow enq fns to return free
 space value
List-Id: DPDK patches and discussions <dev.dpdk.org>

Add an extra parameter to the ring enqueue burst/bulk functions so that
those functions can optionally return the amount of free space remaining
in the ring. Applications can use this information in a number of ways.
For instance, with single-producer queues it gives a maximum enqueue size
that is guaranteed to succeed. It can also be used to implement watermark
functionality in applications, replacing the old built-in watermark with
a more flexible scheme that lets an application define multiple watermark
thresholds rather than just one.
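For example, an application could drive its own multi-level watermark
logic from the returned value. The following sketch is illustrative only
and not part of this patch; the threshold values and the app_handle_*()
handlers are hypothetical names:

    #include <rte_ring.h>

    /* Assumed application-defined thresholds on remaining free space. */
    #define APP_WM_LOW_FREE   32    /* nearly full: apply backpressure */
    #define APP_WM_HIGH_FREE 224    /* filling up: start throttling */

    static void app_handle_almost_full(void) { /* e.g. pause RX */ }
    static void app_handle_filling(void) { /* e.g. slow the producer */ }

    static unsigned int
    app_enqueue_with_watermarks(struct rte_ring *r, void **objs,
            unsigned int n)
    {
        unsigned int free_space;
        /* Enqueue a burst; on return, free_space holds the space left. */
        unsigned int sent = rte_ring_enqueue_burst(r, objs, n, &free_space);

        if (free_space < APP_WM_LOW_FREE)
            app_handle_almost_full();
        else if (free_space < APP_WM_HIGH_FREE)
            app_handle_filling();

        return sent;
    }

With a single producer, the returned value is a guaranteed lower bound on
the space available for the next enqueue, since only consumers can change
it in the meantime; with multiple producers it is only a point-in-time
snapshot.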
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/test-pipeline/pipeline_hash.c              |  3 +-
 app/test-pipeline/runtime.c                    |  5 +-
 app/test/test_link_bonding_mode4.c             |  3 +-
 app/test/test_pmd_ring_perf.c                  |  5 +-
 app/test/test_ring.c                           | 55 +++++++------
 app/test/test_ring_perf.c                      | 16 ++--
 app/test/test_table_ports.c                    |  4 +-
 app/test/virtual_pmd.c                         |  4 +-
 drivers/net/ring/rte_eth_ring.c                |  2 +-
 examples/distributor/main.c                    |  3 +-
 examples/load_balancer/runtime.c               | 12 ++-
 .../client_server_mp/mp_server/main.c          |  2 +-
 examples/packet_ordering/main.c                |  7 +-
 examples/qos_sched/app_thread.c                |  4 +-
 examples/server_node_efd/server/main.c         |  2 +-
 lib/librte_hash/rte_cuckoo_hash.c              |  2 +-
 lib/librte_mempool/rte_mempool_ring.c          |  4 +-
 lib/librte_pdump/rte_pdump.c                   |  2 +-
 lib/librte_port/rte_port_ras.c                 |  2 +-
 lib/librte_port/rte_port_ring.c                | 28 ++++---
 lib/librte_ring/rte_ring.h                     | 89 +++++++++++-----------
 21 files changed, 135 insertions(+), 119 deletions(-)

diff --git a/app/test-pipeline/pipeline_hash.c b/app/test-pipeline/pipeline_hash.c
index 1ac0aa8..0c6e04f 100644
--- a/app/test-pipeline/pipeline_hash.c
+++ b/app/test-pipeline/pipeline_hash.c
@@ -546,7 +546,8 @@ app_main_loop_rx_metadata(void) {
 			ret = rte_ring_sp_enqueue_bulk(
 				app.rings_rx[i],
 				(void **) app.mbuf_rx.array,
-				n_mbufs);
+				n_mbufs,
+				NULL);
 		} while (ret == 0);
 	}
 }
diff --git a/app/test-pipeline/runtime.c b/app/test-pipeline/runtime.c
index 4e20669..c06ff54 100644
--- a/app/test-pipeline/runtime.c
+++ b/app/test-pipeline/runtime.c
@@ -97,7 +97,7 @@ app_main_loop_rx(void) {
 			ret = rte_ring_sp_enqueue_bulk(
 				app.rings_rx[i],
 				(void **) app.mbuf_rx.array,
-				n_mbufs);
+				n_mbufs, NULL);
 		} while (ret == 0);
 	}
 }
@@ -130,7 +130,8 @@ app_main_loop_worker(void) {
 			ret = rte_ring_sp_enqueue_bulk(
 				app.rings_tx[i ^ 1],
 				(void **) worker_mbuf->array,
-				app.burst_size_worker_write);
+				app.burst_size_worker_write,
+				NULL);
 		} while (ret == 0);
 	}
 }
diff --git a/app/test/test_link_bonding_mode4.c b/app/test/test_link_bonding_mode4.c
index 53caa3e..8df28b4 100644
--- a/app/test/test_link_bonding_mode4.c
+++ b/app/test/test_link_bonding_mode4.c
@@ -206,7 +206,8 @@ slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 static int
 slave_put_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 {
-	return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, size);
+	return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf,
+			size, NULL);
 }
 
 static uint16_t
diff --git a/app/test/test_pmd_ring_perf.c b/app/test/test_pmd_ring_perf.c
index af011f7..045a7f2 100644
--- a/app/test/test_pmd_ring_perf.c
+++ b/app/test/test_pmd_ring_perf.c
@@ -98,7 +98,7 @@ test_single_enqueue_dequeue(void)
 	const uint64_t sc_start = rte_rdtsc_precise();
 	rte_compiler_barrier();
 	for (i = 0; i < iterations; i++) {
-		rte_ring_enqueue_bulk(r, &burst, 1);
+		rte_ring_enqueue_bulk(r, &burst, 1, NULL);
 		rte_ring_dequeue_bulk(r, &burst, 1);
 	}
 	const uint64_t sc_end = rte_rdtsc_precise();
@@ -131,7 +131,8 @@ test_bulk_enqueue_dequeue(void)
 	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
 		const uint64_t sc_start = rte_rdtsc();
 		for (i = 0; i < iterations; i++) {
-			rte_ring_sp_enqueue_bulk(r, (void *)burst, bulk_sizes[sz]);
+			rte_ring_sp_enqueue_bulk(r, (void *)burst,
+					bulk_sizes[sz], NULL);
 			rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
 		}
 		const uint64_t sc_end = rte_rdtsc();
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index 4378fd0..aa2a711 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -118,12 +118,11 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
 		printf("%s: iteration %u, random shift: %u;\n",
 		    __func__, i, rand);
 		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src,
-				rand));
+				rand, NULL));
 		TEST_RING_VERIFY(rand == rte_ring_dequeue_bulk(r, dst, rand));
 
 		/* fill the ring */
-		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src,
-				rsz));
+		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src, rsz, NULL));
 		TEST_RING_VERIFY(0 == rte_ring_free_count(r));
 		TEST_RING_VERIFY(rsz == rte_ring_count(r));
 		TEST_RING_VERIFY(rte_ring_full(r));
@@ -169,19 +168,19 @@ test_ring_basic(void)
 	cur_dst = dst;
 
 	printf("enqueue 1 obj\n");
-	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
+	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1, NULL);
 	cur_src += 1;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
-	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
+	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
-	ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
+	ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -215,19 +214,19 @@ test_ring_basic(void)
 	cur_dst = dst;
 
 	printf("enqueue 1 obj\n");
-	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
+	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1, NULL);
 	cur_src += 1;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
-	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
+	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
-	ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+	ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -262,7 +261,7 @@ test_ring_basic(void)
 	printf("fill and empty the ring\n");
 	for (i = 0; i<RING_SIZE/MAX_BULK; i++) {

[... corrupted in the archive: the rest of this hunk, the remaining
app/test/test_ring.c hunks, and the diffs for app/test/test_ring_perf.c
and app/test/test_table_ports.c; the app/test/virtual_pmd.c diff resumes
mid-hunk below ...]

 	nb_pkts = rte_ring_enqueue_burst(dev_private->tx_queue, (void **)bufs,
-			nb_pkts);
+			nb_pkts, NULL);
 
 	/* increment opacket count */
 	dev_private->eth_stats.opackets += nb_pkts;
@@ -496,7 +496,7 @@ virtual_ethdev_add_mbufs_to_rx_queue(uint8_t port_id,
 		vrtl_eth_dev->data->dev_private;
 
 	return rte_ring_enqueue_burst(dev_private->rx_queue, (void **)pkt_burst,
-			burst_length);
+			burst_length, NULL);
 }
 
 int
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 6f9cc1a..adbf478 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -102,7 +102,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	void **ptrs = (void *)&bufs[0];
 	struct ring_queue *r = q;
 	const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng,
-			ptrs, nb_bufs);
+			ptrs, nb_bufs, NULL);
 	if (r->rng->flags & RING_F_SP_ENQ) {
 		r->tx_pkts.cnt += nb_tx;
 		r->err_pkts.cnt += nb_bufs - nb_tx;
diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index e7641d2..cfd360b 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -238,7 +238,8 @@ lcore_rx(struct lcore_params *p)
 			continue;
 		}
 
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs,
+				nb_ret, NULL);
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
 			RTE_LOG_DP(DEBUG, DISTRAPP,
diff --git a/examples/load_balancer/runtime.c b/examples/load_balancer/runtime.c
index 82b10bc..1645994 100644
--- a/examples/load_balancer/runtime.c
+++ b/examples/load_balancer/runtime.c
@@ -144,7 +144,8 @@ app_lcore_io_rx_buffer_to_send (
 	ret = rte_ring_sp_enqueue_bulk(
 		lp->rx.rings[worker],
 		(void **) lp->rx.mbuf_out[worker].array,
-		bsz);
+		bsz,
+		NULL);
 
 	if (unlikely(ret == 0)) {
 		uint32_t k;
@@ -310,7 +311,8 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
 		ret = rte_ring_sp_enqueue_bulk(
 			lp->rx.rings[worker],
 			(void **) lp->rx.mbuf_out[worker].array,
-			lp->rx.mbuf_out[worker].n_mbufs);
+			lp->rx.mbuf_out[worker].n_mbufs,
+			NULL);
 
 		if (unlikely(ret == 0)) {
 			uint32_t k;
@@ -553,7 +555,8 @@ app_lcore_worker(
 			ret = rte_ring_sp_enqueue_bulk(
 				lp->rings_out[port],
 				(void **) lp->mbuf_out[port].array,
-				bsz_wr);
+				bsz_wr,
+				NULL);
 
 #if APP_STATS
 			lp->rings_out_iters[port] ++;
@@ -605,7 +608,8 @@ app_lcore_worker_flush(struct app_lcore_params_worker *lp)
 		ret = rte_ring_sp_enqueue_bulk(
 			lp->rings_out[port],
 			(void **) lp->mbuf_out[port].array,
-			lp->mbuf_out[port].n_mbufs);
+			lp->mbuf_out[port].n_mbufs,
+			NULL);
 
 		if (unlikely(ret == 0)) {
 			uint32_t k;
diff --git a/examples/multi_process/client_server_mp/mp_server/main.c b/examples/multi_process/client_server_mp/mp_server/main.c
index 19c95b2..c2b0261 100644
--- a/examples/multi_process/client_server_mp/mp_server/main.c
+++ b/examples/multi_process/client_server_mp/mp_server/main.c
@@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client)
 	cl = &clients[client];
 	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
-			cl_rx_buf[client].count) == 0){
+			cl_rx_buf[client].count, NULL) == 0){
 		for (j = 0; j < cl_rx_buf[client].count; j++)
 			rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
 		cl->stats.rx_drop += cl_rx_buf[client].count;
diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c
index d4dc789..d268350 100644
--- a/examples/packet_ordering/main.c
+++ b/examples/packet_ordering/main.c
@@ -421,8 +421,8 @@ rx_thread(struct rte_ring *ring_out)
 				pkts[i++]->seqn = seqn++;
 
 			/* enqueue to rx_to_workers ring */
-			ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
-					nb_rx_pkts);
+			ret = rte_ring_enqueue_burst(ring_out,
+					(void *)pkts, nb_rx_pkts, NULL);
 			app_stats.rx.enqueue_pkts += ret;
 			if (unlikely(ret < nb_rx_pkts)) {
 				app_stats.rx.enqueue_failed_pkts +=
@@ -473,7 +473,8 @@ worker_thread(void *args_ptr)
 			burst_buffer[i++]->port ^= xor_val;
 
 		/* enqueue the modified mbufs to workers_to_tx ring */
-		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
+		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
+				burst_size, NULL);
 		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
 		if (unlikely(ret < burst_size)) {
 			/* Return the mbufs to their respective pool, dropping packets */
diff --git a/examples/qos_sched/app_thread.c b/examples/qos_sched/app_thread.c
index dab4594..0c81a15 100644
--- a/examples/qos_sched/app_thread.c
+++ b/examples/qos_sched/app_thread.c
@@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs)
 		}
 
 		if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
-				(void **)rx_mbufs, nb_rx) == 0)) {
+				(void **)rx_mbufs, nb_rx, NULL) == 0)) {
 			for(i = 0; i < nb_rx; i++) {
 				rte_pktmbuf_free(rx_mbufs[i]);
 
@@ -231,7 +231,7 @@ app_worker_thread(struct thread_conf **confs)
 					burst_conf.qos_dequeue);
 		if (likely(nb_pkt > 0))
 			while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
-					(void **)mbufs, nb_pkt) == 0)
+					(void **)mbufs, nb_pkt, NULL) == 0)
 				; /* empty body */
 
 		conf_idx++;
diff --git a/examples/server_node_efd/server/main.c b/examples/server_node_efd/server/main.c
index 3eb7fac..597b4c2 100644
--- a/examples/server_node_efd/server/main.c
+++ b/examples/server_node_efd/server/main.c
@@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node)
 	cl = &nodes[node];
 	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-			cl_rx_buf[node].count) != cl_rx_buf[node].count){
+			cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
 		for (j = 0; j < cl_rx_buf[node].count; j++)
 			rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
 		cl->stats.rx_drop += cl_rx_buf[node].count;
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 51db006..6552199 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -808,7 +808,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 		/* Need to enqueue the free slots in global ring. */
 		n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
 					cached_free_slots->objs,
-					LCORE_CACHE_SIZE);
+					LCORE_CACHE_SIZE, NULL);
 		cached_free_slots->len -= n_slots;
 	}
 	/* Put index of new free slot in cache. */
diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c
index 409b860..9b8fd2b 100644
--- a/lib/librte_mempool/rte_mempool_ring.c
+++ b/lib/librte_mempool/rte_mempool_ring.c
@@ -43,7 +43,7 @@ common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
 		unsigned n)
 {
 	return rte_ring_mp_enqueue_bulk(mp->pool_data,
-			obj_table, n) == 0 ? -ENOBUFS : 0;
+			obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
@@ -51,7 +51,7 @@ common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
 		unsigned n)
 {
 	return rte_ring_sp_enqueue_bulk(mp->pool_data,
-			obj_table, n) == 0 ? -ENOBUFS : 0;
+			obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
index a580a6a..d6d3e46 100644
--- a/lib/librte_pdump/rte_pdump.c
+++ b/lib/librte_pdump/rte_pdump.c
@@ -197,7 +197,7 @@ pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
 			dup_bufs[d_pkts++] = p;
 	}
 
-	ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
+	ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts, NULL);
 	if (unlikely(ring_enq < d_pkts)) {
 		RTE_LOG(DEBUG, PDUMP,
 			"only %d of packets enqueued to ring\n", ring_enq);
diff --git a/lib/librte_port/rte_port_ras.c b/lib/librte_port/rte_port_ras.c
index c4bb508..4de0945 100644
--- a/lib/librte_port/rte_port_ras.c
+++ b/lib/librte_port/rte_port_ras.c
@@ -167,7 +167,7 @@ send_burst(struct rte_port_ring_writer_ras *p)
 	uint32_t nb_tx;
 
 	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	RTE_PORT_RING_WRITER_RAS_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
 	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 3b9d3d0..9fadac7 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -241,7 +241,7 @@ send_burst(struct rte_port_ring_writer *p)
 	uint32_t nb_tx;
 
 	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
 	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -256,7 +256,7 @@ send_burst_mp(struct rte_port_ring_writer *p)
 	uint32_t nb_tx;
 
 	nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
 	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -318,11 +318,11 @@ rte_port_ring_writer_tx_bulk_internal(void *port,
 		RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
 
 		if (is_multi)
-			n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
-				n_pkts);
+			n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
+					(void **)pkts, n_pkts, NULL);
 		else
-			n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
-				n_pkts);
+			n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
+					(void **)pkts, n_pkts, NULL);
 
 		RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
 		for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
@@ -517,7 +517,7 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
 	uint32_t nb_tx = 0, i;
 
 	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	/* We sent all the packets in a first try */
 	if (nb_tx >= p->tx_buf_count) {
@@ -527,7 +527,8 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
 
 	for (i = 0; i < p->n_retries; i++) {
 		nb_tx += rte_ring_sp_enqueue_burst(p->ring,
-				(void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+				(void **) (p->tx_buf + nb_tx),
+				p->tx_buf_count - nb_tx, NULL);
 
 		/* We sent all the packets in more than one try */
 		if (nb_tx >= p->tx_buf_count) {
@@ -550,7 +551,7 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
 	uint32_t nb_tx = 0, i;
 
 	nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	/* We sent all the packets in a first try */
 	if (nb_tx >= p->tx_buf_count) {
@@ -560,7 +561,8 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
 
 	for (i = 0; i < p->n_retries; i++) {
 		nb_tx += rte_ring_mp_enqueue_burst(p->ring,
-				(void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+				(void **) (p->tx_buf + nb_tx),
+				p->tx_buf_count - nb_tx, NULL);
 
 		/* We sent all the packets in more than one try */
 		if (nb_tx >= p->tx_buf_count) {
@@ -633,10 +635,12 @@ rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
 		RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
 
 		if (is_multi)
 			n_pkts_ok =
-				rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+				rte_ring_mp_enqueue_burst(p->ring,
+					(void **)pkts, n_pkts, NULL);
 		else
 			n_pkts_ok =
-				rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+				rte_ring_sp_enqueue_burst(p->ring,
+					(void **)pkts, n_pkts, NULL);
 
 		if (n_pkts_ok >= n_pkts)
 			return 0;
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index d4d44ce..2f8995c 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -354,20 +354,16 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int *free_space)
 {
 	uint32_t prod_head, prod_next;
 	uint32_t cons_tail, free_entries;
-	const unsigned max = n;
+	const unsigned int max = n;
 	int success;
 	unsigned int i;
 	uint32_t mask = r->mask;
 
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
 	/* move prod.head atomically */
 	do {
 		/* Reset n to the initial burst count */
@@ -382,16 +378,12 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		free_entries = (mask + cons_tail - prod_head);
 
 		/* check that we have enough room in ring */
-		if (unlikely(n > free_entries)) {
-			if (behavior == RTE_RING_QUEUE_FIXED)
-				return 0;
-			else {
-				/* No free entry available */
-				if (unlikely(free_entries == 0))
-					return 0;
-				n = free_entries;
-			}
-		}
+		if (unlikely(n > free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : free_entries;
+
+		if (n == 0)
+			goto end;
 
 		prod_next = prod_head + n;
 		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
@@ -410,6 +402,9 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		rte_pause();
 
 	r->prod.tail = prod_next;
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
 	return n;
 }
 
@@ -435,7 +430,8 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int *free_space)
 {
 	uint32_t prod_head, cons_tail;
 	uint32_t prod_next, free_entries;
@@ -451,16 +447,12 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	free_entries = mask + cons_tail - prod_head;
 
 	/* check that we have enough room in ring */
-	if (unlikely(n > free_entries)) {
-		if (behavior == RTE_RING_QUEUE_FIXED)
-			return 0;
-		else {
-			/* No free entry available */
-			if (unlikely(free_entries == 0))
-				return 0;
-			n = free_entries;
-		}
-	}
+	if (unlikely(n > free_entries))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries;
+
+	if (n == 0)
+		goto end;
+
 	prod_next = prod_head + n;
 	r->prod.head = prod_next;
@@ -470,6 +462,9 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	rte_smp_wmb();
 
 	r->prod.tail = prod_next;
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
 	return n;
 }
 
@@ -639,9 +634,10 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+			free_space);
 }
 
 /**
@@ -658,9 +654,10 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+			free_space);
 }
 
 /**
@@ -681,12 +678,12 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-		      unsigned n)
+		      unsigned int n, unsigned int *free_space)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
 	else
-		return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+		return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
 }
 
 /**
@@ -706,7 +703,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+	return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -723,7 +720,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+	return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -744,7 +741,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+	return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -990,9 +987,10 @@ struct rte_ring *rte_ring_lookup(const char *name);
 */
 static inline unsigned __attribute__((always_inline))
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_ring_mp_do_enqueue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1009,9 +1007,10 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 */
 static inline unsigned __attribute__((always_inline))
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_ring_sp_do_enqueue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1032,12 +1031,12 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 */
 static inline unsigned __attribute__((always_inline))
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-		      unsigned n)
+		      unsigned int n, unsigned int *free_space)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue_burst(r, obj_table, n);
+		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
 	else
-		return rte_ring_mp_enqueue_burst(r, obj_table, n);
+		return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
 }
 
 /**
-- 
2.9.3