DPDK patches and discussions
 help / color / mirror / Atom feed
From: Radu Nicolau <radu.nicolau@intel.com>
To: dev@dpdk.org
Cc: jerinj@marvell.com, harry.van.haaren@intel.com,
	Honnappa.Nagarahalli@arm.com, konstantin.ananyev@intel.com,
	bruce.richardson@intel.com, Radu Nicolau <radu.nicolau@intel.com>
Subject: [dpdk-dev] [PATCH v3] event/sw: performance improvements
Date: Wed,  7 Oct 2020 13:51:00 +0000	[thread overview]
Message-ID: <20201007135100.8422-1-radu.nicolau@intel.com> (raw)
In-Reply-To: <20200908105211.10066-1-radu.nicolau@intel.com>

Add minimum burst throughout the scheduler pipeline and a flush counter.
Use a single threaded ring implementation for the reorder buffer free list.

Signed-off-by: Radu Nicolau <radu.nicolau@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
---
 doc/guides/eventdevs/sw.rst            |  22 +++++
 doc/guides/rel_notes/release_20_11.rst |   5 ++
 drivers/event/sw/event_ring.h          | 109 ++++++------------------
 drivers/event/sw/sw_evdev.c            | 110 ++++++++++++++++++++-----
 drivers/event/sw/sw_evdev.h            |  22 ++++-
 drivers/event/sw/sw_evdev_scheduler.c  |  66 +++++++++++----
 6 files changed, 210 insertions(+), 124 deletions(-)

diff --git a/doc/guides/eventdevs/sw.rst b/doc/guides/eventdevs/sw.rst
index 04c8b0305..69939ab9d 100644
--- a/doc/guides/eventdevs/sw.rst
+++ b/doc/guides/eventdevs/sw.rst
@@ -87,6 +87,28 @@ verify possible gains.
 
     --vdev="event_sw0,credit_quanta=64"
 
+Scheduler tuning arguments
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The minimum number of events that the scheduler processes can be increased to
+reduce per-event overhead and increase internal burst sizes, which can
+improve throughput.
+
+* ``min_burst`` specifies the minimum number of inflight events that can be
+  moved to the next stage in the scheduler. Default value is 1.
+
+* ``refill_once`` is a switch that, when set, instructs the scheduler to
+  dequeue the events waiting in the ingress rings only once per call. The
+  default behavior is to dequeue as needed.
+
+* ``deq_burst`` is the burst size used to dequeue from the port rings.
+  Default value is 32, and it should be increased to 64 or 128 when setting
+  ``refill_once=1``.
+
+.. code-block:: console
+
+    --vdev="event_sw0,min_burst=8,deq_burst=64,refill_once=1"
+
 
 Limitations
 -----------
diff --git a/doc/guides/rel_notes/release_20_11.rst b/doc/guides/rel_notes/release_20_11.rst
index cdf20404c..1b6384e6d 100644
--- a/doc/guides/rel_notes/release_20_11.rst
+++ b/doc/guides/rel_notes/release_20_11.rst
@@ -105,6 +105,11 @@ New Features
     ``--portmask=N``
     where N represents the hexadecimal bitmask of ports used.
 
+* **Updated Software Eventdev driver.**
+
+  Added performance tuning arguments to allow tuning the scheduler for
+  better throughput in high core count use cases.
+
 * **Updated the pipeline library for alignment with the P4 language.**
 
   Added new Software Switch (SWX) pipeline type that provides more
diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h
index 02308728b..2b86ca9f7 100644
--- a/drivers/event/sw/event_ring.h
+++ b/drivers/event/sw/event_ring.h
@@ -20,23 +20,20 @@
 #include <rte_memory.h>
 #include <rte_malloc.h>
 
