DPDK patches and discussions
 help / color / mirror / Atom feed
From: "Mattias Rönnblom" <mattias.ronnblom@ericsson.com>
To: dev@dpdk.org
Cc: jerin.jacob@caviumnetworks.com, bruce.richardson@intel.com,
	"Mattias Rönnblom" <mattias.ronnblom@ericsson.com>
Subject: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
Date: Wed, 11 Jul 2018 23:21:54 +0200	[thread overview]
Message-ID: <20180711212154.5807-2-mattias.ronnblom@ericsson.com> (raw)
In-Reply-To: <20180711212154.5807-1-mattias.ronnblom@ericsson.com>

Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
 config/common_base                            |    5 +
 drivers/event/Makefile                        |    1 +
 drivers/event/dsw/Makefile                    |   28 +
 drivers/event/dsw/dsw_evdev.c                 |  361 +++++
 drivers/event/dsw/dsw_evdev.h                 |  296 ++++
 drivers/event/dsw/dsw_event.c                 | 1285 +++++++++++++++++
 drivers/event/dsw/dsw_sort.h                  |   47 +
 drivers/event/dsw/dsw_xstats.c                |  284 ++++
 .../event/dsw/rte_pmd_evdev_dsw_version.map   |    3 +
 mk/rte.app.mk                                 |    1 +
 10 files changed, 2311 insertions(+)
 create mode 100644 drivers/event/dsw/Makefile
 create mode 100644 drivers/event/dsw/dsw_evdev.c
 create mode 100644 drivers/event/dsw/dsw_evdev.h
 create mode 100644 drivers/event/dsw/dsw_event.c
 create mode 100644 drivers/event/dsw/dsw_sort.h
 create mode 100644 drivers/event/dsw/dsw_xstats.c
 create mode 100644 drivers/event/dsw/rte_pmd_evdev_dsw_version.map

diff --git a/config/common_base b/config/common_base
index e4241db16..c07d340d9 100644
--- a/config/common_base
+++ b/config/common_base
@@ -588,6 +588,11 @@ CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV_DEBUG=n
 #
 CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV=y
 
+#
+# Compile PMD for distributed software event device
+#
+CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV=y
+
 #
 # Compile PMD for octeontx sso event device
 #
