From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 5E1BFA04BA; Wed, 7 Oct 2020 15:51:20 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 8FF261BB3E; Wed, 7 Oct 2020 15:51:18 +0200 (CEST) Received: from mga11.intel.com (mga11.intel.com [192.55.52.93]) by dpdk.org (Postfix) with ESMTP id C83A91BA4B for ; Wed, 7 Oct 2020 15:51:15 +0200 (CEST) IronPort-SDR: WUqT+iwbxONRj9s/AlNGwaAKyboviXPAWgJL2B+45RtH9Ipb9uB+Sl9WmpIrPJuYniW3VB4UKy VB9W4U3wTNHg== X-IronPort-AV: E=McAfee;i="6000,8403,9766"; a="161550074" X-IronPort-AV: E=Sophos;i="5.77,347,1596524400"; d="scan'208";a="161550074" X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga005.jf.intel.com ([10.7.209.41]) by fmsmga102.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 07 Oct 2020 06:51:13 -0700 IronPort-SDR: xPNjjT7TXBandd21Nbj58STRXFRY0Kg6m+4d6m2NXwTIDWE1F2849dN8gql9fEA7YVeQiwHsQM VNPbA/Y0skhw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.77,347,1596524400"; d="scan'208";a="527963256" Received: from silpixa00399477.ir.intel.com ([10.237.214.232]) by orsmga005.jf.intel.com with ESMTP; 07 Oct 2020 06:51:10 -0700 From: Radu Nicolau To: dev@dpdk.org Cc: jerinj@marvell.com, harry.van.haaren@intel.com, Honnappa.Nagarahalli@arm.com, konstantin.ananyev@intel.com, bruce.richardson@intel.com, Radu Nicolau Date: Wed, 7 Oct 2020 13:51:00 +0000 Message-Id: <20201007135100.8422-1-radu.nicolau@intel.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20200908105211.10066-1-radu.nicolau@intel.com> References: <20200908105211.10066-1-radu.nicolau@intel.com> Subject: [dpdk-dev] [PATCH v3] event/sw: performance improvements X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Add minimum burst throughout the scheduler pipeline and a flush counter. Use a single threaded ring implementation for the reorder buffer free list. Signed-off-by: Radu Nicolau Acked-by: Harry van Haaren --- doc/guides/eventdevs/sw.rst | 22 +++++ doc/guides/rel_notes/release_20_11.rst | 5 ++ drivers/event/sw/event_ring.h | 109 ++++++------------------ drivers/event/sw/sw_evdev.c | 110 ++++++++++++++++++++----- drivers/event/sw/sw_evdev.h | 22 ++++- drivers/event/sw/sw_evdev_scheduler.c | 66 +++++++++++---- 6 files changed, 210 insertions(+), 124 deletions(-) diff --git a/doc/guides/eventdevs/sw.rst b/doc/guides/eventdevs/sw.rst index 04c8b0305..69939ab9d 100644 --- a/doc/guides/eventdevs/sw.rst +++ b/doc/guides/eventdevs/sw.rst @@ -87,6 +87,28 @@ verify possible gains. --vdev="event_sw0,credit_quanta=64" +Scheduler tuning arguments +~~~~~~~~~~~~~ + +The scheduler minimum number of events that are processed can be increased to +reduce per event overhead and increase internal burst sizes, which can +improve throughput. + +* ``min_burst`` specifies the minimum number of inflight events that can be + moved to the next stage in the scheduler. Default value is 1. + +* ``refill_once`` is a switch that when set instructs the scheduler to deque + the events waiting in the ingress rings only once per call. The default + behavior is to dequeue as needed. + +* ``deq_burst`` is the burst size used to dequeue from the port rings. + Default value is 32, and it should be increased to 64 or 128 when setting + ``refill_once=1``. + +.. code-block:: console + + --vdev="event_sw0,min_burst=8,deq_burst=64,refill_once=1" + Limitations ----------- diff --git a/doc/guides/rel_notes/release_20_11.rst b/doc/guides/rel_notes/release_20_11.rst index cdf20404c..1b6384e6d 100644 --- a/doc/guides/rel_notes/release_20_11.rst +++ b/doc/guides/rel_notes/release_20_11.rst @@ -105,6 +105,11 @@ New Features ``--portmask=N`` where N represents the hexadecimal bitmask of ports used. +* **Updated Software Eventdev driver.** + + Added performance tuning arguments to allow tuning the scheduler for + better throughtput in high core count use cases. + * **Updated the pipeline library for alignment with the P4 language.** Added new Software Switch (SWX) pipeline type that provides more diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h index 02308728b..2b86ca9f7 100644 --- a/drivers/event/sw/event_ring.h +++ b/drivers/event/sw/event_ring.h @@ -20,23 +20,20 @@ #include #include -#define QE_RING_NAMESIZE 32 - -struct qe_ring { - char name[QE_RING_NAMESIZE] __rte_cache_aligned; - uint32_t ring_size; /* size of memory block allocated to the ring */ - uint32_t mask; /* mask for read/write values == ring_size -1 */ - uint32_t size; /* actual usable space in the ring */ - volatile uint32_t write_idx __rte_cache_aligned; - volatile uint32_t read_idx __rte_cache_aligned; - - struct rte_event ring[0] __rte_cache_aligned; +/* Custom single threaded ring implementation used for ROB */ +struct rob_ring { + uint32_t ring_size; + uint32_t mask; + uint32_t size; + uint32_t write_idx; + uint32_t read_idx; + void *ring[0] __rte_cache_aligned; }; -static inline struct qe_ring * -qe_ring_create(const char *name, unsigned int size, unsigned int socket_id) +static inline struct rob_ring * +rob_ring_create(unsigned int size, unsigned int socket_id) { - struct qe_ring *retval; + struct rob_ring *retval; const uint32_t ring_size = rte_align32pow2(size + 1); size_t memsize = sizeof(*retval) + (ring_size * sizeof(retval->ring[0])); @@ -44,8 +41,6 @@ qe_ring_create(const char *name, unsigned int size, unsigned int socket_id) retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id); if (retval == NULL) goto end; - - snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name); retval->ring_size = ring_size; retval->mask = ring_size - 1; retval->size = size; @@ -54,100 +49,50 @@ qe_ring_create(const char *name, unsigned int size, unsigned int socket_id) } static inline void -qe_ring_destroy(struct qe_ring *r) +rob_ring_free(struct rob_ring *r) { rte_free(r); } static __rte_always_inline unsigned int -qe_ring_count(const struct qe_ring *r) +rob_ring_count(const struct rob_ring *r) { return r->write_idx - r->read_idx; } static __rte_always_inline unsigned int -qe_ring_free_count(const struct qe_ring *r) +rob_ring_free_count(const struct rob_ring *r) { - return r->size - qe_ring_count(r); + return r->size - rob_ring_count(r); } static __rte_always_inline unsigned int -qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes, - unsigned int nb_qes, uint16_t *free_count) +rob_ring_enqueue(struct rob_ring *r, void *re) { const uint32_t size = r->size; const uint32_t mask = r->mask; const uint32_t read = r->read_idx; uint32_t write = r->write_idx; const uint32_t space = read + size - write; - uint32_t i; - - if (space < nb_qes) - nb_qes = space; - - for (i = 0; i < nb_qes; i++, write++) - r->ring[write & mask] = qes[i]; - - rte_smp_wmb(); - - if (nb_qes != 0) - r->write_idx = write; - - *free_count = space - nb_qes; - - return nb_qes; + if (space < 1) + return 0; + r->ring[write & mask] = re; + r->write_idx++; + return 1; } static __rte_always_inline unsigned int -qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes, - unsigned int nb_qes, uint8_t *ops) -{ - const uint32_t size = r->size; - const uint32_t mask = r->mask; - const uint32_t read = r->read_idx; - uint32_t write = r->write_idx; - const uint32_t space = read + size - write; - uint32_t i; - - if (space < nb_qes) - nb_qes = space; - - for (i = 0; i < nb_qes; i++, write++) { - r->ring[write & mask] = qes[i]; - r->ring[write & mask].op = ops[i]; - } - - rte_smp_wmb(); - - if (nb_qes != 0) - r->write_idx = write; - - return nb_qes; -} - -static __rte_always_inline unsigned int -qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes, - unsigned int nb_qes) +rob_ring_dequeue(struct rob_ring *r, void **re) { const uint32_t mask = r->mask; uint32_t read = r->read_idx; const uint32_t write = r->write_idx; const uint32_t items = write - read; - uint32_t i; - - if (items < nb_qes) - nb_qes = items; - - - for (i = 0; i < nb_qes; i++, read++) - qes[i] = r->ring[read & mask]; - - rte_smp_rmb(); - - if (nb_qes != 0) - r->read_idx += nb_qes; - - return nb_qes; + if (items < 1) + return 0; + *re = r->ring[read & mask]; + r->read_idx++; + return 1; } #endif diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index 98dae7164..e310c8c34 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -14,11 +14,15 @@ #include "sw_evdev.h" #include "iq_chunk.h" +#include "event_ring.h" #define EVENTDEV_NAME_SW_PMD event_sw #define NUMA_NODE_ARG "numa_node" #define SCHED_QUANTA_ARG "sched_quanta" #define CREDIT_QUANTA_ARG "credit_quanta" +#define MIN_BURST_SIZE_ARG "min_burst" +#define DEQ_BURST_SIZE_ARG "deq_burst" +#define REFIL_ONCE_ARG "refill_once" static void sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info); @@ -239,7 +243,6 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type, qid->priority = queue_conf->priority; if (qid->type == RTE_SCHED_TYPE_ORDERED) { - char ring_name[RTE_RING_NAMESIZE]; uint32_t window_size; /* rte_ring and window_size_mask require require window_size to @@ -270,18 +273,8 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type, 0, window_size * sizeof(qid->reorder_buffer[0])); - snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist", - dev_id, idx); - - /* lookup the ring, and if it already exists, free it */ - struct rte_ring *cleanup = rte_ring_lookup(ring_name); - if (cleanup) - rte_ring_free(cleanup); - - qid->reorder_buffer_freelist = rte_ring_create(ring_name, - window_size, - socket_id, - RING_F_SP_ENQ | RING_F_SC_DEQ); + qid->reorder_buffer_freelist = rob_ring_create(window_size, + socket_id); if (!qid->reorder_buffer_freelist) { SW_LOG_DBG("freelist ring create failed"); goto cleanup; @@ -292,8 +285,8 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type, * that many. */ for (i = 0; i < window_size - 1; i++) { - if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist, - &qid->reorder_buffer[i]) < 0) + if (rob_ring_enqueue(qid->reorder_buffer_freelist, + &qid->reorder_buffer[i]) != 1) goto cleanup; } @@ -312,7 +305,7 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type, } if (qid->reorder_buffer_freelist) { - rte_ring_free(qid->reorder_buffer_freelist); + rob_ring_free(qid->reorder_buffer_freelist); qid->reorder_buffer_freelist = NULL; } @@ -327,7 +320,7 @@ sw_queue_release(struct rte_eventdev *dev, uint8_t id) if (qid->type == RTE_SCHED_TYPE_ORDERED) { rte_free(qid->reorder_buffer); - rte_ring_free(qid->reorder_buffer_freelist); + rob_ring_free(qid->reorder_buffer_freelist); } memset(qid, 0, sizeof(*qid)); } @@ -724,11 +717,11 @@ sw_dump(struct rte_eventdev *dev, FILE *f) qid->stats.rx_pkts, qid->stats.rx_dropped, qid->stats.tx_pkts); if (qid->type == RTE_SCHED_TYPE_ORDERED) { - struct rte_ring *rob_buf_free = + struct rob_ring *rob_buf_free = qid->reorder_buffer_freelist; if (rob_buf_free) fprintf(f, "\tReorder entries in use: %u\n", - rte_ring_free_count(rob_buf_free)); + rob_ring_free_count(rob_buf_free)); else fprintf(f, "\tReorder buffer not initialized\n"); @@ -910,6 +903,35 @@ set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque) return 0; } +static int +set_deq_burst_sz(const char *key __rte_unused, const char *value, void *opaque) +{ + int *deq_burst_sz = opaque; + *deq_burst_sz = atoi(value); + if (*deq_burst_sz < 0 || *deq_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE) + return -1; + return 0; +} + +static int +set_min_burst_sz(const char *key __rte_unused, const char *value, void *opaque) +{ + int *min_burst_sz = opaque; + *min_burst_sz = atoi(value); + if (*min_burst_sz < 0 || *min_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE) + return -1; + return 0; +} + +static int +set_refill_once(const char *key __rte_unused, const char *value, void *opaque) +{ + int *refill_once_per_call = opaque; + *refill_once_per_call = atoi(value); + if (*refill_once_per_call < 0 || *refill_once_per_call > 1) + return -1; + return 0; +} static int32_t sw_sched_service_func(void *args) { @@ -957,6 +979,9 @@ sw_probe(struct rte_vdev_device *vdev) NUMA_NODE_ARG, SCHED_QUANTA_ARG, CREDIT_QUANTA_ARG, + MIN_BURST_SIZE_ARG, + DEQ_BURST_SIZE_ARG, + REFIL_ONCE_ARG, NULL }; const char *name; @@ -966,6 +991,9 @@ sw_probe(struct rte_vdev_device *vdev) int socket_id = rte_socket_id(); int sched_quanta = SW_DEFAULT_SCHED_QUANTA; int credit_quanta = SW_DEFAULT_CREDIT_QUANTA; + int min_burst_size = 1; + int deq_burst_size = SCHED_DEQUEUE_DEFAULT_BURST_SIZE; + int refill_once = 0; name = rte_vdev_device_name(vdev); params = rte_vdev_device_args(vdev); @@ -1007,13 +1035,46 @@ sw_probe(struct rte_vdev_device *vdev) return ret; } + ret = rte_kvargs_process(kvlist, MIN_BURST_SIZE_ARG, + set_min_burst_sz, &min_burst_size); + if (ret != 0) { + SW_LOG_ERR( + "%s: Error parsing minimum burst size parameter", + name); + rte_kvargs_free(kvlist); + return ret; + } + + ret = rte_kvargs_process(kvlist, DEQ_BURST_SIZE_ARG, + set_deq_burst_sz, &deq_burst_size); + if (ret != 0) { + SW_LOG_ERR( + "%s: Error parsing dequeue burst size parameter", + name); + rte_kvargs_free(kvlist); + return ret; + } + + ret = rte_kvargs_process(kvlist, REFIL_ONCE_ARG, + set_refill_once, &refill_once); + if (ret != 0) { + SW_LOG_ERR( + "%s: Error parsing refill once per call switch", + name); + rte_kvargs_free(kvlist); + return ret; + } + rte_kvargs_free(kvlist); } } SW_LOG_INFO( - "Creating eventdev sw device %s, numa_node=%d, sched_quanta=%d, credit_quanta=%d\n", - name, socket_id, sched_quanta, credit_quanta); + "Creating eventdev sw device %s, numa_node=%d, " + "sched_quanta=%d, credit_quanta=%d " + "min_burst=%d, deq_burst=%d, refill_once=%d\n", + name, socket_id, sched_quanta, credit_quanta, + min_burst_size, deq_burst_size, refill_once); dev = rte_event_pmd_vdev_init(name, sizeof(struct sw_evdev), socket_id); @@ -1038,6 +1099,9 @@ sw_probe(struct rte_vdev_device *vdev) /* copy values passed from vdev command line to instance */ sw->credit_update_quanta = credit_quanta; sw->sched_quanta = sched_quanta; + sw->sched_min_burst_size = min_burst_size; + sw->sched_deq_burst_size = deq_burst_size; + sw->refill_once_per_iter = refill_once; /* register service with EAL */ struct rte_service_spec service; @@ -1082,5 +1146,7 @@ static struct rte_vdev_driver evdev_sw_pmd_drv = { RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv); RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "= " - SCHED_QUANTA_ARG "=" CREDIT_QUANTA_ARG "="); + SCHED_QUANTA_ARG "=" CREDIT_QUANTA_ARG "=" + MIN_BURST_SIZE_ARG "=" DEQ_BURST_SIZE_ARG "=" + REFIL_ONCE_ARG "="); RTE_LOG_REGISTER(eventdev_sw_log_level, pmd.event.sw, NOTICE); diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index 7c77b2495..1fc07b64f 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ -29,7 +29,13 @@ /* report dequeue burst sizes in buckets */ #define SW_DEQ_STAT_BUCKET_SHIFT 2 /* how many packets pulled from port by sched */ -#define SCHED_DEQUEUE_BURST_SIZE 32 +#define SCHED_DEQUEUE_DEFAULT_BURST_SIZE 32 +/* max buffer size */ +#define SCHED_DEQUEUE_MAX_BURST_SIZE 256 + +/* Flush the pipeline after this many no enq to cq */ +#define SCHED_NO_ENQ_CYCLE_FLUSH 256 + #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */ #define NUM_SAMPLES 64 /* how many data points use for average stats */ @@ -122,7 +128,7 @@ struct sw_qid { /* Track packet order for reordering when needed */ struct reorder_buffer_entry *reorder_buffer; /*< pkts await reorder */ - struct rte_ring *reorder_buffer_freelist; /* available reorder slots */ + struct rob_ring *reorder_buffer_freelist; /* available reorder slots */ uint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */ uint32_t window_size; /* Used to wrap reorder_buffer_index */ @@ -197,7 +203,7 @@ struct sw_port { uint32_t pp_buf_start; uint32_t pp_buf_count; uint16_t cq_buf_count; - struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE]; + struct rte_event pp_buf[SCHED_DEQUEUE_MAX_BURST_SIZE]; struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH]; uint8_t num_qids_mapped; @@ -214,6 +220,16 @@ struct sw_evdev { uint32_t xstats_count_mode_port; uint32_t xstats_count_mode_queue; + /* Minimum burst size*/ + uint32_t sched_min_burst_size __rte_cache_aligned; + /* Port dequeue burst size*/ + uint32_t sched_deq_burst_size; + /* Refill pp buffers only once per scheduler call*/ + uint32_t refill_once_per_iter; + /* Current values */ + uint32_t sched_flush_count; + uint32_t sched_min_burst; + /* Contains all ports - load balanced and directed */ struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned; diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c index cff747da8..f747b3c6d 100644 --- a/drivers/event/sw/sw_evdev_scheduler.c +++ b/drivers/event/sw/sw_evdev_scheduler.c @@ -7,6 +7,7 @@ #include #include "sw_evdev.h" #include "iq_chunk.h" +#include "event_ring.h" #define SW_IQS_MASK (SW_IQS_MAX-1) @@ -26,6 +27,7 @@ /* use cheap bit mixing, we only need to lose a few bits */ #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK) + static inline uint32_t sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, uint32_t iq_num, unsigned int count) @@ -127,7 +129,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, if (keep_order) /* only schedule as many as we have reorder buffer entries */ count = RTE_MIN(count, - rte_ring_count(qid->reorder_buffer_freelist)); + rob_ring_count(qid->reorder_buffer_freelist)); for (i = 0; i < count; i++) { const struct rte_event *qe = iq_peek(&qid->iq[iq_num]); @@ -146,9 +148,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, cq_idx = 0; cq = qid->cq_map[cq_idx++]; - } while (rte_event_ring_free_count( - sw->ports[cq].cq_worker_ring) == 0 || - sw->ports[cq].inflights == SW_PORT_HIST_LIST); + } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST || + rte_event_ring_free_count( + sw->ports[cq].cq_worker_ring) == 0); struct sw_port *p = &sw->ports[cq]; if (sw->cq_ring_space[cq] == 0 || @@ -164,7 +166,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, p->hist_list[head].qid = qid_id; if (keep_order) - rte_ring_sc_dequeue(qid->reorder_buffer_freelist, + rob_ring_dequeue(qid->reorder_buffer_freelist, (void *)&p->hist_list[head].rob_entry); sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe; @@ -229,7 +231,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw) uint32_t pkts_done = 0; uint32_t count = iq_count(&qid->iq[iq_num]); - if (count > 0) { + if (count >= sw->sched_min_burst) { if (type == SW_SCHED_TYPE_DIRECT) pkts_done += sw_schedule_dir_to_cq(sw, qid, iq_num, count); @@ -267,14 +269,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end) for (; qid_start < qid_end; qid_start++) { struct sw_qid *qid = &sw->qids[qid_start]; - int i, num_entries_in_use; + unsigned int i, num_entries_in_use; if (qid->type != RTE_SCHED_TYPE_ORDERED) continue; - num_entries_in_use = rte_ring_free_count( + num_entries_in_use = rob_ring_free_count( qid->reorder_buffer_freelist); + if (num_entries_in_use < sw->sched_min_burst) + num_entries_in_use = 0; + for (i = 0; i < num_entries_in_use; i++) { struct reorder_buffer_entry *entry; int j; @@ -320,7 +325,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end) if (!entry->ready) { entry->fragment_index = 0; - rte_ring_sp_enqueue( + rob_ring_enqueue( qid->reorder_buffer_freelist, entry); @@ -339,7 +344,7 @@ sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port) struct rte_event_ring *worker = port->rx_worker_ring; port->pp_buf_start = 0; port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf, - RTE_DIM(port->pp_buf), NULL); + sw->sched_deq_burst_size, NULL); } static __rte_always_inline uint32_t @@ -350,7 +355,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder) struct sw_port *port = &sw->ports[port_id]; /* If shadow ring has 0 pkts, pull from worker ring */ - if (port->pp_buf_count == 0) + if (!sw->refill_once_per_iter && port->pp_buf_count == 0) sw_refill_pp_buf(sw, port); while (port->pp_buf_count) { @@ -468,7 +473,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id) struct sw_port *port = &sw->ports[port_id]; /* If shadow ring has 0 pkts, pull from worker ring */ - if (port->pp_buf_count == 0) + if (!sw->refill_once_per_iter && port->pp_buf_count == 0) sw_refill_pp_buf(sw, port); while (port->pp_buf_count) { @@ -557,12 +562,39 @@ sw_event_schedule(struct rte_eventdev *dev) /* push all the internal buffered QEs in port->cq_ring to the * worker cores: aka, do the ring transfers batched. */ + int no_enq = 1; for (i = 0; i < sw->port_count; i++) { - struct rte_event_ring *worker = sw->ports[i].cq_worker_ring; - rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf, - sw->ports[i].cq_buf_count, - &sw->cq_ring_space[i]); - sw->ports[i].cq_buf_count = 0; + struct sw_port *port = &sw->ports[i]; + struct rte_event_ring *worker = port->cq_worker_ring; + + /* If shadow ring has 0 pkts, pull from worker ring */ + if (sw->refill_once_per_iter && port->pp_buf_count == 0) + sw_refill_pp_buf(sw, port); + + if (port->cq_buf_count >= sw->sched_min_burst) { + rte_event_ring_enqueue_burst(worker, + port->cq_buf, + port->cq_buf_count, + &sw->cq_ring_space[i]); + port->cq_buf_count = 0; + no_enq = 0; + } else { + sw->cq_ring_space[i] = + rte_event_ring_free_count(worker) - + port->cq_buf_count; + } + } + + if (no_enq) { + if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH)) + sw->sched_min_burst = 1; + else + sw->sched_flush_count++; + } else { + if (sw->sched_flush_count) + sw->sched_flush_count--; + else + sw->sched_min_burst = sw->sched_min_burst_size; } } -- 2.17.1