-#define QE_RING_NAMESIZE 32
-
-struct qe_ring {
-	char name[QE_RING_NAMESIZE] __rte_cache_aligned;
-	uint32_t ring_size; /* size of memory block allocated to the ring */
-	uint32_t mask;      /* mask for read/write values == ring_size -1 */
-	uint32_t size;      /* actual usable space in the ring */
-	volatile uint32_t write_idx __rte_cache_aligned;
-	volatile uint32_t read_idx __rte_cache_aligned;
-
-	struct rte_event ring[0] __rte_cache_aligned;
+/* Custom single threaded ring implementation used for ROB */
+struct rob_ring {
+	uint32_t ring_size;
+	uint32_t mask;
+	uint32_t size;
+	uint32_t write_idx;
+	uint32_t read_idx;
+	void *ring[0] __rte_cache_aligned;
 };
 
-static inline struct qe_ring *
-qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
+static inline struct rob_ring *
+rob_ring_create(unsigned int size, unsigned int socket_id)
 {
-	struct qe_ring *retval;
+	struct rob_ring *retval;
 	const uint32_t ring_size = rte_align32pow2(size + 1);
 	size_t memsize = sizeof(*retval) +
 			(ring_size * sizeof(retval->ring[0]));
@@ -44,8 +41,6 @@ qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
 	retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id);
 	if (retval == NULL)
 		goto end;
-
-	snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name);
 	retval->ring_size = ring_size;
 	retval->mask = ring_size - 1;
 	retval->size = size;
@@ -54,100 +49,50 @@ qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
 }
 
 static inline void
-qe_ring_destroy(struct qe_ring *r)
+rob_ring_free(struct rob_ring *r)
 {
 	rte_free(r);
 }
 
 static __rte_always_inline unsigned int
-qe_ring_count(const struct qe_ring *r)
+rob_ring_count(const struct rob_ring *r)
 {
 	return r->write_idx - r->read_idx;
 }
 
 static __rte_always_inline unsigned int
-qe_ring_free_count(const struct qe_ring *r)
+rob_ring_free_count(const struct rob_ring *r)
 {
-	return r->size - qe_ring_count(r);
+	return r->size - rob_ring_count(r);
 }
 
 static __rte_always_inline unsigned int
-qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes,
-		unsigned int nb_qes, uint16_t *free_count)
+rob_ring_enqueue(struct rob_ring *r, void *re)
 {
 	const uint32_t size = r->size;
 	const uint32_t mask = r->mask;
 	const uint32_t read = r->read_idx;
 	uint32_t write = r->write_idx;
 	const uint32_t space = read + size - write;
-	uint32_t i;
-
-	if (space < nb_qes)
-		nb_qes = space;
-
-	for (i = 0; i < nb_qes; i++, write++)
-		r->ring[write & mask] = qes[i];
-
-	rte_smp_wmb();
-
-	if (nb_qes != 0)
-		r->write_idx = write;
-
-	*free_count = space - nb_qes;
-
-	return nb_qes;
+	if (space < 1)
+		return 0;
+	r->ring[write & mask] = re;
+	r->write_idx++;
+	return 1;
 }
 
 static __rte_always_inline unsigned int
-qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes,
-		unsigned int nb_qes, uint8_t *ops)
-{
-	const uint32_t size = r->size;
-	const uint32_t mask = r->mask;
-	const uint32_t read = r->read_idx;
-	uint32_t write = r->write_idx;
-	const uint32_t space = read + size - write;
-	uint32_t i;
-
-	if (space < nb_qes)
-		nb_qes = space;
-
-	for (i = 0; i < nb_qes; i++, write++) {
-		r->ring[write & mask] = qes[i];
-		r->ring[write & mask].op = ops[i];
-	}
-
-	rte_smp_wmb();
-
-	if (nb_qes != 0)
-		r->write_idx = write;
-
-	return nb_qes;
-}
-
-static __rte_always_inline unsigned int
-qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,
-		unsigned int nb_qes)
+rob_ring_dequeue(struct rob_ring *r, void **re)
 {
 	const uint32_t mask = r->mask;
 	uint32_t read = r->read_idx;
 	const uint32_t write = r->write_idx;
 	const uint32_t items = write - read;
-	uint32_t i;
-
-	if (items < nb_qes)
-		nb_qes = items;
-
-
-	for (i = 0; i < nb_qes; i++, read++)
-		qes[i] = r->ring[read & mask];
-
-	rte_smp_rmb();
-
-	if (nb_qes != 0)
-		r->read_idx += nb_qes;
-
-	return nb_qes;
+	if (items < 1)
+		return 0;
+	*re = r->ring[read & mask];
+	r->read_idx++;
+	return 1;
 }
 
 #endif
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 98dae7164..e310c8c34 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -14,11 +14,15 @@
 
 #include "sw_evdev.h"
 #include "iq_chunk.h"