diff --git a/drivers/event/Makefile b/drivers/event/Makefile
index f301d8dc2..03ad1b6cb 100644
--- a/drivers/event/Makefile
+++ b/drivers/event/Makefile
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
 
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += dpaa
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
new file mode 100644
index 000000000..8f358eaa3
--- /dev/null
+++ b/drivers/event/dsw/Makefile
@@ -0,0 +1,28 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+LIB = librte_pmd_dsw_event.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -Wno-format-nonliteral
+
+LDLIBS += -lrte_eal
+LDLIBS += -lrte_mbuf
+LDLIBS += -lrte_mempool
+LDLIBS += -lrte_ring
+LDLIBS += -lrte_eventdev
+LDLIBS += -lrte_bus_vdev
+
+LIBABIVER := 1
+
+EXPORT_MAP := rte_pmd_evdev_dsw_version.map
+
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_event.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_xstats.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
new file mode 100644
index 000000000..39008f7eb
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -0,0 +1,361 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include <rte_eventdev_pmd.h>
+#include <rte_eventdev_pmd_vdev.h>
+#include <rte_cycles.h>
+#include <rte_random.h>
+
+#include "dsw_evdev.h"
+
+#define EVENTDEV_NAME_DSW_PMD event_dsw
+
+static int
+dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
+	       const struct rte_event_port_conf *conf)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	struct dsw_port *port;
+	struct rte_event_ring *in_ring;
+	struct rte_ring *ctl_in_ring;
+	char ring_name[RTE_RING_NAMESIZE];
+
+	port = &dsw->ports[port_id];
+
+	*port = (struct dsw_port) {
+		.id = port_id,
+		.dsw = dsw,
+		.dequeue_depth = conf->dequeue_depth,
+		.enqueue_depth = conf->enqueue_depth,
+		.new_event_threshold = conf->new_event_threshold
+	};
+
+	snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
+		 port_id);
+
+	in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
+					dev->data->socket_id,
+					RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+	if (in_ring == NULL)
+		return -ENOMEM;
+
+	snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
+		 dev->data->dev_id, port_id);
+
+	ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
+				      dev->data->socket_id,
+				      RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+	if (in_ring == NULL) {
+		rte_event_ring_free(in_ring);
+		return -ENOMEM;
+	}
+
+	port->in_ring = in_ring;
+	port->ctl_in_ring = ctl_in_ring;
+
+	rte_atomic16_init(&port->load);
+
+	port->load_update_interval =
+		(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
+	port->migration_interval =
+		(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
+	dev->data->ports[port_id] = port;
+
+	return 0;
+}
+
+static void
+dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
+		  uint8_t port_id __rte_unused,
+		  struct rte_event_port_conf *port_conf)
+{
+	*port_conf = (struct rte_event_port_conf) {
+		.new_event_threshold = 1024,
+		.dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
+		.enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
+	};
+}
+
+static void
+dsw_port_release(void *p)
+{
+	struct dsw_port *port = p;
+
+	rte_event_ring_free(port->in_ring);
+	rte_ring_free(port->ctl_in_ring);
+}
+
+static int
+dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
+		const struct rte_event_queue_conf *conf)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	struct dsw_queue *queue = &dsw->queues[queue_id];
+
+	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
+		return -ENOTSUP;
+
+	if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
+		return -ENOTSUP;
+
+	/* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
+	 * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since
+	 * the queue will only have a single serving port, no
+	 * migration will ever happen, so the extra TYPE_ATOMIC
+	 * migration overhead is avoided.
+	 */
+	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
+		queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
+	else /* atomic or parallel */
+		queue->schedule_type = conf->schedule_type;
+
+	queue->num_serving_ports = 0;
+
+	return 0;
+}
+
+static void
+dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
+		   uint8_t queue_id __rte_unused,
+		   struct rte_event_queue_conf *queue_conf)
+{
+	*queue_conf = (struct rte_event_queue_conf) {
+		.nb_atomic_flows = 4096,
+		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL
+	};
+}
+
+static void
+dsw_queue_release(struct rte_eventdev *dev __rte_unused,
+		  uint8_t queue_id __rte_unused)
+{
+}
+
+static void
+queue_add_port(struct dsw_queue *queue, uint16_t port_id)
+{
+	queue->serving_ports[queue->num_serving_ports] = port_id;
+	queue->num_serving_ports++;
+}
+
+static bool
+queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
+{
+	uint16_t i;
+	for (i = 0; i < queue->num_serving_ports; i++)
+		if (queue->serving_ports[i] == port_id) {
+			uint16_t last_idx = queue->num_serving_ports - 1;
+			if (i != last_idx)
+				queue->serving_ports[i] =
+					queue->serving_ports[last_idx];
+			queue->num_serving_ports--;
+			return true;
+		}
+	return false;
+}
+
+static int
+dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
+		     const uint8_t queues[], uint16_t num, bool link)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	struct dsw_port *p = port;
+	uint16_t i;
+	uint16_t count = 0;
+
+	for (i = 0; i < num; i++) {
+		uint8_t qid = queues[i];
+		struct dsw_queue *q = &dsw->queues[qid];
+		if (link) {
+			queue_add_port(q, p->id);
+			count++;
+		} else {
+			bool removed = queue_remove_port(q, p->id);
+			if (removed)
+				count++;
+		}
+	}
+
+	return count;
+}
+
+static int
+dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
+	      const uint8_t priorities[] __rte_unused, uint16_t num)
+{
+	return dsw_port_link_unlink(dev, port, queues, num, true);
+}
+
+static int
+dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
+		uint16_t num)
+{
+	return dsw_port_link_unlink(dev, port, queues, num, false);
+}
+
+static void
+dsw_info_get(struct rte_eventdev *dev __rte_unused,
+	     struct rte_event_dev_info *info)
+{
+	*info = (struct rte_event_dev_info) {
+		.driver_name = DSW_PMD_NAME,
+		.max_event_queues = DSW_MAX_QUEUES,
+		.max_event_queue_flows = DSW_MAX_FLOWS,
+		.max_event_queue_priority_levels = 1,
+		.max_event_priority_levels = 1,
+		.max_event_ports = DSW_MAX_PORTS,
+		.max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
+		.max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
+		.max_num_events = DSW_MAX_EVENTS,
+		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
+	};
+}
+
+static int
+dsw_configure(const struct rte_eventdev *dev)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+	int32_t min_max_in_flight;
+
+	dsw->num_queues = conf->nb_event_queues;
+	dsw->num_ports = conf->nb_event_ports;
+
+	/* Avoid a situation where consumer ports are holding all the
+	 * credits, without making use of them.
+	 */
+	min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;
+
+	dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);
+
+	return 0;
+}
+
+static void
+initial_flow_to_port_assignment(struct dsw_evdev *dsw)
+{
+	uint8_t queue_id;
+	for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
+		struct dsw_queue *queue = &dsw->queues[queue_id];
+		uint16_t flow_hash;
+		for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
+			uint8_t port_idx =
+				rte_rand() % queue->num_serving_ports;
+			uint8_t port_id =
+				queue->serving_ports[port_idx];
+			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+				port_id;
+		}
+	}
+}
+
+static int
+dsw_start(struct rte_eventdev *dev)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	uint16_t i;
+	uint64_t now;
+
+	rte_atomic32_init(&dsw->credits_on_loan);
+
+	initial_flow_to_port_assignment(dsw);
+
+	now = rte_get_timer_cycles();
+	for (i = 0; i < dsw->num_ports; i++) {
+		dsw->ports[i].measurement_start = now;
+		dsw->ports[i].busy_start = now;
+	}
+
+	return 0;
+}
+
+static void
+dsw_stop(struct rte_eventdev *dev __rte_unused)
+{
+}
+
+static int
+dsw_close(struct rte_eventdev *dev)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+	dsw->num_ports = 0;
+	dsw->num_queues = 0;
+
+	return 0;
+}
+
+static struct rte_eventdev_ops dsw_evdev_ops = {
+	.dev_infos_get = dsw_info_get,
+	.dev_configure = dsw_configure,
+	.dev_start = dsw_start,
+	.dev_stop = dsw_stop,
+	.dev_close = dsw_close,
+	.port_setup = dsw_port_setup,
+	.port_def_conf = dsw_port_def_conf,
+	.port_release = dsw_port_release,
+	.queue_setup = dsw_queue_setup,
+	.queue_def_conf = dsw_queue_def_conf,
+	.queue_release = dsw_queue_release,
+	.port_link = dsw_port_link,
+	.port_unlink = dsw_port_unlink,
+#ifdef DSW_XSTATS
+	.xstats_get = dsw_xstats_get,
+	.xstats_get_names = dsw_xstats_get_names,
+	.xstats_get_by_name = dsw_xstats_get_by_name
+#endif
+};
+
+static int
+dsw_probe(struct rte_vdev_device *vdev)
+{
+	const char *name;
+	struct rte_eventdev *dev;
+	struct dsw_evdev *dsw;
+
+	name = rte_vdev_device_name(vdev);
+
+	dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
+				      rte_socket_id());
+	if (dev == NULL)
+		return -EFAULT;
+
+	dev->dev_ops = &dsw_evdev_ops;
+	dev->enqueue = dsw_event_enqueue;
+	dev->enqueue_burst = dsw_event_enqueue_burst;
+	dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
+	dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
+	dev->dequeue = dsw_event_dequeue;
+	dev->dequeue_burst = dsw_event_dequeue_burst;
+
+	dsw = dev->data->dev_private;
+	dsw->data = dev->data;
+
+	return 0;
+}
+
+static int
+dsw_remove(struct rte_vdev_device *vdev)
+{
+	const char *name;
+
+	name = rte_vdev_device_name(vdev);
+	if (name == NULL)
+		return -EINVAL;
+
+	return rte_event_pmd_vdev_uninit(name);
+}
+
+static struct rte_vdev_driver evdev_dsw_pmd_drv = {
+	.probe = dsw_probe,
+	.remove = dsw_remove
+};
+
+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
new file mode 100644
index 000000000..e6b34c013
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -0,0 +1,296 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_EVDEV_H_
+#define _DSW_EVDEV_H_
+
+#include <inttypes.h>
+#include <stdbool.h>
+#include <rte_eventdev.h>
+#include <rte_event_ring.h>
+#include <rte_ring.h>
+#include <rte_event_ring.h>
+#include <rte_config.h>
+
+#define DSW_PMD_NAME RTE_STR(event_dsw)
+
+/* Code changes are required to allow more ports. */
+#define DSW_MAX_PORTS (64)
+#define DSW_MAX_PORT_OUT_BUFFER (32)
+#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
+#define DSW_MAX_PORT_ENQUEUE_DEPTH (DSW_MAX_PORT_OUT_BUFFER)
+
+#define DSW_MAX_QUEUES (16)
+
+#define DSW_MAX_EVENTS (16384)
+
+/* Code changes are required to allow more flows than 32k. */
+#define DSW_MAX_FLOWS_BITS (15)
+#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
+#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)
+
+/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,
+ * but the 'dsw' scheduler (more or less) randomly assign flow id to
+ * events on parallel queues, to be able to reuse some of the
+ * migration mechanism and scheduling logic from
+ * RTE_SCHED_TYPE_ATOMIC. By moving one of the parallel "flows" from a
+ * particular port, the likely-hood of events being scheduled to this
+ * port is reduced, and thus a kind of statistical load balancing is
+ * achieved.
+ */
+#define DSW_PARALLEL_FLOWS (1024)
+
+/* 'Background tasks' are polling the control rings for *
+ *  migration-related messages, or flush the output buffer (so
+ *  buffered events doesn't linger too long). Shouldn't be too low,
+ *  since the system won't benefit from the 'batching' effects from
+ *  the output buffer, and shouldn't be too high, since it will make
+ *  buffered events linger too long in case the port goes idle.
+ */
+#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)
+
+/* Avoid making small 'loans' from the central in-flight event credit
+ * pool, to improve efficiency.
+ */
+#define DSW_MIN_CREDIT_LOAN (64)
+#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)
+#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)
+
+/* The rings are dimensioned so that all in-flight events can reside
+ * on only one of the port rings, to avoid the trouble of having to
+ * care about the case of an event not fitting on the receiver port's
+ * ring.
+ */
+#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)
+
+#define DSW_MAX_LOAD (INT16_MAX)
+#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))
+#define DSW_LOAD_TO_PERCENT(x) ((100*x)/DSW_MAX_LOAD)
+
+/* The thought behind keeping the load update interval shorter than
+ * the migration interval is that the load from newly migrated flows
+ * should 'show up' on the load measurement before new migrations are
+ * considered. This is to avoid having too many flows, from too many
+ * source ports, to be migrated too quickly to a lightly loaded port -
+ * in particular since this might cause the system to oscillate.
+ */
+#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
+#define DSW_OLD_LOAD_WEIGHT (1)
+
+/* The minimum time (in us) between two flow migrations. What puts an
+ * upper limit on the actual migration rate is primarily the pace in
+ * which the ports send and receive control messages, which in turn is
+ * largely a function of how much cycles are spent the processing of
+ * an event burst.
+ */
+#define DSW_MIGRATION_INTERVAL (1000)
+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))
+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))
+
+#define DSW_MAX_EVENTS_RECORDED (128)
+
+/* Only one outstanding migration per port is allowed */
+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
+
+/* Enough room for paus request/confirm and unpaus request/confirm for
+ * all possible senders.
+ */
+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
+
+/* Statistics is configurable at this point, mostly to make it easy to
+ * measure their performance impact.
+ */
+#define DSW_XSTATS
+
+/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of
+ * dequeue(), arrange events so that events with the same flow id on
+ * the same queue forms a back-to-back "burst", and also so that such
+ * bursts of different flow ids, but on the same queue, also come
+ * consecutively. All this in an attempt to improve data and
+ * instruction cache usage for the application, at the cost of a
+ * scheduler overhead increase.
+ */
+
+//#define DSW_SORT_DEQUEUED
+
+struct dsw_queue_flow {
+	uint8_t queue_id;
+	uint16_t flow_hash;
+};
+
+enum dsw_migration_state {
+	DSW_MIGRATION_STATE_IDLE = 0,
+	DSW_MIGRATION_STATE_PAUSING,
+	DSW_MIGRATION_STATE_FORWARDING,
+	DSW_MIGRATION_STATE_UNPAUSING
+};
+
+struct dsw_port {
+	uint16_t id;
+
+	/* Keeping a pointer here to avoid container_of() calls, which
+	 * are expensive since they are very frequent and will result
+	 * in an integer multiplication (since the port id is an index
+	 * into the dsw_evdev port array).
+	 */
+	struct dsw_evdev *dsw;
+
+	uint16_t dequeue_depth;
+	uint16_t enqueue_depth;
+
+	int32_t inflight_credits;
+
+	int32_t new_event_threshold;
+
+	uint16_t pending_releases;
+
+	uint16_t next_parallel_flow_id;
+
+	uint16_t ops_since_bg_task;
+
+	/* For port load measurement. */
+	uint64_t next_load_update;
+	uint64_t load_update_interval;
+	uint64_t measurement_start;
+	uint64_t busy_start;
+	uint64_t busy_cycles;
+
+	/* For the ctl interface and flow migration mechanism. */
+	uint64_t next_migration;
+	uint64_t migration_interval;
+	enum dsw_migration_state migration_state;
+
+#ifdef DSW_XSTATS
+
+	uint64_t new_enqueued;
+	uint64_t forward_enqueued;
+	uint64_t release_enqueued;
+	uint64_t queue_enqueued[DSW_MAX_QUEUES];
+
+	uint64_t dequeued;
+	uint64_t queue_dequeued[DSW_MAX_QUEUES];
+
+	uint64_t migration_start;
+	uint64_t migrations;
+	uint64_t migration_latency;
+
+	uint64_t total_busy_cycles;
+
+	uint64_t last_bg;
+#endif
+
+	uint8_t migration_target_port_id;
+	struct dsw_queue_flow migration_target_qf;
+	uint8_t cfm_cnt;
+
+	uint16_t paused_flows_len;
+	struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];
+
+	/* In a very contrived worst case all inflight events can be
+	 * laying around paused here.
+	 */
+	uint16_t paused_events_len;
+	struct rte_event paused_events[DSW_MAX_EVENTS];
+
+	uint16_t seen_events_len;
+	uint16_t seen_events_idx;
+	struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+
+	uint16_t out_buffer_len[DSW_MAX_PORTS];
+	struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
+
+	uint16_t in_buffer_len;
+	uint16_t in_buffer_start;
+	/* This buffer may contain events that were read up from the
+	 * in_ring during the flow migration process.
+	 */
+	struct rte_event in_buffer[DSW_MAX_EVENTS];
+
+	struct rte_event_ring *in_ring __rte_cache_aligned;
+
+	struct rte_ring *ctl_in_ring __rte_cache_aligned;
+
+	/* Estimate of current port load. */
+	rte_atomic16_t load __rte_cache_aligned;
+} __rte_cache_aligned;
+
+struct dsw_queue {
+	uint8_t schedule_type;
+	uint8_t serving_ports[DSW_MAX_PORTS];
+	uint16_t num_serving_ports;
+
+	uint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;
+};
+
+struct dsw_evdev {
+	struct rte_eventdev_data *data;
+
+	struct dsw_port ports[DSW_MAX_PORTS];
+	uint16_t num_ports;
+	struct dsw_queue queues[DSW_MAX_QUEUES];
+	uint8_t num_queues;
+	int32_t max_inflight;
+
+	rte_atomic32_t credits_on_loan __rte_cache_aligned;
+};
+
+#define DSW_CTL_PAUS_REQ (0)
+#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_CFM (2)
+
+/* sizeof(struct dsw_ctl_msg) must be equal or less than
+ * sizeof(void *), to fit on the control ring.
+ */
+struct dsw_ctl_msg {
+	uint8_t type:2;
+	uint8_t originating_port_id:6;
+	uint8_t queue_id;
+	uint16_t flow_hash;
+} __rte_packed;
+
+uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
+uint16_t dsw_event_enqueue_burst(void *port,
+				 const struct rte_event events[],
+				 uint16_t events_len);
+uint16_t dsw_event_enqueue_new_burst(void *port,
+				     const struct rte_event events[],
+				     uint16_t events_len);
+uint16_t dsw_event_enqueue_forward_burst(void *port,
+					 const struct rte_event events[],
+					 uint16_t events_len);
+
+uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
+				 uint16_t num, uint64_t wait);
+
+void dsw_event_schedule(struct rte_eventdev *dev);
+
+#ifdef DSW_XSTATS
+int dsw_xstats_get_names(const struct rte_eventdev *dev,
+			 enum rte_event_dev_xstats_mode mode,
+			 uint8_t queue_port_id,
+			 struct rte_event_dev_xstats_name *xstats_names,
+			 unsigned int *ids, unsigned int size);
+int dsw_xstats_get(const struct rte_eventdev *dev,
+		   enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		   const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+				const char *name, unsigned int *id);
+#endif
+
+static inline struct dsw_evdev *
+dsw_pmd_priv(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+#define DSW_LOG_DP(level, fmt, args...)					\
+	RTE_LOG_DP(level, EVENTDEV, "[%s] %s() line %u: " fmt,		\
+		   DSW_PMD_NAME,					\
+		   __func__, __LINE__, ## args)
+
+#define DSW_LOG_DP_PORT(level, port_id, fmt, args...)		\
+	DSW_LOG_DP(level, "<Port %d> " fmt, port_id, ## args)
+
+#endif
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
new file mode 100644
index 000000000..5295e84c0
--- /dev/null
+++ b/drivers/event/dsw/dsw_event.c
@@ -0,0 +1,1285 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifdef DSW_SORT_DEQUEUED
+#include "dsw_sort.h"
+#endif
+
+#include "dsw_evdev.h"
+
+#include <string.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <rte_cycles.h>
+#include <rte_atomic.h>
+#include <rte_random.h>
+#include <rte_branch_prediction.h>
+#include <rte_memcpy.h>
+
+static bool
+dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+			 int32_t credits)
+{
+	int32_t inflight_credits = port->inflight_credits;
+	int32_t missing_credits = credits - inflight_credits;
+	int32_t total_on_loan;
+	int32_t available;
+	int32_t acquired_credits;
+	int32_t new_total_on_loan;
+
+	if (likely(missing_credits <= 0)) {
+		port->inflight_credits -= credits;
+		return true;
+	}
+
+	total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
+	available = dsw->max_inflight - total_on_loan;
+	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
+
+	if (available < acquired_credits)
+		return false;
+
+	/* This is a race, no locks are involved, and thus some other
+	 * thread can allocate tokens in between the check and the
+	 * allocation.
+	 */
+	new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
+						    acquired_credits);
+
+	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
+		/* Some other port took the last credits */
+		rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
+		return false;
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
+			acquired_credits);
+
+	port->inflight_credits += acquired_credits;
+	port->inflight_credits -= credits;
+
+	return true;
+}
+
+static void
+dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+			int32_t credits)
+{
+	port->inflight_credits += credits;
+
+	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
+		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
+		int32_t return_credits =
+			port->inflight_credits - leave_credits;
+
+		port->inflight_credits = leave_credits;
+
+		rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
+
+		DSW_LOG_DP_PORT(DEBUG, port->id,
+				"Returned %d tokens to pool.\n",
+				return_credits);
+	}
+}
+
+#ifdef DSW_XSTATS
+
+static void
+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
+		       uint16_t num_forward, uint16_t num_release)
+{
+	port->new_enqueued += num_new;
+	port->forward_enqueued += num_forward;
+	port->release_enqueued += num_release;
+}
+
+static void
+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+	source_port->queue_enqueued[queue_id]++;
+}
+
+static void
+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
+{
+	port->dequeued += num;
+}
+
+static void
+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+	source_port->queue_dequeued[queue_id]++;
+}
+#endif
+
+static void
+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
+{
+	if (dequeued > 0 && port->busy_start == 0)
+		/* work period begins */
+		port->busy_start = rte_get_timer_cycles();
+	else if (dequeued == 0 && port->busy_start > 0) {
+		/* work period ends */
+		uint64_t work_period =
+			rte_get_timer_cycles() - port->busy_start;
+		port->busy_cycles += work_period;
+		port->busy_start = 0;
+	}
+}
+
+static int16_t
+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
+{
+	uint64_t passed = now - port->measurement_start;
+	uint64_t busy_cycles = port->busy_cycles;
+
+	if (port->busy_start > 0) {
+		busy_cycles += (now - port->busy_start);
+		port->busy_start = now;
+	}
+
+	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
+
+	port->measurement_start = now;
+	port->busy_cycles = 0;
+
+#ifdef DSW_XSTATS
+	port->total_busy_cycles += busy_cycles;
+#endif
+
+	return load;
+}
+
+static void
+dsw_port_load_update(struct dsw_port *port, uint64_t now)
+{
+	int16_t old_load;
+	int16_t period_load;
+	int16_t new_load;
+
+	old_load = rte_atomic16_read(&port->load);
+
+	period_load = dsw_port_load_close_period(port, now);
+
+	new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
+		(DSW_OLD_LOAD_WEIGHT+1);
+
+	rte_atomic16_set(&port->load, new_load);
+}
+
+static void
+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
+{
+	if (now < port->next_load_update)
+		return;
+
+	port->next_load_update = now + port->load_update_interval;
+
+	dsw_port_load_update(port, now);
+}
+
+static void
+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+	void *raw_msg;
+	memcpy(&raw_msg, msg, sizeof(*msg));
+
+	/* there's always room on the ring */
+	while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+		rte_pause();
+}
+
+static int
+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+	void *raw_msg;
+	int rc;
+
+	rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
+
+	if (rc == 0)
+		memcpy(msg, &raw_msg, sizeof(*msg));
+
+	return rc;
+}
+
+static void
+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
+		       uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+{
+	uint16_t port_id;
+	struct dsw_ctl_msg msg = {
+		.type = type,
+		.originating_port_id = source_port->id,
+		.queue_id = queue_id,
+		.flow_hash = flow_hash
+	};
+
+	for (port_id = 0; port_id < dsw->num_ports; port_id++)
+		if (port_id != source_port->id)
+			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
+}
+
+static bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+			uint16_t flow_hash)
+{
+	uint16_t i;
+	for (i = 0; i < port->paused_flows_len; i++) {
+		struct dsw_queue_flow *qf = &port->paused_flows[i];
+		if (qf->queue_id == queue_id &&
+		    qf->flow_hash == flow_hash)
+			return true;
+	}
+	return false;
+}
+
+static void
+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
+			 uint16_t paused_flow_hash)
+{
+	port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+	port->paused_flows_len++;
+}
+
+static void
+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
+			    uint16_t paused_flow_hash)
+{
+	uint16_t i;
+
+	for (i = 0; i < port->paused_flows_len; i++) {
+		struct dsw_queue_flow *qf = &port->paused_flows[i];
+
+		if (qf->queue_id == queue_id &&
+		    qf->flow_hash == paused_flow_hash) {
+			uint16_t last_idx = port->paused_flows_len-1;
+			if (i != last_idx)
+				port->paused_flows[i] =
+					port->paused_flows[last_idx];
+			port->paused_flows_len--;
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+			   uint8_t originating_port_id, uint8_t queue_id,
+			   uint16_t paused_flow_hash)
+{
+	struct dsw_ctl_msg cfm = {
+		.type = DSW_CTL_CFM,
+		.originating_port_id = port->id,
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
+			queue_id, paused_flow_hash);
+
+	/* There might be already-scheduled events belonging to the
+	 * paused flow in the output buffers.
+	 */
+	dsw_port_flush_out_buffers(dsw, port);
+
+	dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+
+	/* Make sure any stores to the original port's in_ring is seen
+	 * before the ctl message.
+	 */
+	rte_smp_wmb();
+
+	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+}
+
+static void
+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
+			  uint8_t exclude_port_id, int16_t *port_loads,
+			  uint8_t *target_port_id, int16_t *target_load)
+{
+	int16_t candidate_port_id = -1;
+	int16_t candidate_load = DSW_MAX_LOAD;
+	uint16_t i;
+
+	for (i = 0; i < num_port_ids; i++) {
+		uint8_t port_id = port_ids[i];
+		if (port_id != exclude_port_id) {
+			int16_t load = port_loads[port_id];
+			if (candidate_port_id == -1 ||
+			    load < candidate_load) {
+				candidate_port_id = port_id;
+				candidate_load = load;
+			}
+		}
+	}
+	*target_port_id = candidate_port_id;
+	*target_load = candidate_load;
+}
+
+struct dsw_queue_flow_burst {
+	struct dsw_queue_flow queue_flow;
+	uint16_t count;
+};
+
+static inline int
+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
+{
+	const struct dsw_queue_flow_burst *burst_a = v_burst_a;
+	const struct dsw_queue_flow_burst *burst_b = v_burst_b;
+
+	int a_count = burst_a->count;
+	int b_count = burst_b->count;
+
+	return a_count - b_count;
+}
+
+#define DSW_QF_TO_INT(_qf)					\
+	((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
+
+static inline int
+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
+{
+	const struct dsw_queue_flow *qf_a = v_qf_a;
+	const struct dsw_queue_flow *qf_b = v_qf_b;
+
+	return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
+}
+
+static uint16_t
+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
+		       struct dsw_queue_flow_burst *bursts)
+{
+	uint16_t i;
+	struct dsw_queue_flow_burst *current_burst;
+	uint16_t num_bursts = 0;
+
+	/* We don't need the stable property, and the list is likely
+	 * large enough for qsort() to outperform dsw_stable_sort(),
+	 * so we use qsort() here.
+	 */
+	qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
+
+	/* arrange the (now-consecutive) events into bursts */
+	for (i = 0; i < qfs_len; i++) {
+		if (i == 0 ||
+		    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
+			current_burst = &bursts[num_bursts];
+			current_burst->queue_flow = qfs[i];
+			current_burst->count = 0;
+			num_bursts++;
+		}
+		current_burst->count++;
+	}
+
+	qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
+
+	return num_bursts;
+}
+
+static bool
+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
+			int16_t load_limit)
+{
+	bool below_limit = false;
+	uint16_t i;
+	for (i = 0; i < dsw->num_ports; i++) {
+		int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+		if (load < load_limit)
+			below_limit = true;
+		port_loads[i] = load;
+	}
+	return below_limit;
+}
+
+static bool
+dsw_select_migration_target(struct dsw_evdev *dsw,
+			    struct dsw_port *source_port,
+			    struct dsw_queue_flow_burst *bursts,
+			    uint16_t num_bursts, int16_t *port_loads,
+			    int16_t max_load, struct dsw_queue_flow *target_qf,
+			    uint8_t *target_port_id)
+{
+	uint16_t source_load = port_loads[source_port->id];
+	uint16_t i;
+
+	for (i = 0; i < num_bursts; i++) {
+		struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+
+		if (dsw_port_is_flow_paused(source_port, qf->queue_id,
+					    qf->flow_hash))
+			continue;
+
+		struct dsw_queue *queue = &dsw->queues[qf->queue_id];
+		int16_t target_load;
+
+		dsw_find_lowest_load_port(queue->serving_ports,
+					  queue->num_serving_ports,
+					  source_port->id, port_loads,
+					  target_port_id, &target_load);
+
+		if (target_load < source_load &&
+		    target_load < max_load) {
+			*target_qf = *qf;
+			return true;
+		}
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
+			"no target port found with load less than %d.\n",
+			num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+
+	return false;
+}
+
+static uint8_t
+dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
+{
+	struct dsw_queue *queue = &dsw->queues[queue_id];
+	uint8_t port_id;
+
+	if (queue->num_serving_ports > 1)
+		port_id = queue->flow_to_port_map[flow_hash];
+	else
+		/* A single-link queue, or atomic/ordered/parallel but
+		 * with just a single serving port.
+		 */
+		port_id = queue->serving_ports[0];
+
+	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
+		   "to port %d.\n", queue_id, flow_hash, port_id);
+
+	return port_id;
+}
+
+static void
+dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
+			   uint8_t dest_port_id)
+{
+	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
+	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+	uint16_t enqueued = 0;
+
+	if (*buffer_len == 0)
+		return;
+
+	/* The rings are dimensioned to fit all in-flight events (even
+	 * on a single ring), so looping will work.
+	 */
+	do {
+		enqueued +=
+			rte_event_ring_enqueue_burst(dest_port->in_ring,
+						     buffer+enqueued,
+						     *buffer_len-enqueued,
+						     NULL);
+	} while (unlikely(enqueued != *buffer_len));
+
+	(*buffer_len) = 0;
+}
+
+static uint16_t
+dsw_port_get_parallel_flow_id(struct dsw_port *port)
+{
+	uint16_t flow_id = port->next_parallel_flow_id;
+
+	port->next_parallel_flow_id =
+		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
+
+	return flow_id;
+}
+
+static void
+dsw_port_buffer_paused(struct dsw_port *port,
+		       const struct rte_event *paused_event)
+{
+	port->paused_events[port->paused_events_len] = *paused_event;
+	port->paused_events_len++;
+}
+
+static void
+dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
+			   uint8_t dest_port_id, const struct rte_event *event)
+{
+	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+
+	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
+		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+
+	buffer[*buffer_len] = *event;
+
+	(*buffer_len)++;
+}
+
+#define DSW_FLOW_ID_BITS (24)
+static uint16_t
+dsw_flow_id_hash(uint32_t flow_id)
+{
+	uint16_t hash = 0;
+	uint16_t offset = 0;
+
+	do {
+		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
+		offset += DSW_MAX_FLOWS_BITS;
+	} while (offset < DSW_FLOW_ID_BITS);
+
+	return hash;
+}
+
+static void
+dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
+			 struct rte_event event)
+{
+	uint8_t dest_port_id;
+
+	event.flow_id = dsw_port_get_parallel_flow_id(source_port);
+
+	dest_port_id = dsw_schedule(dsw, event.queue_id,
+				    dsw_flow_id_hash(event.flow_id));
+
+	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
+}
+
+static void
+dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
+		      const struct rte_event *event)
+{
+	uint16_t flow_hash;
+	uint8_t dest_port_id;
+
+	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
+		     RTE_SCHED_TYPE_PARALLEL)) {
+		dsw_port_buffer_parallel(dsw, source_port, *event);
+		return;
+	}
+
+	flow_hash = dsw_flow_id_hash(event->flow_id);
+
+	if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
+					     flow_hash))) {
+		dsw_port_buffer_paused(source_port, event);
+		return;
+	}
+
+	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
+
+	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint8_t queue_id, uint16_t paused_flow_hash)
+{
+	uint16_t paused_events_len = source_port->paused_events_len;
+	struct rte_event paused_events[paused_events_len];
+	uint8_t dest_port_id;
+	uint16_t i;
+
+	if (paused_events_len == 0)
+		return;
+
+	if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+		return;
+
+	rte_memcpy(paused_events, source_port->paused_events,
+		   paused_events_len * sizeof(struct rte_event));
+
+	source_port->paused_events_len = 0;
+
+	dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+
+	for (i = 0; i < paused_events_len; i++) {
+		struct rte_event *event = &paused_events[i];
+		uint16_t flow_hash;
+
+		flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		if (event->queue_id == queue_id &&
+		    flow_hash == paused_flow_hash)
+			dsw_port_buffer_non_paused(dsw, source_port,
+						   dest_port_id, event);
+		else
+			dsw_port_buffer_paused(source_port, event);
+	}
+}
+
+#ifdef DSW_XSTATS
+static void
+dsw_port_migration_stats(struct dsw_port *port)
+{
+	uint64_t migration_latency;
+
+	migration_latency = (rte_get_timer_cycles() - port->migration_start);
+	port->migration_latency += migration_latency;
+	port->migrations++;
+}
+#endif
+
+static void
+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	uint8_t queue_id = port->migration_target_qf.queue_id;
+	uint16_t flow_hash = port->migration_target_qf.flow_hash;
+
+	port->migration_state = DSW_MIGRATION_STATE_IDLE;
+	port->seen_events_len = 0;
+
+#ifdef DSW_XSTATS
+	dsw_port_migration_stats(port);
+#endif
+
+	if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
+		dsw_port_remove_paused_flow(port, queue_id, flow_hash);
+		dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
+			"%d flow_hash %d.\n", queue_id, flow_hash);
+}
+
+static void
+dsw_port_consider_migration(struct dsw_evdev *dsw,
+			    struct dsw_port *source_port,
+			    uint64_t now)
+{
+	bool any_port_below_limit;
+	struct dsw_queue_flow *seen_events = source_port->seen_events;
+	uint16_t seen_events_len = source_port->seen_events_len;
+	struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
+	uint16_t num_bursts;
+	int16_t source_port_load;
+	int16_t port_loads[dsw->num_ports];
+
+	if (now < source_port->next_migration)
+		return;
+
+	if (dsw->num_ports == 1)
+		return;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+
+	/* Randomize interval to avoid having all threads considering
+	 * migration at the same in point in time, which might lead to
+	 * all choosing the same target port.
+	 */
+	source_port->next_migration = now +
+		source_port->migration_interval / 2 +
+		rte_rand() % source_port->migration_interval;
+
+	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Migration already in progress.\n");
+		return;
+	}
+
+	/* For simplicity, avoid migration in the unlikely case there
+	 * is still events to consume in the in_buffer (from the last
+	 * migration).
+	 */
+	if (source_port->in_buffer_len > 0) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+				"events in the input buffer.\n");
+		return;
+	}
+
+	source_port_load = rte_atomic16_read(&source_port->load);
+	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Load %d is below threshold level %d.\n",
+				DSW_LOAD_TO_PERCENT(source_port_load),
+		       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+		return;
+	}
+
+	/* Avoid starting any expensive operations (sorting etc), in
+	 * case of a scenario with all ports above the load limit.
+	 */
+	any_port_below_limit =
+		dsw_retrieve_port_loads(dsw, port_loads,
+					DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
+	if (!any_port_below_limit) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Candidate target ports are all too highly "
+				"loaded.\n");
+		return;
+	}
+
+	/* Sort flows into 'bursts' to allow attempting to migrating
+	 * small (but still active) flows first - this it to avoid
+	 * having large flows moving around the worker cores too much
+	 * (to avoid cache misses, among other things). Of course, the
+	 * number of recorded events (queue+flow ids) are limited, and
+	 * provides only a snapshot, so only so many conclusions can
+	 * be drawn from this data.
+	 */
+	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
+					    bursts);
+	/* For non-big-little systems, there's no point in moving the
+	 * only (known) flow.
+	 */
+	if (num_bursts < 2) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
+				"queue_id %d flow_hash %d has been seen.\n",
+				bursts[0].queue_flow.queue_id,
+				bursts[0].queue_flow.flow_hash);
+		return;
+	}
+
+	/* The strategy is to first try to find a flow to move to a
+	 * port with low load (below the migration-attempt
+	 * threshold). If that fails, we try to find a port which is
+	 * below the max threshold, and also less loaded than this
+	 * port is.
+	 */
+	if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+					 port_loads,
+					 DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
+					 &source_port->migration_target_qf,
+					 &source_port->migration_target_port_id)
+	    &&
+	    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+					 port_loads,
+					 DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
+					 &source_port->migration_target_qf,
+				       &source_port->migration_target_port_id))
+		return;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
+			"flow_hash %d from port %d to port %d.\n",
+			source_port->migration_target_qf.queue_id,
+			source_port->migration_target_qf.flow_hash,
+			source_port->id, source_port->migration_target_port_id);
+#if 0
+	printf("Migrating queue_id %d "
+			"flow_hash %d from port %d to port %d.\n",
+			source_port->migration_target_qf.queue_id,
+			source_port->migration_target_qf.flow_hash,
+			source_port->id, source_port->migration_target_port_id);
+#endif
+
+	/* We have a winner. */
+
+	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+#ifdef DSW_XSTATS
+	source_port->migration_start = rte_get_timer_cycles();
+#endif
+
+	/* No need to go through the whole pause procedure for
+	 * parallel queues, since atomic/ordered semantics need not to
+	 * be maintained.
+	 */
+
+	if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
+	    == RTE_SCHED_TYPE_PARALLEL) {
+		uint8_t queue_id = source_port->migration_target_qf.queue_id;
+		uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+		uint8_t dest_port_id = source_port->migration_target_port_id;
+
+		/* Single byte-sized stores are always atomic. */
+		dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+			dest_port_id;
+		rte_smp_wmb();
+
+		dsw_port_end_migration(dsw, source_port);
+
+		return;
+	}
+
+	/* There might be 'loopback' events already scheduled in the
+	 * output buffers.
+	 */
+	dsw_port_flush_out_buffers(dsw, source_port);
+
+	dsw_port_add_paused_flow(source_port,
+				 source_port->migration_target_qf.queue_id,
+				 source_port->migration_target_qf.flow_hash);
+
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+			       source_port->migration_target_qf.queue_id,
+			       source_port->migration_target_qf.flow_hash);
+	source_port->cfm_cnt = 0;
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint8_t queue_id, uint16_t paused_flow_hash);
+
+static void
+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+			     uint8_t originating_port_id, uint8_t queue_id,
+			     uint16_t paused_flow_hash)
+{
+	struct dsw_ctl_msg cfm = {
+		.type = DSW_CTL_CFM,
+		.originating_port_id = port->id,
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
+			queue_id, paused_flow_hash);
+
+	dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+
+	rte_smp_rmb();
+
+	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+
+	dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+}
+
+#define FORWARD_BURST_SIZE (32)
+
+static void
+dsw_port_forward_migrated_flow(struct dsw_port *source_port,
+			       struct rte_event_ring *dest_ring,
+			       uint8_t queue_id,
+			       uint16_t flow_hash)
+{
+	uint16_t events_left;
+
+	/* Control ring message should been seen before the ring count
+	 * is read on the port's in_ring.
+	 */
+	rte_smp_rmb();
+
+	events_left = rte_event_ring_count(source_port->in_ring);
+
+	while (events_left > 0) {
+		uint16_t in_burst_size =
+			RTE_MIN(FORWARD_BURST_SIZE, events_left);
+		struct rte_event in_burst[in_burst_size];
+		uint16_t in_len;
+		uint16_t i;
+
+		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
+						      in_burst,
+						      in_burst_size, NULL);
+		/* No need to care about bursting forwarded events (to
+		 * the destination port's in_ring), since migration
+		 * doesn't happen very often, and also the majority of
+		 * the dequeued events will likely *not* be forwarded.
+		 */
+		for (i = 0; i < in_len; i++) {
+			struct rte_event *e = &in_burst[i];
+			if (e->queue_id == queue_id &&
+			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
+				while (rte_event_ring_enqueue_burst(dest_ring,
+								    e, 1,
+								    NULL) != 1)
+					rte_pause();
+			} else {
+				uint16_t last_idx = source_port->in_buffer_len;
+				source_port->in_buffer[last_idx] = *e;
+				source_port->in_buffer_len++;
+			}
+		}
+
+		events_left -= in_len;
+	}
+}
+
+static void
+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port)
+{
+	uint8_t queue_id = source_port->migration_target_qf.queue_id;
+	uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+	uint8_t dest_port_id = source_port->migration_target_port_id;
+	struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+	dsw_port_flush_out_buffers(dsw, source_port);
+
+	rte_smp_wmb();
+
+	dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+		dest_port_id;
+
+	dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
+				       queue_id, flow_hash);
+
+	/* Flow table update and migration destination port's enqueues
+	 * must be seen before the control message.
+	 */
+	rte_smp_wmb();
+
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
+			       flow_hash);
+	source_port->cfm_cnt = 0;
+	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
+}
+
+static void
+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	port->cfm_cnt++;
+
+	if (port->cfm_cnt == (dsw->num_ports-1)) {
+		switch (port->migration_state) {
+		case DSW_MIGRATION_STATE_PAUSING:
+			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
+					"migration state.\n");
+			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+			break;
+		case DSW_MIGRATION_STATE_UNPAUSING:
+			dsw_port_end_migration(dsw, port);
+			break;
+		default:
+			RTE_ASSERT(0);
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	struct dsw_ctl_msg msg;
+
+	/* So any table loads happens before the ring dequeue, in the
+	 * case of a 'paus' message.
+	 */
+	rte_smp_rmb();
+
+	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
+		switch (msg.type) {
+		case DSW_CTL_PAUS_REQ:
+			dsw_port_handle_pause_flow(dsw, port,
+						   msg.originating_port_id,
+						   msg.queue_id, msg.flow_hash);
+			break;
+		case DSW_CTL_UNPAUS_REQ:
+			dsw_port_handle_unpause_flow(dsw, port,
+						     msg.originating_port_id,
+						     msg.queue_id,
+						     msg.flow_hash);
+			break;
+		case DSW_CTL_CFM:
+			dsw_port_handle_confirm(dsw, port);
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
+{
+	/* To pull the control ring reasonbly often on busy ports,
+	 * each dequeued/enqueued event is considered an 'op' too.
+	 */
+	port->ops_since_bg_task += (num_events+1);
+}
+
+static void
+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
+		     port->pending_releases == 0))
+		dsw_port_move_migrating_flow(dsw, port);
+
+	/* Polling the control ring is relatively inexpensive, and
+	 * polling it often helps bringing down migration latency, so
+	 * do this for every iteration.
+	 */
+	dsw_port_ctl_process(dsw, port);
+
+	/* To avoid considering migration and flushing output buffers,
+	 * and polling control rings on every dequeue/enqueue call,
+	 * the scheduler only performs such 'background' tasks every
+	 * nth (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
+	 */
+	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
+		uint64_t now;
+
+		now = rte_get_timer_cycles();
+
+#ifdef DSW_XSTATS
+		port->last_bg = now;
+#endif
+
+		/* Logic to avoid having events linger in the output
+		 * buffer too long.
+		 */
+		dsw_port_flush_out_buffers(dsw, port);
+
+		dsw_port_consider_load_update(port, now);
+
+		dsw_port_consider_migration(dsw, port, now);
+
+		port->ops_since_bg_task = 0;
+	}
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+	uint16_t dest_port_id;
+
+	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
+		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+}
+
+uint16_t
+dsw_event_enqueue(void *port, const struct rte_event *ev)
+{
+	return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
+}
+
+static __rte_always_inline uint16_t
+dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
+				uint16_t events_len, bool op_types_known,
+				uint16_t num_new, uint16_t num_release,
+				uint16_t num_non_release)
+{
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+	bool enough_credits;
+	uint16_t i;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
+			"events to port %d.\n", events_len, source_port->id);
+
+	dsw_port_bg_process(dsw, source_port);
+
+	/* XXX: For performance (=ring efficiency) reasons, the
+	 * scheduler relies on internal non-ring buffers instead of
+	 * immediately sending the event to the destination ring. For
+	 * a producer that doesn't intend to produce or consume any
+	 * more events, the scheduler provides a way to flush the
+	 * buffer, by means of doing an enqueue of zero events. In
+	 * addition, a port cannot be left "unattended" (e.g. unused)
+	 * for long periods of time, since that would stall
+	 * migration. Eventdev API extensions to provide a cleaner way
+	 * to archieve both of these functions should be
+	 * considered.
+	 */
+	if (unlikely(events_len == 0)) {
+		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
+		dsw_port_flush_out_buffers(dsw, source_port);
+		return 0;
+	}
+
+	if (unlikely(events_len > source_port->enqueue_depth))
+		events_len = source_port->enqueue_depth;
+
+	dsw_port_note_op(source_port, events_len);
+
+	if (!op_types_known)
+		for (i = 0; i < events_len; i++) {
+			switch (events[i].op) {
+			case RTE_EVENT_OP_RELEASE:
+				num_release++;
+				break;
+			case RTE_EVENT_OP_NEW:
+				num_new++;
+				/* Falls through. */
+			default:
+				num_non_release++;
+				break;
+			}
+		}
+
+	/* Technically, we could allow the non-new events up to the
+	 * first new event in the array into the system, but for
+	 * simplicity reasons, we deny the whole burst if the port is
+	 * above the water mark.
+	 */
+	if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
+		     source_port->new_event_threshold))
+		return 0;
+
+	enough_credits = dsw_port_acquire_credits(dsw, source_port,
+						  num_non_release);
+	if (unlikely(!enough_credits))
+		return 0;
+
+	source_port->pending_releases -= num_release;
+
+#ifdef DSW_XSTATS
+	dsw_port_enqueue_stats(source_port, num_new,
+			       num_non_release-num_new, num_release);
+#endif
+
+	for (i = 0; i < events_len; i++) {
+		const struct rte_event *event = &events[i];
+
+		if (likely(num_release == 0 ||
+			   event->op != RTE_EVENT_OP_RELEASE))
+			dsw_port_buffer_event(dsw, source_port, event);
+#ifdef DSW_XSTATS
+		dsw_port_queue_enqueue_stats(source_port, event->queue_id);
+#endif
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
+			"accepted.\n", num_non_release);
+
+	return num_non_release;
+}
+
+uint16_t
+dsw_event_enqueue_burst(void *port, const struct rte_event events[],
+			uint16_t events_len)
+{
+	return dsw_event_enqueue_burst_generic(port, events, events_len, false,
+					       0, 0, 0);
+}
+
+uint16_t
+dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
+			    uint16_t events_len)
+{
+	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+					       events_len, 0, events_len);
+}
+
+uint16_t
+dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
+				uint16_t events_len)
+{
+	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+					       0, 0, events_len);
+}
+
+uint16_t
+dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
+{
+	return dsw_event_dequeue_burst(port, events, 1, wait);
+}
+
+static void
+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
+			    uint16_t num)
+{
+	uint16_t i;
+
+#ifdef DSW_XSTATS
+	dsw_port_dequeue_stats(port, num);
+#endif
+
+	for (i = 0; i < num; i++) {
+		uint16_t l_idx = port->seen_events_idx;
+		struct dsw_queue_flow *qf = &port->seen_events[l_idx];
+		struct rte_event *event = &events[i];
+		qf->queue_id = event->queue_id;
+		qf->flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+
+#ifdef DSW_XSTATS
+		dsw_port_queue_dequeued_stats(port, event->queue_id);
+#endif
+	}
+
+	if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
+		port->seen_events_len =
+			RTE_MIN(port->seen_events_len + num,
+				DSW_MAX_EVENTS_RECORDED);
+}
+
+#ifdef DSW_SORT_DEQUEUED
+
+#define DSW_EVENT_TO_INT(_event)				\
+	((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
+
+static inline int
+dsw_cmp_event(const void *v_event_a, const void *v_event_b)
+{
+	const struct rte_event *event_a = v_event_a;
+	const struct rte_event *event_b = v_event_b;
+
+	return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
+}
+#endif
+
+static uint16_t
+dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
+		       uint16_t num)
+{
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+
+	dsw_port_ctl_process(dsw, source_port);
+
+	if (unlikely(port->in_buffer_len > 0)) {
+		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+
+		rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
+			   dequeued * sizeof(struct rte_event));
+
+		port->in_buffer_start += dequeued;
+		port->in_buffer_len -= dequeued;
+
+		if (port->in_buffer_len == 0)
+			port->in_buffer_start = 0;
+
+		return dequeued;
+	}
+
+	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
+}
+
+uint16_t
+dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
+			uint64_t wait __rte_unused)
+{
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+	uint16_t dequeued;
+
+	source_port->pending_releases = 0;
+
+	dsw_port_bg_process(dsw, source_port);
+
+	if (unlikely(num > source_port->dequeue_depth))
+		num = source_port->dequeue_depth;
+
+	dequeued = dsw_port_dequeue_burst(source_port, events, num);
+
+	source_port->pending_releases = dequeued;
+
+	dsw_port_load_record(source_port, dequeued);
+
+	dsw_port_note_op(source_port, dequeued);
+
+	if (dequeued > 0) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
+				dequeued);
+
+		dsw_port_return_credits(dsw, source_port, dequeued);
+
+		/* One potential optimization one might think of is to
+		 * add a migration state (prior to 'pausing'), and
+		 * only record seen events when the port is in this
+		 * state (and transit to 'pausing' when enough events
+		 * have been gathered). However, that schema doesn't
+		 * seem to improve performance.
+		 */
+		dsw_port_record_seen_events(port, events, dequeued);
+	}
+	/* XXX: Assuming the port can't produce any more work,
+	 *	consider flushing the output buffer, on dequeued ==
+	 *	0.
+	 */
+
+#ifdef DSW_SORT_DEQUEUED
+	dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
+#endif
+
+	return dequeued;
+}
+
+void dsw_event_schedule(struct rte_eventdev *dev __rte_unused)
+{
+}
diff --git a/drivers/event/dsw/dsw_sort.h b/drivers/event/dsw/dsw_sort.h
new file mode 100644
index 000000000..ff8ef82f2
--- /dev/null
+++ b/drivers/event/dsw/dsw_sort.h
@@ -0,0 +1,47 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_SORT_
+#define _DSW_SORT_
+
+#include <rte_common.h>
+#include <string.h>
+
+#define DSW_ARY_ELEM_PTR(_ary, _idx, _elem_size)	\
+	RTE_PTR_ADD(_ary, (_idx) * (_elem_size))
+
+#define DSW_ARY_ELEM_SWAP(_ary, _a_idx, _b_idx, _elem_size)		\
+	do {								\
+		char tmp[_elem_size];					\
+		void *_a_ptr = DSW_ARY_ELEM_PTR(_ary, _a_idx, _elem_size); \
+		void *_b_ptr = DSW_ARY_ELEM_PTR(_ary, _b_idx, _elem_size); \
+		memcpy(tmp, _a_ptr, _elem_size);			\
+		memcpy(_a_ptr, _b_ptr, _elem_size);			\
+		memcpy(_b_ptr, tmp, _elem_size);			\
+	} while (0)
+
+static inline void
+dsw_insertion_sort(void *ary, uint16_t len, uint16_t elem_size,
+		   int (*cmp_fn)(const void *, const void *))
+{
+	uint16_t i;
+
+	for (i = 1; i < len; i++) {
+		uint16_t j;
+		for (j = i; j > 0 &&
+			     cmp_fn(DSW_ARY_ELEM_PTR(ary, j-1, elem_size),
+				    DSW_ARY_ELEM_PTR(ary, j, elem_size)) > 0;
+		     j--)
+			DSW_ARY_ELEM_SWAP(ary, j, j-1, elem_size);
+	}
+}
+
+static inline void
+dsw_stable_sort(void *ary, uint16_t len, uint16_t elem_size,
+		int (*cmp_fn)(const void *, const void *))
+{
+	dsw_insertion_sort(ary, len, elem_size, cmp_fn);
+}
+
+#endif
diff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c
new file mode 100644
index 000000000..d688ee07e
--- /dev/null
+++ b/drivers/event/dsw/dsw_xstats.c
@@ -0,0 +1,284 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include "dsw_evdev.h"
+#include <rte_debug.h>
+#include <string.h>
+
+#ifdef DSW_XSTATS
+
+/* The high bits in the xstats id is used to store an additional
+ * parameter (beyond the queue or port id already in the xstats
+ * interface).
+ */
+#define DSW_XSTATS_ID_PARAM_BITS (8)
+#define DSW_XSTATS_ID_STAT_BITS					\
+	(sizeof(unsigned int)*CHAR_BIT - DSW_XSTATS_ID_PARAM_BITS)
+#define DSW_XSTATS_ID_STAT_MASK ((1 << DSW_XSTATS_ID_STAT_BITS) - 1)
+
+#define DSW_XSTATS_ID_GET_PARAM(id)		\
+	((id)>>DSW_XSTATS_ID_STAT_BITS)
+
+#define DSW_XSTATS_ID_GET_STAT(id)		\
+	((id) & DSW_XSTATS_ID_STAT_MASK)
+
+#define DSW_XSTATS_ID_CREATE(id, param_value)			\
+	(((param_value) << DSW_XSTATS_ID_STAT_BITS) | id)
+
+typedef
+uint64_t (*dsw_xstats_dev_get_value_fn)(struct dsw_evdev *dsw);
+
+struct dsw_xstat_dev {
+	const char *name;
+	dsw_xstats_dev_get_value_fn get_value_fn;
+};
+
+typedef
+uint64_t (*dsw_xstats_port_get_value_fn)(struct dsw_evdev *dsw,
+					 uint8_t port_id, uint8_t queue_id);
+
+struct dsw_xstats_port {
+	const char *name_fmt;
+	dsw_xstats_port_get_value_fn get_value_fn;
+	bool per_queue;
+};
+
+static uint64_t
+dsw_xstats_dev_credits_on_loan(struct dsw_evdev *dsw)
+{
+	return rte_atomic32_read(&dsw->credits_on_loan);
+}
+
+static struct dsw_xstat_dev dsw_dev_xstats[] = {
+	{ "dev_credits_on_loan", dsw_xstats_dev_credits_on_loan }
+};
+
+#define DSW_GEN_PORT_ACCESS_FN(_variable)				\
+	static uint64_t							\
+	dsw_xstats_port_get_ ## _variable(struct dsw_evdev *dsw,	\
+					  uint8_t port_id,		\
+					  uint8_t queue_id __rte_unused) \
+	{								\
+		return dsw->ports[port_id]._variable;			\
+	}
+
+DSW_GEN_PORT_ACCESS_FN(new_enqueued)
+DSW_GEN_PORT_ACCESS_FN(forward_enqueued)
+DSW_GEN_PORT_ACCESS_FN(release_enqueued)
+
+static uint64_t
+dsw_xstats_port_get_queue_enqueued(struct dsw_evdev *dsw, uint8_t port_id,
+				   uint8_t queue_id)
+{
+	return dsw->ports[port_id].queue_enqueued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(dequeued)
+
+static uint64_t
+dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id,
+				   uint8_t queue_id)
+{
+	return dsw->ports[port_id].queue_dequeued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(migrations)
+
+static uint64_t
+dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id,
+				      uint8_t queue_id __rte_unused)
+{
+	uint64_t total_latency = dsw->ports[port_id].migration_latency;
+	uint64_t num_migrations = dsw->ports[port_id].migrations;
+
+	return num_migrations > 0 ? total_latency / num_migrations : 0;
+}
+
+static uint64_t
+dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id,
+				       uint8_t queue_id __rte_unused)
+{
+	uint64_t total_busy_cycles =
+		dsw->ports[port_id].total_busy_cycles;
+	uint64_t dequeued =
+		dsw->ports[port_id].dequeued;
+
+	return dequeued > 0 ? total_busy_cycles / dequeued : 0;
+}
+
+DSW_GEN_PORT_ACCESS_FN(inflight_credits)
+
+static uint64_t
+dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id,
+			 uint8_t queue_id __rte_unused)
+{
+	int16_t load = rte_atomic16_read(&dsw->ports[port_id].load);
+	return DSW_LOAD_TO_PERCENT(load);
+}
+
+DSW_GEN_PORT_ACCESS_FN(last_bg)
+
+static struct dsw_xstats_port dsw_port_xstats[] = {
+	{ "port_%u_new_enqueued", dsw_xstats_port_get_new_enqueued,
+	  false },
+	{ "port_%u_forward_enqueued", dsw_xstats_port_get_forward_enqueued,
+	  false },
+	{ "port_%u_release_enqueued", dsw_xstats_port_get_release_enqueued,
+	  false },
+	{ "port_%u_queue_%u_enqueued", dsw_xstats_port_get_queue_enqueued,
+	  true },
+	{ "port_%u_dequeued", dsw_xstats_port_get_dequeued,
+	  false },
+	{ "port_%u_queue_%u_dequeued", dsw_xstats_port_get_queue_dequeued,
+	  true },
+	{ "port_%u_migrations", dsw_xstats_port_get_migrations,
+	  false },
+	{ "port_%u_migration_latency", dsw_xstats_port_get_migration_latency,
+	  false },
+	{ "port_%u_event_proc_latency", dsw_xstats_port_get_event_proc_latency,
+	  false },
+	{ "port_%u_inflight_credits", dsw_xstats_port_get_inflight_credits,
+	  false },
+	{ "port_%u_load", dsw_xstats_port_get_load,
+	  false },
+	{ "port_%u_last_bg", dsw_xstats_port_get_last_bg,
+	  false }
+};
+
+static int
+dsw_xstats_dev_get_names(struct rte_event_dev_xstats_name *xstats_names,
+			 unsigned int *ids, unsigned int size)
+{
+	unsigned int i;
+
+	for (i = 0; i < RTE_DIM(dsw_dev_xstats) && i < size; i++) {
+		ids[i] = i;
+		strcpy(xstats_names[i].name, dsw_dev_xstats[i].name);
+	}
+
+	return i;
+}
+
+static int
+dsw_xstats_port_get_names(struct dsw_evdev *dsw, uint8_t port_id,
+			  struct rte_event_dev_xstats_name *xstats_names,
+			  unsigned int *ids, unsigned int size)
+{
+	uint8_t queue_id = 0;
+	unsigned int id_idx;
+	unsigned int stat_idx;
+
+	for (id_idx = 0, stat_idx = 0;
+	     id_idx < size && stat_idx < RTE_DIM(dsw_port_xstats);
+	     id_idx++) {
+		struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+
+		if (xstat->per_queue) {
+			ids[id_idx] = DSW_XSTATS_ID_CREATE(stat_idx, queue_id);
+			snprintf(xstats_names[id_idx].name,
+				 RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+				 dsw_port_xstats[stat_idx].name_fmt, port_id,
+				 queue_id);
+			queue_id++;
+		} else {
+			ids[id_idx] = stat_idx;
+			snprintf(xstats_names[id_idx].name,
+				 RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+				 dsw_port_xstats[stat_idx].name_fmt, port_id);
+		}
+
+		if (!(xstat->per_queue && queue_id < dsw->num_queues)) {
+			stat_idx++;
+			queue_id = 0;
+		}
+	}
+	return id_idx;
+}
+
+int
+dsw_xstats_get_names(const struct rte_eventdev *dev,
+		     enum rte_event_dev_xstats_mode mode,
+		     uint8_t queue_port_id,
+		     struct rte_event_dev_xstats_name *xstats_names,
+		     unsigned int *ids, unsigned int size)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+	switch (mode) {
+	case RTE_EVENT_DEV_XSTATS_DEVICE:
+		return dsw_xstats_dev_get_names(xstats_names, ids, size);
+	case RTE_EVENT_DEV_XSTATS_PORT:
+		return dsw_xstats_port_get_names(dsw, queue_port_id,
+						 xstats_names, ids, size);
+	case RTE_EVENT_DEV_XSTATS_QUEUE:
+		return 0;
+	default:
+		RTE_ASSERT(false);
+		return -1;
+	}
+}
+
+static int
+dsw_xstats_dev_get(const struct rte_eventdev *dev,
+		   const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	unsigned int i;
+	for (i = 0; i < n; i++) {
+		unsigned int id = ids[i];
+		struct dsw_xstat_dev *xstat = &dsw_dev_xstats[id];
+		values[i] = xstat->get_value_fn(dsw);
+	}
+	return n;
+}
+
+static int
+dsw_xstats_port_get(const struct rte_eventdev *dev, uint8_t port_id,
+		    const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	unsigned int i;
+	for (i = 0; i < n; i++) {
+		unsigned int id = ids[i];
+		unsigned int stat_idx = DSW_XSTATS_ID_GET_STAT(id);
+		struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+		uint8_t queue_id = 0;
+
+		if (xstat->per_queue)
+			queue_id = DSW_XSTATS_ID_GET_PARAM(id);
+
+		values[i] = xstat->get_value_fn(dsw, port_id, queue_id);
+	}
+	return n;
+}
+
+int
+dsw_xstats_get(const struct rte_eventdev *dev,
+	       enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+	       const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+	switch (mode) {
+	case RTE_EVENT_DEV_XSTATS_DEVICE:
+		return dsw_xstats_dev_get(dev, ids, values, n);
+	case RTE_EVENT_DEV_XSTATS_PORT:
+		return dsw_xstats_port_get(dev, queue_port_id, ids, values, n);
+	case RTE_EVENT_DEV_XSTATS_QUEUE:
+		return 0;
+	default:
+		RTE_ASSERT(false);
+		return -1;
+	}
+	return 0;
+}
+
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+				const char *name, unsigned int *id)
+{
+	RTE_SET_USED(dev);
+	RTE_SET_USED(name);
+	RTE_SET_USED(id);
+	return 0;
+}
+
+#endif
diff --git a/drivers/event/dsw/rte_pmd_evdev_dsw_version.map b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
new file mode 100644
index 000000000..ad6e191e4
--- /dev/null
+++ b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
@@ -0,0 +1,3 @@
+DPDK_18.08 {
+	local: *;
+};
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 7bcf6308d..b14f1e112 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -218,6 +218,7 @@ endif # CONFIG_RTE_LIBRTE_COMPRESSDEV
 ifeq ($(CONFIG_RTE_LIBRTE_EVENTDEV),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += -lrte_pmd_skeleton_event
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += -lrte_pmd_sw_event
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += -lrte_pmd_dsw_event
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += -lrte_pmd_dpaa_event
-- 
2.17.1

  reply	other threads:[~2018-07-11 21:22 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-07-11 21:21 [dpdk-dev] [RFC 0/1] A Distributed Software Event Device Mattias Rönnblom
2018-07-11 21:21 ` Mattias Rönnblom [this message]
2018-07-12 20:39 ` Wiles, Keith
2018-07-13 17:21   ` Mattias Rönnblom
     [not found] <20180711210844.5467-1-mattias.ronnblom@ericsson.com>
     [not found] ` <20180711210844.5467-2-mattias.ronnblom@ericsson.com>
2018-07-22 11:32   ` [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device Jerin Jacob
2018-08-19  6:11     ` Jerin Jacob
2018-08-23 13:08       ` Mattias Rönnblom
2018-08-23 13:44         ` Jerin Jacob
2018-08-23 20:08           ` Mattias Rönnblom
2018-08-27  9:23     ` Mattias Rönnblom
2018-08-27  9:40       ` Jerin Jacob
2018-08-27 10:24         ` Mattias Rönnblom

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180711212154.5807-2-mattias.ronnblom@ericsson.com \
    --to=mattias.ronnblom@ericsson.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=jerin.jacob@caviumnetworks.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).