+#include "event_ring.h"
 
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define NUMA_NODE_ARG "numa_node"
 #define SCHED_QUANTA_ARG "sched_quanta"
 #define CREDIT_QUANTA_ARG "credit_quanta"
+#define MIN_BURST_SIZE_ARG "min_burst"
+#define DEQ_BURST_SIZE_ARG "deq_burst"
+#define REFIL_ONCE_ARG "refill_once"
 
 static void
 sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
@@ -239,7 +243,6 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type,
 	qid->priority = queue_conf->priority;
 
 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
-		char ring_name[RTE_RING_NAMESIZE];
 		uint32_t window_size;
 
 		/* rte_ring and window_size_mask require require window_size to
@@ -270,18 +273,8 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type,
 		       0,
 		       window_size * sizeof(qid->reorder_buffer[0]));
 
-		snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist",
-				dev_id, idx);
-
-		/* lookup the ring, and if it already exists, free it */
-		struct rte_ring *cleanup = rte_ring_lookup(ring_name);
-		if (cleanup)
-			rte_ring_free(cleanup);
-
-		qid->reorder_buffer_freelist = rte_ring_create(ring_name,
-				window_size,
-				socket_id,
-				RING_F_SP_ENQ | RING_F_SC_DEQ);
+		qid->reorder_buffer_freelist = rob_ring_create(window_size,
+				socket_id);
 		if (!qid->reorder_buffer_freelist) {
 			SW_LOG_DBG("freelist ring create failed");
 			goto cleanup;
@@ -292,8 +285,8 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type,
 		 * that many.
 		 */
 		for (i = 0; i < window_size - 1; i++) {
-			if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
-						&qid->reorder_buffer[i]) < 0)
+			if (rob_ring_enqueue(qid->reorder_buffer_freelist,
+						&qid->reorder_buffer[i]) != 1)
 				goto cleanup;
 		}
 
@@ -312,7 +305,7 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type,
 	}
 
 	if (qid->reorder_buffer_freelist) {
-		rte_ring_free(qid->reorder_buffer_freelist);
+		rob_ring_free(qid->reorder_buffer_freelist);
 		qid->reorder_buffer_freelist = NULL;
 	}
 
@@ -327,7 +320,7 @@ sw_queue_release(struct rte_eventdev *dev, uint8_t id)
 
 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
 		rte_free(qid->reorder_buffer);
-		rte_ring_free(qid->reorder_buffer_freelist);
+		rob_ring_free(qid->reorder_buffer_freelist);
 	}
 	memset(qid, 0, sizeof(*qid));
 }
@@ -724,11 +717,11 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
 			qid->stats.rx_pkts, qid->stats.rx_dropped,
 			qid->stats.tx_pkts);
 		if (qid->type == RTE_SCHED_TYPE_ORDERED) {
-			struct rte_ring *rob_buf_free =
+			struct rob_ring *rob_buf_free =
 				qid->reorder_buffer_freelist;
 			if (rob_buf_free)
 				fprintf(f, "\tReorder entries in use: %u\n",
-					rte_ring_free_count(rob_buf_free));
+					rob_ring_free_count(rob_buf_free));
 			else
 				fprintf(f,
 					"\tReorder buffer not initialized\n");
@@ -910,6 +903,35 @@ set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
 	return 0;
 }
 
+static int
+set_deq_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *deq_burst_sz = opaque;
+	*deq_burst_sz = atoi(value);
+	if (*deq_burst_sz < 0 || *deq_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE)
+		return -1;
+	return 0;
+}
+
+static int
+set_min_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *min_burst_sz = opaque;
+	*min_burst_sz = atoi(value);
+	if (*min_burst_sz < 0 || *min_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE)
+		return -1;
+	return 0;
+}
+
+static int
+set_refill_once(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *refill_once_per_call = opaque;
+	*refill_once_per_call = atoi(value);
+	if (*refill_once_per_call < 0 || *refill_once_per_call > 1)
+		return -1;
+	return 0;
+}
 
 static int32_t sw_sched_service_func(void *args)
 {
@@ -957,6 +979,9 @@ sw_probe(struct rte_vdev_device *vdev)
 		NUMA_NODE_ARG,
 		SCHED_QUANTA_ARG,
 		CREDIT_QUANTA_ARG,
+		MIN_BURST_SIZE_ARG,
+		DEQ_BURST_SIZE_ARG,
+		REFIL_ONCE_ARG,
 		NULL
 	};
 	const char *name;
@@ -966,6 +991,9 @@ sw_probe(struct rte_vdev_device *vdev)
 	int socket_id = rte_socket_id();
 	int sched_quanta  = SW_DEFAULT_SCHED_QUANTA;
 	int credit_quanta = SW_DEFAULT_CREDIT_QUANTA;
+	int min_burst_size = 1;
+	int deq_burst_size = SCHED_DEQUEUE_DEFAULT_BURST_SIZE;
+	int refill_once = 0;
 
 	name = rte_vdev_device_name(vdev);
 	params = rte_vdev_device_args(vdev);
@@ -1007,13 +1035,46 @@ sw_probe(struct rte_vdev_device *vdev)
 				return ret;
 			}
 
+			ret = rte_kvargs_process(kvlist, MIN_BURST_SIZE_ARG,
+					set_min_burst_sz, &min_burst_size);
+			if (ret != 0) {
+				SW_LOG_ERR(
+					"%s: Error parsing minimum burst size parameter",
+					name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			ret = rte_kvargs_process(kvlist, DEQ_BURST_SIZE_ARG,
+					set_deq_burst_sz, &deq_burst_size);
+			if (ret != 0) {
+				SW_LOG_ERR(
+					"%s: Error parsing dequeue burst size parameter",
+					name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			ret = rte_kvargs_process(kvlist, REFIL_ONCE_ARG,
+					set_refill_once, &refill_once);
+			if (ret != 0) {
+				SW_LOG_ERR(
+					"%s: Error parsing refill once per call switch",
+					name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
 			rte_kvargs_free(kvlist);
 		}
 	}
 
 	SW_LOG_INFO(
-			"Creating eventdev sw device %s, numa_node=%d, sched_quanta=%d, credit_quanta=%d\n",
-			name, socket_id, sched_quanta, credit_quanta);
+			"Creating eventdev sw device %s, numa_node=%d, "
+			"sched_quanta=%d, credit_quanta=%d "
+			"min_burst=%d, deq_burst=%d, refill_once=%d\n",
+			name, socket_id, sched_quanta, credit_quanta,
+			min_burst_size, deq_burst_size, refill_once);
 
 	dev = rte_event_pmd_vdev_init(name,
 			sizeof(struct sw_evdev), socket_id);
@@ -1038,6 +1099,9 @@ sw_probe(struct rte_vdev_device *vdev)
 	/* copy values passed from vdev command line to instance */
 	sw->credit_update_quanta = credit_quanta;
 	sw->sched_quanta = sched_quanta;
+	sw->sched_min_burst_size = min_burst_size;
+	sw->sched_deq_burst_size = deq_burst_size;
+	sw->refill_once_per_iter = refill_once;
 
 	/* register service with EAL */
 	struct rte_service_spec service;
@@ -1082,5 +1146,7 @@ static struct rte_vdev_driver evdev_sw_pmd_drv = {
 
 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv);
 RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> "
-		SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>");
+		SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>"
+		MIN_BURST_SIZE_ARG "=<int>" DEQ_BURST_SIZE_ARG "=<int>"
+		REFIL_ONCE_ARG "=<int>");
 RTE_LOG_REGISTER(eventdev_sw_log_level, pmd.event.sw, NOTICE);
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 7c77b2495..1fc07b64f 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -29,7 +29,13 @@
 /* report dequeue burst sizes in buckets */
 #define SW_DEQ_STAT_BUCKET_SHIFT 2
 /* how many packets pulled from port by sched */
-#define SCHED_DEQUEUE_BURST_SIZE 32
+#define SCHED_DEQUEUE_DEFAULT_BURST_SIZE 32
+/* max buffer size */
+#define SCHED_DEQUEUE_MAX_BURST_SIZE 256
+
+/* Flush the pipeline after this many no enq to cq */
+#define SCHED_NO_ENQ_CYCLE_FLUSH 256
+
 
 #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
 #define NUM_SAMPLES 64 /* how many data points use for average stats */
@@ -122,7 +128,7 @@ struct sw_qid {
 
 	/* Track packet order for reordering when needed */
 	struct reorder_buffer_entry *reorder_buffer; /*< pkts await reorder */
-	struct rte_ring *reorder_buffer_freelist; /* available reorder slots */
+	struct rob_ring *reorder_buffer_freelist; /* available reorder slots */
 	uint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */
 	uint32_t window_size;          /* Used to wrap reorder_buffer_index */
 
@@ -197,7 +203,7 @@ struct sw_port {
 	uint32_t pp_buf_start;
 	uint32_t pp_buf_count;
 	uint16_t cq_buf_count;
-	struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE];
+	struct rte_event pp_buf[SCHED_DEQUEUE_MAX_BURST_SIZE];
 	struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH];
 
 	uint8_t num_qids_mapped;
@@ -214,6 +220,16 @@ struct sw_evdev {
 	uint32_t xstats_count_mode_port;
 	uint32_t xstats_count_mode_queue;
 
+	/* Minimum burst size*/
+	uint32_t sched_min_burst_size __rte_cache_aligned;
+	/* Port dequeue burst size*/
+	uint32_t sched_deq_burst_size;
+	/* Refill pp buffers only once per scheduler call*/
+	uint32_t refill_once_per_iter;
+	/* Current values */
+	uint32_t sched_flush_count;
+	uint32_t sched_min_burst;
+
 	/* Contains all ports - load balanced and directed */
 	struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
 
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index cff747da8..f747b3c6d 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -7,6 +7,7 @@
 #include <rte_event_ring.h>
 #include "sw_evdev.h"
 #include "iq_chunk.h"
+#include "event_ring.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -26,6 +27,7 @@
 /* use cheap bit mixing, we only need to lose a few bits */
 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
 
+
 static inline uint32_t
 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		uint32_t iq_num, unsigned int count)
@@ -127,7 +129,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 	if (keep_order)
 		/* only schedule as many as we have reorder buffer entries */
 		count = RTE_MIN(count,
-				rte_ring_count(qid->reorder_buffer_freelist));
+				rob_ring_count(qid->reorder_buffer_freelist));
 
 	for (i = 0; i < count; i++) {
 		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
@@ -146,9 +148,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 				cq_idx = 0;
 			cq = qid->cq_map[cq_idx++];
 
-		} while (rte_event_ring_free_count(
-				sw->ports[cq].cq_worker_ring) == 0 ||
-				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
+				rte_event_ring_free_count(
+					sw->ports[cq].cq_worker_ring) == 0);
 
 		struct sw_port *p = &sw->ports[cq];
 		if (sw->cq_ring_space[cq] == 0 ||
@@ -164,7 +166,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		p->hist_list[head].qid = qid_id;
 
 		if (keep_order)
-			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+			rob_ring_dequeue(qid->reorder_buffer_freelist,
 					(void *)&p->hist_list[head].rob_entry);
 
 		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
@@ -229,7 +231,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
 		uint32_t pkts_done = 0;
 		uint32_t count = iq_count(&qid->iq[iq_num]);
 
-		if (count > 0) {
+		if (count >= sw->sched_min_burst) {
 			if (type == SW_SCHED_TYPE_DIRECT)
 				pkts_done += sw_schedule_dir_to_cq(sw, qid,
 						iq_num, count);
@@ -267,14 +269,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 
 	for (; qid_start < qid_end; qid_start++) {
 		struct sw_qid *qid = &sw->qids[qid_start];
-		int i, num_entries_in_use;
+		unsigned int i, num_entries_in_use;
 
 		if (qid->type != RTE_SCHED_TYPE_ORDERED)
 			continue;
 
-		num_entries_in_use = rte_ring_free_count(
+		num_entries_in_use = rob_ring_free_count(
 					qid->reorder_buffer_freelist);
 
+		if (num_entries_in_use < sw->sched_min_burst)
+			num_entries_in_use = 0;
+
 		for (i = 0; i < num_entries_in_use; i++) {
 			struct reorder_buffer_entry *entry;
 			int j;
@@ -320,7 +325,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 			if (!entry->ready) {
 				entry->fragment_index = 0;
 
-				rte_ring_sp_enqueue(
+				rob_ring_enqueue(
 						qid->reorder_buffer_freelist,
 						entry);
 
@@ -339,7 +344,7 @@ sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
 	struct rte_event_ring *worker = port->rx_worker_ring;
 	port->pp_buf_start = 0;
 	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
-			RTE_DIM(port->pp_buf), NULL);
+			sw->sched_deq_burst_size, NULL);
 }
 
 static __rte_always_inline uint32_t
@@ -350,7 +355,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 	struct sw_port *port = &sw->ports[port_id];
 
 	/* If shadow ring has 0 pkts, pull from worker ring */
-	if (port->pp_buf_count == 0)
+	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
 		sw_refill_pp_buf(sw, port);
 
 	while (port->pp_buf_count) {
@@ -468,7 +473,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 	struct sw_port *port = &sw->ports[port_id];
 
 	/* If shadow ring has 0 pkts, pull from worker ring */
-	if (port->pp_buf_count == 0)
+	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
 		sw_refill_pp_buf(sw, port);
 
 	while (port->pp_buf_count) {
@@ -557,12 +562,39 @@ sw_event_schedule(struct rte_eventdev *dev)
 	/* push all the internal buffered QEs in port->cq_ring to the
 	 * worker cores: aka, do the ring transfers batched.
 	 */
+	int no_enq = 1;
 	for (i = 0; i < sw->port_count; i++) {
-		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
-		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
-				sw->ports[i].cq_buf_count,
-				&sw->cq_ring_space[i]);
-		sw->ports[i].cq_buf_count = 0;
+		struct sw_port *port = &sw->ports[i];
+		struct rte_event_ring *worker = port->cq_worker_ring;
+
+		/* If shadow ring has 0 pkts, pull from worker ring */
+		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
+			sw_refill_pp_buf(sw, port);
+
+		if (port->cq_buf_count >= sw->sched_min_burst) {
+			rte_event_ring_enqueue_burst(worker,
+					port->cq_buf,
+					port->cq_buf_count,
+					&sw->cq_ring_space[i]);
+			port->cq_buf_count = 0;
+			no_enq = 0;
+		} else {
+			sw->cq_ring_space[i] =
+					rte_event_ring_free_count(worker) -
+					port->cq_buf_count;
+		}
+	}
+
+	if (no_enq) {
+		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
+			sw->sched_min_burst = 1;
+		else
+			sw->sched_flush_count++;
+	} else {
+		if (sw->sched_flush_count)
+			sw->sched_flush_count--;
+		else
+			sw->sched_min_burst = sw->sched_min_burst_size;
 	}
 
 }
-- 
2.17.1


      parent reply	other threads:[~2020-10-07 13:51 UTC|newest]

Thread overview: 22+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-09-08 10:52 [dpdk-dev] [PATCH v1] " Radu Nicolau
2020-09-23 11:13 ` Van Haaren, Harry
2020-09-23 23:10   ` Honnappa Nagarahalli
2020-09-24 15:27     ` Nicolau, Radu
2020-09-24 17:38       ` Honnappa Nagarahalli
2020-09-24 18:02         ` Ananyev, Konstantin
2020-09-25 10:28           ` Bruce Richardson
2020-09-28 16:02             ` Honnappa Nagarahalli
2020-09-29  9:02               ` Nicolau, Radu
2020-10-05 16:35                 ` Jerin Jacob
2020-10-06  7:59                   ` Van Haaren, Harry
2020-10-06 10:13                     ` Ananyev, Konstantin
2020-10-07 10:44                       ` Nicolau, Radu
2020-10-07 10:52                         ` Ananyev, Konstantin
2020-10-13 19:11                           ` Jerin Jacob
2020-10-14  8:32                             ` Nicolau, Radu
2020-10-14 10:09                               ` Jerin Jacob
2020-10-14 10:21                                 ` Ananyev, Konstantin
2020-10-14 18:27                                   ` Jerin Jacob
2020-09-28  8:28 ` [dpdk-dev] [PATCH v2] " Radu Nicolau
2020-09-28 13:47   ` Van Haaren, Harry
2020-10-07 13:51 ` Radu Nicolau [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201007135100.8422-1-radu.nicolau@intel.com \
    --to=radu.nicolau@intel.com \
    --cc=Honnappa.Nagarahalli@arm.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=harry.van.haaren@intel.com \
    --cc=jerinj@marvell.com \
    --cc=konstantin.ananyev@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).