* [dpdk-dev] [PATCH v3 01/10] event/dsw: add DSW device registration and build system
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-17 11:41 ` Jerin Jacob
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 02/10] event/dsw: add DSW device and queue configuration Mattias Rönnblom
` (8 subsequent siblings)
9 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
This patch contains the Meson and GNU Make build system extensions
required for the Distributed Event Device, and also the initialization
code for the driver itself.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
config/common_base | 5 ++
drivers/event/Makefile | 1 +
drivers/event/dsw/Makefile | 26 ++++++++++
drivers/event/dsw/dsw_evdev.c | 52 +++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 16 ++++++
drivers/event/dsw/meson.build | 6 +++
.../event/dsw/rte_pmd_dsw_event_version.map | 3 ++
drivers/event/meson.build | 2 +-
mk/rte.app.mk | 1 +
9 files changed, 111 insertions(+), 1 deletion(-)
create mode 100644 drivers/event/dsw/Makefile
create mode 100644 drivers/event/dsw/dsw_evdev.c
create mode 100644 drivers/event/dsw/dsw_evdev.h
create mode 100644 drivers/event/dsw/meson.build
create mode 100644 drivers/event/dsw/rte_pmd_dsw_event_version.map
diff --git a/config/common_base b/config/common_base
index 4bcbaf923..c43f5139d 100644
--- a/config/common_base
+++ b/config/common_base
@@ -614,6 +614,11 @@ CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV_DEBUG=n
#
CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV=y
+#
+# Compile PMD for distributed software event device
+#
+CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV=y
+
#
# Compile PMD for octeontx sso event device
#
diff --git a/drivers/event/Makefile b/drivers/event/Makefile
index f301d8dc2..03ad1b6cb 100644
--- a/drivers/event/Makefile
+++ b/drivers/event/Makefile
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton
DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw
DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx
ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += dpaa
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
new file mode 100644
index 000000000..5cbf488ff
--- /dev/null
+++ b/drivers/event/dsw/Makefile
@@ -0,0 +1,26 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+LIB = librte_pmd_dsw_event.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -Wno-format-nonliteral
+
+LDLIBS += -lrte_eal
+LDLIBS += -lrte_mbuf
+LDLIBS += -lrte_mempool
+LDLIBS += -lrte_ring
+LDLIBS += -lrte_eventdev
+LDLIBS += -lrte_bus_vdev
+
+LIBABIVER := 1
+
+EXPORT_MAP := rte_pmd_dsw_event_version.map
+
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
new file mode 100644
index 000000000..6990bbc9e
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -0,0 +1,52 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include <rte_eventdev_pmd.h>
+#include <rte_eventdev_pmd_vdev.h>
+
+#include "dsw_evdev.h"
+
+#define EVENTDEV_NAME_DSW_PMD event_dsw
+
+static int
+dsw_probe(struct rte_vdev_device *vdev)
+{
+ const char *name;
+ struct rte_eventdev *dev;
+ struct dsw_evdev *dsw;
+
+ name = rte_vdev_device_name(vdev);
+
+ dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
+ rte_socket_id());
+ if (dev == NULL)
+ return -EFAULT;
+
+ if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+ return 0;
+
+ dsw = dev->data->dev_private;
+ dsw->data = dev->data;
+
+ return 0;
+}
+
+static int
+dsw_remove(struct rte_vdev_device *vdev)
+{
+ const char *name;
+
+ name = rte_vdev_device_name(vdev);
+ if (name == NULL)
+ return -EINVAL;
+
+ return rte_event_pmd_vdev_uninit(name);
+}
+
+static struct rte_vdev_driver evdev_dsw_pmd_drv = {
+ .probe = dsw_probe,
+ .remove = dsw_remove
+};
+
+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
new file mode 100644
index 000000000..9a0f4c357
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -0,0 +1,16 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_EVDEV_H_
+#define _DSW_EVDEV_H_
+
+#include <rte_eventdev.h>
+
+#define DSW_PMD_NAME RTE_STR(event_dsw)
+
+struct dsw_evdev {
+ struct rte_eventdev_data *data;
+};
+
+#endif
diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build
new file mode 100644
index 000000000..275d051c3
--- /dev/null
+++ b/drivers/event/dsw/meson.build
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+allow_experimental_apis = true
+deps += ['bus_vdev']
+sources = files('dsw_evdev.c')
diff --git a/drivers/event/dsw/rte_pmd_dsw_event_version.map b/drivers/event/dsw/rte_pmd_dsw_event_version.map
new file mode 100644
index 000000000..24bd5cdb3
--- /dev/null
+++ b/drivers/event/dsw/rte_pmd_dsw_event_version.map
@@ -0,0 +1,3 @@
+DPDK_18.11 {
+ local: *;
+};
diff --git a/drivers/event/meson.build b/drivers/event/meson.build
index e95119935..4b9fed22b 100644
--- a/drivers/event/meson.build
+++ b/drivers/event/meson.build
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2017 Intel Corporation
-drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw']
+drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw', 'dsw']
std_deps = ['eventdev', 'kvargs']
config_flag_fmt = 'RTE_LIBRTE_@0@_EVENTDEV_PMD'
driver_name_fmt = 'rte_pmd_@0@_event'
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index de33883be..682e224eb 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -236,6 +236,7 @@ endif # CONFIG_RTE_LIBRTE_COMPRESSDEV
ifeq ($(CONFIG_RTE_LIBRTE_EVENTDEV),y)
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += -lrte_pmd_skeleton_event
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += -lrte_pmd_sw_event
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += -lrte_pmd_dsw_event
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf
ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += -lrte_pmd_dpaa_event
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 01/10] event/dsw: add DSW device registration and build system
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 01/10] event/dsw: add DSW device registration and build system Mattias Rönnblom
@ 2018-09-17 11:41 ` Jerin Jacob
0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-09-17 11:41 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Tue, 11 Sep 2018 10:02:07 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: jerin.jacob@caviumnetworks.com
> CC: bruce.richardson@intel.com, dev@dpdk.org, Mattias Rönnblom
> <mattias.ronnblom@ericsson.com>
> Subject: [PATCH v3 01/10] event/dsw: add DSW device registration and build
> system
> X-Mailer: git-send-email 2.17.1
>
>
> This patch contains the Meson and GNU Make build system extensions
> required for the Distributed Event Device, and also the initialization
> code for the driver itself.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 02/10] event/dsw: add DSW device and queue configuration
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 01/10] event/dsw: add DSW device registration and build system Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 03/10] event/dsw: add DSW port configuration Mattias Rönnblom
` (7 subsequent siblings)
9 siblings, 0 replies; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
Allow queue- and device-level configuration for and retrieval of
contextual information from a DSW event device.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 87 +++++++++++++++++++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 28 +++++++++++
2 files changed, 115 insertions(+)
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 6990bbc9e..1500d2426 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -9,6 +9,91 @@
#define EVENTDEV_NAME_DSW_PMD event_dsw
+static int
+dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
+ const struct rte_event_queue_conf *conf)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+
+ if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
+ return -ENOTSUP;
+
+ if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
+ return -ENOTSUP;
+
+ /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
+ * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since
+ * the queue will only have a single serving port, no
+ * migration will ever happen, so the extra TYPE_ATOMIC
+ * migration overhead is avoided.
+ */
+ if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
+ queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
+ else /* atomic or parallel */
+ queue->schedule_type = conf->schedule_type;
+
+ queue->num_serving_ports = 0;
+
+ return 0;
+}
+
+static void
+dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
+ uint8_t queue_id __rte_unused,
+ struct rte_event_queue_conf *queue_conf)
+{
+ *queue_conf = (struct rte_event_queue_conf) {
+ .nb_atomic_flows = 4096,
+ .schedule_type = RTE_SCHED_TYPE_ATOMIC,
+ .priority = RTE_EVENT_DEV_PRIORITY_NORMAL
+ };
+}
+
+static void
+dsw_queue_release(struct rte_eventdev *dev __rte_unused,
+ uint8_t queue_id __rte_unused)
+{
+}
+
+static void
+dsw_info_get(struct rte_eventdev *dev __rte_unused,
+ struct rte_event_dev_info *info)
+{
+ *info = (struct rte_event_dev_info) {
+ .driver_name = DSW_PMD_NAME,
+ .max_event_queues = DSW_MAX_QUEUES,
+ .max_event_queue_flows = DSW_MAX_FLOWS,
+ .max_event_queue_priority_levels = 1,
+ .max_event_priority_levels = 1,
+ .max_event_ports = DSW_MAX_PORTS,
+ .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
+ .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
+ .max_num_events = DSW_MAX_EVENTS,
+ .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
+ };
+}
+
+static int
+dsw_configure(const struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+
+ dsw->num_queues = conf->nb_event_queues;
+
+ return 0;
+}
+
+static struct rte_eventdev_ops dsw_evdev_ops = {
+ .queue_setup = dsw_queue_setup,
+ .queue_def_conf = dsw_queue_def_conf,
+ .queue_release = dsw_queue_release,
+ .dev_infos_get = dsw_info_get,
+ .dev_configure = dsw_configure,
+};
+
static int
dsw_probe(struct rte_vdev_device *vdev)
{
@@ -23,6 +108,8 @@ dsw_probe(struct rte_vdev_device *vdev)
if (dev == NULL)
return -EFAULT;
+ dev->dev_ops = &dsw_evdev_ops;
+
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
return 0;
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 9a0f4c357..5eda8d114 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -9,8 +9,36 @@
#define DSW_PMD_NAME RTE_STR(event_dsw)
+/* Code changes are required to allow more ports. */
+#define DSW_MAX_PORTS (64)
+#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
+#define DSW_MAX_PORT_ENQUEUE_DEPTH (128)
+
+#define DSW_MAX_QUEUES (16)
+
+#define DSW_MAX_EVENTS (16384)
+
+/* Code changes are required to allow more flows than 32k. */
+#define DSW_MAX_FLOWS_BITS (15)
+#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
+#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)
+
+struct dsw_queue {
+ uint8_t schedule_type;
+ uint16_t num_serving_ports;
+};
+
struct dsw_evdev {
struct rte_eventdev_data *data;
+
+ struct dsw_queue queues[DSW_MAX_QUEUES];
+ uint8_t num_queues;
};
+static inline struct dsw_evdev *
+dsw_pmd_priv(const struct rte_eventdev *eventdev)
+{
+ return eventdev->data->dev_private;
+}
+
#endif
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 03/10] event/dsw: add DSW port configuration
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 01/10] event/dsw: add DSW device registration and build system Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 02/10] event/dsw: add DSW device and queue configuration Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-17 12:12 ` Jerin Jacob
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 04/10] event/dsw: add support in DSW for linking/unlinking ports Mattias Rönnblom
` (6 subsequent siblings)
9 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
Allow port setup and release in the DSW event device.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 60 +++++++++++++++++++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 28 ++++++++++++++++
2 files changed, 88 insertions(+)
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 1500d2426..91b1a2449 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -9,6 +9,62 @@
#define EVENTDEV_NAME_DSW_PMD event_dsw
+static int
+dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
+ const struct rte_event_port_conf *conf)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ struct dsw_port *port;
+ struct rte_event_ring *in_ring;
+ char ring_name[RTE_RING_NAMESIZE];
+
+ port = &dsw->ports[port_id];
+
+ *port = (struct dsw_port) {
+ .id = port_id,
+ .dsw = dsw,
+ .dequeue_depth = conf->dequeue_depth,
+ .enqueue_depth = conf->enqueue_depth,
+ .new_event_threshold = conf->new_event_threshold
+ };
+
+ snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
+ port_id);
+
+ in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
+ dev->data->socket_id,
+ RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+ if (in_ring == NULL)
+ return -ENOMEM;
+
+ port->in_ring = in_ring;
+
+ dev->data->ports[port_id] = port;
+
+ return 0;
+}
+
+static void
+dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
+ uint8_t port_id __rte_unused,
+ struct rte_event_port_conf *port_conf)
+{
+ *port_conf = (struct rte_event_port_conf) {
+ .new_event_threshold = 1024,
+ .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
+ .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
+ };
+}
+
+static void
+dsw_port_release(void *p)
+{
+ struct dsw_port *port = p;
+
+ rte_event_ring_free(port->in_ring);
+}
+
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
const struct rte_event_queue_conf *conf)
@@ -81,12 +137,16 @@ dsw_configure(const struct rte_eventdev *dev)
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+ dsw->num_ports = conf->nb_event_ports;
dsw->num_queues = conf->nb_event_queues;
return 0;
}
static struct rte_eventdev_ops dsw_evdev_ops = {
+ .port_setup = dsw_port_setup,
+ .port_def_conf = dsw_port_def_conf,
+ .port_release = dsw_port_release,
.queue_setup = dsw_queue_setup,
.queue_def_conf = dsw_queue_def_conf,
.queue_release = dsw_queue_release,
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 5eda8d114..2a4f10421 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -5,6 +5,7 @@
#ifndef _DSW_EVDEV_H_
#define _DSW_EVDEV_H_
+#include <rte_event_ring.h>
#include <rte_eventdev.h>
#define DSW_PMD_NAME RTE_STR(event_dsw)
@@ -23,6 +24,31 @@
#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)
+/* The rings are dimensioned so that all in-flight events can reside
+ * on any one of the port rings, to avoid the trouble of having to
+ * care about the case where there's no room on the destination port's
+ * input ring.
+ */
+#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)
+
+struct dsw_port {
+ uint16_t id;
+
+ /* Keeping a pointer here to avoid container_of() calls, which
+ * are expensive since they are very frequent and will result
+ * in an integer multiplication (since the port id is an index
+ * into the dsw_evdev port array).
+ */
+ struct dsw_evdev *dsw;
+
+ uint16_t dequeue_depth;
+ uint16_t enqueue_depth;
+
+ int32_t new_event_threshold;
+
+ struct rte_event_ring *in_ring __rte_cache_aligned;
+} __rte_cache_aligned;
+
struct dsw_queue {
uint8_t schedule_type;
uint16_t num_serving_ports;
@@ -31,6 +57,8 @@ struct dsw_queue {
struct dsw_evdev {
struct rte_eventdev_data *data;
+ struct dsw_port ports[DSW_MAX_PORTS];
+ uint16_t num_ports;
struct dsw_queue queues[DSW_MAX_QUEUES];
uint8_t num_queues;
};
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 03/10] event/dsw: add DSW port configuration
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 03/10] event/dsw: add DSW port configuration Mattias Rönnblom
@ 2018-09-17 12:12 ` Jerin Jacob
0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-09-17 12:12 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Tue, 11 Sep 2018 10:02:09 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: jerin.jacob@caviumnetworks.com
> CC: bruce.richardson@intel.com, dev@dpdk.org, Mattias Rönnblom
> <mattias.ronnblom@ericsson.com>
> Subject: [PATCH v3 03/10] event/dsw: add DSW port configuration
> X-Mailer: git-send-email 2.17.1
>
>
> Allow port setup and release in the DSW event device.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 04/10] event/dsw: add support in DSW for linking/unlinking ports
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (2 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 03/10] event/dsw: add DSW port configuration Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-17 12:14 ` Jerin Jacob
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 05/10] event/dsw: add DSW event scheduling and device start/stop Mattias Rönnblom
` (5 subsequent siblings)
9 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
Added support for linking and unlinking ports to queues in a DSW event
device.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 67 +++++++++++++++++++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 1 +
2 files changed, 68 insertions(+)
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 91b1a2449..5dccc232a 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -2,6 +2,8 @@
* Copyright(c) 2018 Ericsson AB
*/
+#include <stdbool.h>
+
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
@@ -112,6 +114,69 @@ dsw_queue_release(struct rte_eventdev *dev __rte_unused,
{
}
+static void
+queue_add_port(struct dsw_queue *queue, uint16_t port_id)
+{
+ queue->serving_ports[queue->num_serving_ports] = port_id;
+ queue->num_serving_ports++;
+}
+
+static bool
+queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
+{
+ uint16_t i;
+
+ for (i = 0; i < queue->num_serving_ports; i++)
+ if (queue->serving_ports[i] == port_id) {
+ uint16_t last_idx = queue->num_serving_ports - 1;
+ if (i != last_idx)
+ queue->serving_ports[i] =
+ queue->serving_ports[last_idx];
+ queue->num_serving_ports--;
+ return true;
+ }
+ return false;
+}
+
+static int
+dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
+ const uint8_t queues[], uint16_t num, bool link)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ struct dsw_port *p = port;
+ uint16_t i;
+ uint16_t count = 0;
+
+ for (i = 0; i < num; i++) {
+ uint8_t qid = queues[i];
+ struct dsw_queue *q = &dsw->queues[qid];
+ if (link) {
+ queue_add_port(q, p->id);
+ count++;
+ } else {
+ bool removed = queue_remove_port(q, p->id);
+ if (removed)
+ count++;
+ }
+ }
+
+ return count;
+}
+
+static int
+dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
+ const uint8_t priorities[] __rte_unused, uint16_t num)
+{
+ return dsw_port_link_unlink(dev, port, queues, num, true);
+}
+
+static int
+dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
+ uint16_t num)
+{
+ return dsw_port_link_unlink(dev, port, queues, num, false);
+}
+
static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
struct rte_event_dev_info *info)
@@ -150,6 +215,8 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
.queue_setup = dsw_queue_setup,
.queue_def_conf = dsw_queue_def_conf,
.queue_release = dsw_queue_release,
+ .port_link = dsw_port_link,
+ .port_unlink = dsw_port_unlink,
.dev_infos_get = dsw_info_get,
.dev_configure = dsw_configure,
};
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 2a4f10421..ad0f857cc 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -51,6 +51,7 @@ struct dsw_port {
struct dsw_queue {
uint8_t schedule_type;
+ uint8_t serving_ports[DSW_MAX_PORTS];
uint16_t num_serving_ports;
};
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 04/10] event/dsw: add support in DSW for linking/unlinking ports
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 04/10] event/dsw: add support in DSW for linking/unlinking ports Mattias Rönnblom
@ 2018-09-17 12:14 ` Jerin Jacob
0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-09-17 12:14 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Tue, 11 Sep 2018 10:02:10 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: jerin.jacob@caviumnetworks.com
> CC: bruce.richardson@intel.com, dev@dpdk.org, Mattias Rönnblom
> <mattias.ronnblom@ericsson.com>
> Subject: [PATCH v3 04/10] event/dsw: add support in DSW for
> linking/unlinking ports
> X-Mailer: git-send-email 2.17.1
>
>
> Added support for linking and unlinking ports to queues in a DSW event
> device.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 05/10] event/dsw: add DSW event scheduling and device start/stop
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (3 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 04/10] event/dsw: add support in DSW for linking/unlinking ports Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-17 12:22 ` Jerin Jacob
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 06/10] event/dsw: add DSW port load measurements Mattias Rönnblom
` (4 subsequent siblings)
9 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
With this patch, the DSW event device can be started and stopped,
and also supports scheduling events between ports.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/Makefile | 2 +-
drivers/event/dsw/dsw_evdev.c | 125 ++++++++++++
drivers/event/dsw/dsw_evdev.h | 56 ++++++
drivers/event/dsw/dsw_event.c | 359 ++++++++++++++++++++++++++++++++++
drivers/event/dsw/meson.build | 2 +-
5 files changed, 542 insertions(+), 2 deletions(-)
create mode 100644 drivers/event/dsw/dsw_event.c
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
index 5cbf488ff..6374a454e 100644
--- a/drivers/event/dsw/Makefile
+++ b/drivers/event/dsw/Makefile
@@ -21,6 +21,6 @@ LIBABIVER := 1
EXPORT_MAP := rte_pmd_dsw_event_version.map
-SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c dsw_event.c
include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 5dccc232a..40a7435be 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -6,6 +6,7 @@
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
+#include <rte_random.h>
#include "dsw_evdev.h"
@@ -201,10 +202,125 @@ dsw_configure(const struct rte_eventdev *dev)
{
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+ int32_t min_max_in_flight;
dsw->num_ports = conf->nb_event_ports;
dsw->num_queues = conf->nb_event_queues;
+ /* Avoid a situation where consumer ports are holding all the
+ * credits, without making use of them.
+ */
+ min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;
+
+ dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);
+
+ return 0;
+}
+
+
+static void
+initial_flow_to_port_assignment(struct dsw_evdev *dsw)
+{
+ uint8_t queue_id;
+ for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+ uint16_t flow_hash;
+ for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
+ uint8_t port_idx =
+ rte_rand() % queue->num_serving_ports;
+ uint8_t port_id =
+ queue->serving_ports[port_idx];
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ port_id;
+ }
+ }
+}
+
+static int
+dsw_start(struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+ rte_atomic32_init(&dsw->credits_on_loan);
+
+ initial_flow_to_port_assignment(dsw);
+
+ return 0;
+}
+
+static void
+dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ uint16_t i;
+
+ for (i = 0; i < buf_len; i++)
+ flush(dev_id, buf[i], flush_arg);
+}
+
+static void
+dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ uint16_t dport_id;
+
+ for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
+ if (dport_id != port->id)
+ dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
+ port->out_buffer_len[dport_id],
+ flush, flush_arg);
+}
+
+static void
+dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ struct rte_event ev;
+
+ while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
+ flush(dev_id, ev, flush_arg);
+}
+
+static void
+dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ uint16_t port_id;
+
+ if (flush == NULL)
+ return;
+
+ for (port_id = 0; port_id < dsw->num_ports; port_id++) {
+ struct dsw_port *port = &dsw->ports[port_id];
+
+ dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
+ dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
+ }
+}
+
+static void
+dsw_stop(struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ uint8_t dev_id;
+ eventdev_stop_flush_t flush;
+ void *flush_arg;
+
+ dev_id = dev->data->dev_id;
+ flush = dev->dev_ops->dev_stop_flush;
+ flush_arg = dev->data->dev_stop_flush_arg;
+
+ dsw_drain(dev_id, dsw, flush, flush_arg);
+}
+
+static int
+dsw_close(struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+ dsw->num_ports = 0;
+ dsw->num_queues = 0;
+
return 0;
}
@@ -219,6 +335,9 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
.port_unlink = dsw_port_unlink,
.dev_infos_get = dsw_info_get,
.dev_configure = dsw_configure,
+ .dev_start = dsw_start,
+ .dev_stop = dsw_stop,
+ .dev_close = dsw_close
};
static int
@@ -236,6 +355,12 @@ dsw_probe(struct rte_vdev_device *vdev)
return -EFAULT;
dev->dev_ops = &dsw_evdev_ops;
+ dev->enqueue = dsw_event_enqueue;
+ dev->enqueue_burst = dsw_event_enqueue_burst;
+ dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
+ dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
+ dev->dequeue = dsw_event_dequeue;
+ dev->dequeue_burst = dsw_event_dequeue_burst;
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
return 0;
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index ad0f857cc..f8e94e4a4 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -14,6 +14,7 @@
#define DSW_MAX_PORTS (64)
#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
#define DSW_MAX_PORT_ENQUEUE_DEPTH (128)
+#define DSW_MAX_PORT_OUT_BUFFER (32)
#define DSW_MAX_QUEUES (16)
@@ -24,6 +25,24 @@
#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)
+/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,
+ * but the 'dsw' scheduler (more or less) randomly assign flow id to
+ * events on parallel queues, to be able to reuse some of the
+ * migration mechanism and scheduling logic from
+ * RTE_SCHED_TYPE_ATOMIC. By moving one of the parallel "flows" from a
+ * particular port, the likely-hood of events being scheduled to this
+ * port is reduced, and thus a kind of statistical load balancing is
+ * achieved.
+ */
+#define DSW_PARALLEL_FLOWS (1024)
+
+/* Avoid making small 'loans' from the central in-flight event credit
+ * pool, to improve efficiency.
+ */
+#define DSW_MIN_CREDIT_LOAN (64)
+#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)
+#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)
+
/* The rings are dimensioned so that all in-flight events can reside
* on any one of the port rings, to avoid the trouble of having to
* care about the case where there's no room on the destination port's
@@ -44,8 +63,17 @@ struct dsw_port {
uint16_t dequeue_depth;
uint16_t enqueue_depth;
+ int32_t inflight_credits;
+
int32_t new_event_threshold;
+ uint16_t pending_releases;
+
+ uint16_t next_parallel_flow_id;
+
+ uint16_t out_buffer_len[DSW_MAX_PORTS];
+ struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
+
struct rte_event_ring *in_ring __rte_cache_aligned;
} __rte_cache_aligned;
@@ -53,6 +81,8 @@ struct dsw_queue {
uint8_t schedule_type;
uint8_t serving_ports[DSW_MAX_PORTS];
uint16_t num_serving_ports;
+
+ uint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;
};
struct dsw_evdev {
@@ -62,12 +92,38 @@ struct dsw_evdev {
uint16_t num_ports;
struct dsw_queue queues[DSW_MAX_QUEUES];
uint8_t num_queues;
+ int32_t max_inflight;
+
+ rte_atomic32_t credits_on_loan __rte_cache_aligned;
};
+uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
+uint16_t dsw_event_enqueue_burst(void *port,
+ const struct rte_event events[],
+ uint16_t events_len);
+uint16_t dsw_event_enqueue_new_burst(void *port,
+ const struct rte_event events[],
+ uint16_t events_len);
+uint16_t dsw_event_enqueue_forward_burst(void *port,
+ const struct rte_event events[],
+ uint16_t events_len);
+
+uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
+ uint16_t num, uint64_t wait);
+
static inline struct dsw_evdev *
dsw_pmd_priv(const struct rte_eventdev *eventdev)
{
return eventdev->data->dev_private;
}
+#define DSW_LOG_DP(level, fmt, args...) \
+ RTE_LOG_DP(level, EVENTDEV, "[%s] %s() line %u: " fmt, \
+ DSW_PMD_NAME, \
+ __func__, __LINE__, ## args)
+
+#define DSW_LOG_DP_PORT(level, port_id, fmt, args...) \
+ DSW_LOG_DP(level, "<Port %d> " fmt, port_id, ## args)
+
#endif
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
new file mode 100644
index 000000000..4a3af8ecd
--- /dev/null
+++ b/drivers/event/dsw/dsw_event.c
@@ -0,0 +1,359 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include "dsw_evdev.h"
+
+#include <stdbool.h>
+
+#include <rte_atomic.h>
+#include <rte_random.h>
+
+static bool
+dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+ int32_t credits)
+{
+ int32_t inflight_credits = port->inflight_credits;
+ int32_t missing_credits = credits - inflight_credits;
+ int32_t total_on_loan;
+ int32_t available;
+ int32_t acquired_credits;
+ int32_t new_total_on_loan;
+
+ if (likely(missing_credits <= 0)) {
+ port->inflight_credits -= credits;
+ return true;
+ }
+
+ total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
+ available = dsw->max_inflight - total_on_loan;
+ acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
+
+ if (available < acquired_credits)
+ return false;
+
+ /* This is a race, no locks are involved, and thus some other
+ * thread can allocate tokens in between the check and the
+ * allocation.
+ */
+ new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
+ acquired_credits);
+
+ if (unlikely(new_total_on_loan > dsw->max_inflight)) {
+ /* Some other port took the last credits */
+ rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
+ return false;
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
+ acquired_credits);
+
+ port->inflight_credits += acquired_credits;
+ port->inflight_credits -= credits;
+
+ return true;
+}
+
+static void
+dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+ int32_t credits)
+{
+ port->inflight_credits += credits;
+
+ if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
+ int32_t leave_credits = DSW_PORT_MIN_CREDITS;
+ int32_t return_credits =
+ port->inflight_credits - leave_credits;
+
+ port->inflight_credits = leave_credits;
+
+ rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
+
+ DSW_LOG_DP_PORT(DEBUG, port->id,
+ "Returned %d tokens to pool.\n",
+ return_credits);
+ }
+}
+
+static uint8_t
+dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
+{
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+ uint8_t port_id;
+
+ if (queue->num_serving_ports > 1)
+ port_id = queue->flow_to_port_map[flow_hash];
+ else
+ /* A single-link queue, or atomic/ordered/parallel but
+ * with just a single serving port.
+ */
+ port_id = queue->serving_ports[0];
+
+ DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
+ "to port %d.\n", queue_id, flow_hash, port_id);
+
+ return port_id;
+}
+
+static void
+dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ uint8_t dest_port_id)
+{
+ struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
+ uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+ struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+ uint16_t enqueued = 0;
+
+ if (*buffer_len == 0)
+ return;
+
+ /* The rings are dimensioned to fit all in-flight events (even
+ * on a single ring), so looping will work.
+ */
+ do {
+ enqueued +=
+ rte_event_ring_enqueue_burst(dest_port->in_ring,
+ buffer+enqueued,
+ *buffer_len-enqueued,
+ NULL);
+ } while (unlikely(enqueued != *buffer_len));
+
+ (*buffer_len) = 0;
+}
+
+static uint16_t
+dsw_port_get_parallel_flow_id(struct dsw_port *port)
+{
+ uint16_t flow_id = port->next_parallel_flow_id;
+
+ port->next_parallel_flow_id =
+ (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
+
+ return flow_id;
+}
+
+static void
+dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ uint8_t dest_port_id, const struct rte_event *event)
+{
+ struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+ uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+
+ if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
+ dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+
+ buffer[*buffer_len] = *event;
+
+ (*buffer_len)++;
+}
+
+#define DSW_FLOW_ID_BITS (24)
+static uint16_t
+dsw_flow_id_hash(uint32_t flow_id)
+{
+ uint16_t hash = 0;
+ uint16_t offset = 0;
+
+ do {
+ hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
+ offset += DSW_MAX_FLOWS_BITS;
+ } while (offset < DSW_FLOW_ID_BITS);
+
+ return hash;
+}
+
+static void
+dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ struct rte_event event)
+{
+ uint8_t dest_port_id;
+
+ event.flow_id = dsw_port_get_parallel_flow_id(source_port);
+
+ dest_port_id = dsw_schedule(dsw, event.queue_id,
+ dsw_flow_id_hash(event.flow_id));
+
+ dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
+}
+
+static void
+dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ const struct rte_event *event)
+{
+ uint16_t flow_hash;
+ uint8_t dest_port_id;
+
+ if (unlikely(dsw->queues[event->queue_id].schedule_type ==
+ RTE_SCHED_TYPE_PARALLEL)) {
+ dsw_port_buffer_parallel(dsw, source_port, *event);
+ return;
+ }
+
+ flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
+
+ dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+ uint16_t dest_port_id;
+
+ for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
+ dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+}
+
+uint16_t
+dsw_event_enqueue(void *port, const struct rte_event *ev)
+{
+ return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
+}
+
+static __rte_always_inline uint16_t
+dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
+ uint16_t events_len, bool op_types_known,
+ uint16_t num_new, uint16_t num_release,
+ uint16_t num_non_release)
+{
+ struct dsw_port *source_port = port;
+ struct dsw_evdev *dsw = source_port->dsw;
+ bool enough_credits;
+ uint16_t i;
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
+ "events to port %d.\n", events_len, source_port->id);
+
+ /* XXX: For performance (=ring efficiency) reasons, the
+ * scheduler relies on internal non-ring buffers instead of
+ * immediately sending the event to the destination ring. For
+ * a producer that doesn't intend to produce or consume any
+ * more events, the scheduler provides a way to flush the
+ * buffer, by means of doing an enqueue of zero events. In
+ * addition, a port cannot be left "unattended" (e.g. unused)
+ * for long periods of time, since that would stall
+ * migration. Eventdev API extensions to provide a cleaner way
+ * to archieve both of these functions should be
+ * considered.
+ */
+ if (unlikely(events_len == 0)) {
+ dsw_port_flush_out_buffers(dsw, source_port);
+ return 0;
+ }
+
+ if (unlikely(events_len > source_port->enqueue_depth))
+ events_len = source_port->enqueue_depth;
+
+ if (!op_types_known)
+ for (i = 0; i < events_len; i++) {
+ switch (events[i].op) {
+ case RTE_EVENT_OP_RELEASE:
+ num_release++;
+ break;
+ case RTE_EVENT_OP_NEW:
+ num_new++;
+ /* Falls through. */
+ default:
+ num_non_release++;
+ break;
+ }
+ }
+
+ /* Technically, we could allow the non-new events up to the
+ * first new event in the array into the system, but for
+ * simplicity reasons, we deny the whole burst if the port is
+ * above the water mark.
+ */
+ if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
+ source_port->new_event_threshold))
+ return 0;
+
+ enough_credits = dsw_port_acquire_credits(dsw, source_port,
+ num_non_release);
+ if (unlikely(!enough_credits))
+ return 0;
+
+ source_port->pending_releases -= num_release;
+
+ for (i = 0; i < events_len; i++) {
+ const struct rte_event *event = &events[i];
+
+ if (likely(num_release == 0 ||
+ event->op != RTE_EVENT_OP_RELEASE))
+ dsw_port_buffer_event(dsw, source_port, event);
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
+ "accepted.\n", num_non_release);
+
+ return num_non_release;
+}
+
+uint16_t
+dsw_event_enqueue_burst(void *port, const struct rte_event events[],
+ uint16_t events_len)
+{
+ return dsw_event_enqueue_burst_generic(port, events, events_len, false,
+ 0, 0, 0);
+}
+
+uint16_t
+dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
+ uint16_t events_len)
+{
+ return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+ events_len, 0, events_len);
+}
+
+uint16_t
+dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
+ uint16_t events_len)
+{
+ return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+ 0, 0, events_len);
+}
+
+uint16_t
+dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
+{
+ return dsw_event_dequeue_burst(port, events, 1, wait);
+}
+
+static uint16_t
+dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
+ uint16_t num)
+{
+ return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
+}
+
+uint16_t
+dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
+ uint64_t wait __rte_unused)
+{
+ struct dsw_port *source_port = port;
+ struct dsw_evdev *dsw = source_port->dsw;
+ uint16_t dequeued;
+
+ source_port->pending_releases = 0;
+
+ if (unlikely(num > source_port->dequeue_depth))
+ num = source_port->dequeue_depth;
+
+ dequeued = dsw_port_dequeue_burst(source_port, events, num);
+
+ source_port->pending_releases = dequeued;
+
+ if (dequeued > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
+ dequeued);
+
+ dsw_port_return_credits(dsw, source_port, dequeued);
+ }
+ /* XXX: Assuming the port can't produce any more work,
+ * consider flushing the output buffer, on dequeued ==
+ * 0.
+ */
+
+ return dequeued;
+}
diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build
index 275d051c3..bd2e4c809 100644
--- a/drivers/event/dsw/meson.build
+++ b/drivers/event/dsw/meson.build
@@ -3,4 +3,4 @@
allow_experimental_apis = true
deps += ['bus_vdev']
-sources = files('dsw_evdev.c')
+sources = files('dsw_evdev.c', 'dsw_event.c')
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 05/10] event/dsw: add DSW event scheduling and device start/stop
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 05/10] event/dsw: add DSW event scheduling and device start/stop Mattias Rönnblom
@ 2018-09-17 12:22 ` Jerin Jacob
0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-09-17 12:22 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Tue, 11 Sep 2018 10:02:11 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: jerin.jacob@caviumnetworks.com
> CC: bruce.richardson@intel.com, dev@dpdk.org, Mattias Rönnblom
> <mattias.ronnblom@ericsson.com>
> Subject: [PATCH v3 05/10] event/dsw: add DSW event scheduling and device
> start/stop
> X-Mailer: git-send-email 2.17.1
>
>
> With this patch, the DSW event device can be started and stopped,
> and also supports scheduling events between ports.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 06/10] event/dsw: add DSW port load measurements
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (4 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 05/10] event/dsw: add DSW event scheduling and device start/stop Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 07/10] event/dsw: add load balancing to the DSW event device Mattias Rönnblom
` (3 subsequent siblings)
9 siblings, 0 replies; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
The DSW event device port now attempts to estimate its load (i.e. how
busy it is). This is required for load balancing to work (although
load balancing is not included in this patch), and may also be useful
for debugging purposes.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 14 +++++
drivers/event/dsw/dsw_evdev.h | 40 +++++++++++++
drivers/event/dsw/dsw_event.c | 109 ++++++++++++++++++++++++++++++++++
3 files changed, 163 insertions(+)
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 40a7435be..bcfa17bab 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -4,6 +4,7 @@
#include <stdbool.h>
+#include <rte_cycles.h>
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>
@@ -43,6 +44,11 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
port->in_ring = in_ring;
+ rte_atomic16_init(&port->load);
+
+ port->load_update_interval =
+ (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
dev->data->ports[port_id] = port;
return 0;
@@ -240,11 +246,19 @@ static int
dsw_start(struct rte_eventdev *dev)
{
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ uint16_t i;
+ uint64_t now;
rte_atomic32_init(&dsw->credits_on_loan);
initial_flow_to_port_assignment(dsw);
+ now = rte_get_timer_cycles();
+ for (i = 0; i < dsw->num_ports; i++) {
+ dsw->ports[i].measurement_start = now;
+ dsw->ports[i].busy_start = now;
+ }
+
return 0;
}
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index f8e94e4a4..a5399dda5 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -36,6 +36,15 @@
*/
#define DSW_PARALLEL_FLOWS (1024)
+/* 'Background tasks' are polling the control rings for *
+ * migration-related messages, or flush the output buffer (so
+ * buffered events doesn't linger too long). Shouldn't be too low,
+ * since the system won't benefit from the 'batching' effects from
+ * the output buffer, and shouldn't be too high, since it will make
+ * buffered events linger too long in case the port goes idle.
+ */
+#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)
+
/* Avoid making small 'loans' from the central in-flight event credit
* pool, to improve efficiency.
*/
@@ -50,6 +59,22 @@
*/
#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)
+#define DSW_MAX_LOAD (INT16_MAX)
+#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))
+#define DSW_LOAD_TO_PERCENT(x) ((100*x)/DSW_MAX_LOAD)
+
+/* The thought behind keeping the load update interval shorter than
+ * the migration interval is that the load from newly migrated flows
+ * should 'show up' on the load measurement before new migrations are
+ * considered. This is to avoid having too many flows, from too many
+ * source ports, to be migrated too quickly to a lightly loaded port -
+ * in particular since this might cause the system to oscillate.
+ */
+#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
+#define DSW_OLD_LOAD_WEIGHT (1)
+
+#define DSW_MIGRATION_INTERVAL (1000)
+
struct dsw_port {
uint16_t id;
@@ -71,10 +96,25 @@ struct dsw_port {
uint16_t next_parallel_flow_id;
+ uint16_t ops_since_bg_task;
+
+ uint64_t last_bg;
+
+ /* For port load measurement. */
+ uint64_t next_load_update;
+ uint64_t load_update_interval;
+ uint64_t measurement_start;
+ uint64_t busy_start;
+ uint64_t busy_cycles;
+ uint64_t total_busy_cycles;
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
struct rte_event_ring *in_ring __rte_cache_aligned;
+
+ /* Estimate of current port load. */
+ rte_atomic16_t load __rte_cache_aligned;
} __rte_cache_aligned;
struct dsw_queue {
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index 4a3af8ecd..f326147c9 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -7,6 +7,7 @@
#include <stdbool.h>
#include <rte_atomic.h>
+#include <rte_cycles.h>
#include <rte_random.h>
static bool
@@ -75,6 +76,70 @@ dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
}
}
+static void
+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
+{
+ if (dequeued > 0 && port->busy_start == 0)
+ /* work period begins */
+ port->busy_start = rte_get_timer_cycles();
+ else if (dequeued == 0 && port->busy_start > 0) {
+ /* work period ends */
+ uint64_t work_period =
+ rte_get_timer_cycles() - port->busy_start;
+ port->busy_cycles += work_period;
+ port->busy_start = 0;
+ }
+}
+
+static int16_t
+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
+{
+ uint64_t passed = now - port->measurement_start;
+ uint64_t busy_cycles = port->busy_cycles;
+
+ if (port->busy_start > 0) {
+ busy_cycles += (now - port->busy_start);
+ port->busy_start = now;
+ }
+
+ int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
+
+ port->measurement_start = now;
+ port->busy_cycles = 0;
+
+ port->total_busy_cycles += busy_cycles;
+
+ return load;
+}
+
+static void
+dsw_port_load_update(struct dsw_port *port, uint64_t now)
+{
+ int16_t old_load;
+ int16_t period_load;
+ int16_t new_load;
+
+ old_load = rte_atomic16_read(&port->load);
+
+ period_load = dsw_port_load_close_period(port, now);
+
+ new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
+ (DSW_OLD_LOAD_WEIGHT+1);
+
+ rte_atomic16_set(&port->load, new_load);
+}
+
+static void
+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
+{
+ if (now < port->next_load_update)
+ return;
+
+ port->next_load_update = now + port->load_update_interval;
+
+ dsw_port_load_update(port, now);
+}
+
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
@@ -196,6 +261,39 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}
+static void
+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
+{
+ /* To pull the control ring reasonbly often on busy ports,
+ * each dequeued/enqueued event is considered an 'op' too.
+ */
+ port->ops_since_bg_task += (num_events+1);
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
+ uint64_t now;
+
+ now = rte_get_timer_cycles();
+
+ port->last_bg = now;
+
+ /* Logic to avoid having events linger in the output
+ * buffer too long.
+ */
+ dsw_port_flush_out_buffers(dsw, port);
+
+ dsw_port_consider_load_update(port, now);
+
+ port->ops_since_bg_task = 0;
+ }
+}
+
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
@@ -225,6 +323,8 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
"events to port %d.\n", events_len, source_port->id);
+ dsw_port_bg_process(dsw, source_port);
+
/* XXX: For performance (=ring efficiency) reasons, the
* scheduler relies on internal non-ring buffers instead of
* immediately sending the event to the destination ring. For
@@ -238,6 +338,7 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
* considered.
*/
if (unlikely(events_len == 0)) {
+ dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
dsw_port_flush_out_buffers(dsw, source_port);
return 0;
}
@@ -245,6 +346,8 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
if (unlikely(events_len > source_port->enqueue_depth))
events_len = source_port->enqueue_depth;
+ dsw_port_note_op(source_port, events_len);
+
if (!op_types_known)
for (i = 0; i < events_len; i++) {
switch (events[i].op) {
@@ -337,6 +440,8 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
source_port->pending_releases = 0;
+ dsw_port_bg_process(dsw, source_port);
+
if (unlikely(num > source_port->dequeue_depth))
num = source_port->dequeue_depth;
@@ -344,6 +449,10 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
source_port->pending_releases = dequeued;
+ dsw_port_load_record(source_port, dequeued);
+
+ dsw_port_note_op(source_port, dequeued);
+
if (dequeued > 0) {
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
dequeued);
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 07/10] event/dsw: add load balancing to the DSW event device
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (5 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 06/10] event/dsw: add DSW port load measurements Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 08/10] event/dsw: let DSW event device sort events on dequeue Mattias Rönnblom
` (2 subsequent siblings)
9 siblings, 0 replies; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
The DSW event device will now attempt to migrate (move) flows between
ports in order to balance the load.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 27 ++
drivers/event/dsw/dsw_evdev.h | 80 ++++
drivers/event/dsw/dsw_event.c | 735 +++++++++++++++++++++++++++++++++-
3 files changed, 838 insertions(+), 4 deletions(-)
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index bcfa17bab..2ecb365ba 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -20,6 +20,7 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
struct dsw_port *port;
struct rte_event_ring *in_ring;
+ struct rte_ring *ctl_in_ring;
char ring_name[RTE_RING_NAMESIZE];
port = &dsw->ports[port_id];
@@ -42,13 +43,29 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
if (in_ring == NULL)
return -ENOMEM;
+ snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
+ dev->data->dev_id, port_id);
+
+ ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
+ dev->data->socket_id,
+ RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+ if (ctl_in_ring == NULL) {
+ rte_event_ring_free(in_ring);
+ return -ENOMEM;
+ }
+
port->in_ring = in_ring;
+ port->ctl_in_ring = ctl_in_ring;
rte_atomic16_init(&port->load);
port->load_update_interval =
(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+ port->migration_interval =
+ (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
dev->data->ports[port_id] = port;
return 0;
@@ -72,6 +89,7 @@ dsw_port_release(void *p)
struct dsw_port *port = p;
rte_event_ring_free(port->in_ring);
+ rte_ring_free(port->ctl_in_ring);
}
static int
@@ -272,6 +290,14 @@ dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
flush(dev_id, buf[i], flush_arg);
}
+static void
+dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
+ flush, flush_arg);
+}
+
static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
eventdev_stop_flush_t flush, void *flush_arg)
@@ -308,6 +334,7 @@ dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
struct dsw_port *port = &dsw->ports[port_id];
dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
+ dsw_port_drain_paused(dev_id, port, flush, flush_arg);
dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
}
}
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index a5399dda5..783c418bf 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -73,7 +73,37 @@
#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
#define DSW_OLD_LOAD_WEIGHT (1)
+/* The minimum time (in us) between two flow migrations. What puts an
+ * upper limit on the actual migration rate is primarily the pace in
+ * which the ports send and receive control messages, which in turn is
+ * largely a function of how much cycles are spent the processing of
+ * an event burst.
+ */
#define DSW_MIGRATION_INTERVAL (1000)
+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))
+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))
+
+#define DSW_MAX_EVENTS_RECORDED (128)
+
+/* Only one outstanding migration per port is allowed */
+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
+
+/* Enough room for paus request/confirm and unpaus request/confirm for
+ * all possible senders.
+ */
+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
+
+struct dsw_queue_flow {
+ uint8_t queue_id;
+ uint16_t flow_hash;
+};
+
+enum dsw_migration_state {
+ DSW_MIGRATION_STATE_IDLE,
+ DSW_MIGRATION_STATE_PAUSING,
+ DSW_MIGRATION_STATE_FORWARDING,
+ DSW_MIGRATION_STATE_UNPAUSING
+};
struct dsw_port {
uint16_t id;
@@ -98,6 +128,7 @@ struct dsw_port {
uint16_t ops_since_bg_task;
+ /* most recent 'background' processing */
uint64_t last_bg;
/* For port load measurement. */
@@ -108,11 +139,46 @@ struct dsw_port {
uint64_t busy_cycles;
uint64_t total_busy_cycles;
+ /* For the ctl interface and flow migration mechanism. */
+ uint64_t next_migration;
+ uint64_t migration_interval;
+ enum dsw_migration_state migration_state;
+
+ uint64_t migration_start;
+ uint64_t migrations;
+ uint64_t migration_latency;
+
+ uint8_t migration_target_port_id;
+ struct dsw_queue_flow migration_target_qf;
+ uint8_t cfm_cnt;
+
+ uint16_t paused_flows_len;
+ struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];
+
+ /* In a very contrived worst case all inflight events can be
+ * laying around paused here.
+ */
+ uint16_t paused_events_len;
+ struct rte_event paused_events[DSW_MAX_EVENTS];
+
+ uint16_t seen_events_len;
+ uint16_t seen_events_idx;
+ struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
+ uint16_t in_buffer_len;
+ uint16_t in_buffer_start;
+ /* This buffer may contain events that were read up from the
+ * in_ring during the flow migration process.
+ */
+ struct rte_event in_buffer[DSW_MAX_EVENTS];
+
struct rte_event_ring *in_ring __rte_cache_aligned;
+ struct rte_ring *ctl_in_ring __rte_cache_aligned;
+
/* Estimate of current port load. */
rte_atomic16_t load __rte_cache_aligned;
} __rte_cache_aligned;
@@ -137,6 +203,20 @@ struct dsw_evdev {
rte_atomic32_t credits_on_loan __rte_cache_aligned;
};
+#define DSW_CTL_PAUS_REQ (0)
+#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_CFM (2)
+
+/* sizeof(struct dsw_ctl_msg) must be equal or less than
+ * sizeof(void *), to fit on the control ring.
+ */
+struct dsw_ctl_msg {
+ uint8_t type:2;
+ uint8_t originating_port_id:6;
+ uint8_t queue_id;
+ uint16_t flow_hash;
+} __rte_packed;
+
uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
uint16_t dsw_event_enqueue_burst(void *port,
const struct rte_event events[],
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index f326147c9..f0347592d 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -5,9 +5,11 @@
#include "dsw_evdev.h"
#include <stdbool.h>
+#include <string.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
+#include <rte_memcpy.h>
#include <rte_random.h>
static bool
@@ -140,6 +142,269 @@ dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
dsw_port_load_update(port, now);
}
+static void
+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+ void *raw_msg;
+
+ memcpy(&raw_msg, msg, sizeof(*msg));
+
+ /* there's always room on the ring */
+ while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+ rte_pause();
+}
+
+static int
+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+ void *raw_msg;
+ int rc;
+
+ rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
+
+ if (rc == 0)
+ memcpy(msg, &raw_msg, sizeof(*msg));
+
+ return rc;
+}
+
+static void
+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+{
+ uint16_t port_id;
+ struct dsw_ctl_msg msg = {
+ .type = type,
+ .originating_port_id = source_port->id,
+ .queue_id = queue_id,
+ .flow_hash = flow_hash
+ };
+
+ for (port_id = 0; port_id < dsw->num_ports; port_id++)
+ if (port_id != source_port->id)
+ dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
+}
+
+static bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ uint16_t i;
+
+ for (i = 0; i < port->paused_flows_len; i++) {
+ struct dsw_queue_flow *qf = &port->paused_flows[i];
+ if (qf->queue_id == queue_id &&
+ qf->flow_hash == flow_hash)
+ return true;
+ }
+ return false;
+}
+
+static void
+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
+ .queue_id = queue_id,
+ .flow_hash = paused_flow_hash
+ };
+ port->paused_flows_len++;
+}
+
+static void
+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ uint16_t i;
+
+ for (i = 0; i < port->paused_flows_len; i++) {
+ struct dsw_queue_flow *qf = &port->paused_flows[i];
+
+ if (qf->queue_id == queue_id &&
+ qf->flow_hash == paused_flow_hash) {
+ uint16_t last_idx = port->paused_flows_len-1;
+ if (i != last_idx)
+ port->paused_flows[i] =
+ port->paused_flows[last_idx];
+ port->paused_flows_len--;
+ break;
+ }
+ }
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ struct dsw_ctl_msg cfm = {
+ .type = DSW_CTL_CFM,
+ .originating_port_id = port->id,
+ .queue_id = queue_id,
+ .flow_hash = paused_flow_hash
+ };
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
+ queue_id, paused_flow_hash);
+
+ /* There might be already-scheduled events belonging to the
+ * paused flow in the output buffers.
+ */
+ dsw_port_flush_out_buffers(dsw, port);
+
+ dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+
+ /* Make sure any stores to the original port's in_ring is seen
+ * before the ctl message.
+ */
+ rte_smp_wmb();
+
+ dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+}
+
+static void
+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
+ uint8_t exclude_port_id, int16_t *port_loads,
+ uint8_t *target_port_id, int16_t *target_load)
+{
+ int16_t candidate_port_id = -1;
+ int16_t candidate_load = DSW_MAX_LOAD;
+ uint16_t i;
+
+ for (i = 0; i < num_port_ids; i++) {
+ uint8_t port_id = port_ids[i];
+ if (port_id != exclude_port_id) {
+ int16_t load = port_loads[port_id];
+ if (candidate_port_id == -1 ||
+ load < candidate_load) {
+ candidate_port_id = port_id;
+ candidate_load = load;
+ }
+ }
+ }
+ *target_port_id = candidate_port_id;
+ *target_load = candidate_load;
+}
+
+struct dsw_queue_flow_burst {
+ struct dsw_queue_flow queue_flow;
+ uint16_t count;
+};
+
+static inline int
+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
+{
+ const struct dsw_queue_flow_burst *burst_a = v_burst_a;
+ const struct dsw_queue_flow_burst *burst_b = v_burst_b;
+
+ int a_count = burst_a->count;
+ int b_count = burst_b->count;
+
+ return a_count - b_count;
+}
+
+#define DSW_QF_TO_INT(_qf) \
+ ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
+
+static inline int
+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
+{
+ const struct dsw_queue_flow *qf_a = v_qf_a;
+ const struct dsw_queue_flow *qf_b = v_qf_b;
+
+ return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
+}
+
+static uint16_t
+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
+ struct dsw_queue_flow_burst *bursts)
+{
+ uint16_t i;
+ struct dsw_queue_flow_burst *current_burst = NULL;
+ uint16_t num_bursts = 0;
+
+ /* We don't need the stable property, and the list is likely
+ * large enough for qsort() to outperform dsw_stable_sort(),
+ * so we use qsort() here.
+ */
+ qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
+
+ /* arrange the (now-consecutive) events into bursts */
+ for (i = 0; i < qfs_len; i++) {
+ if (i == 0 ||
+ dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) {
+ current_burst = &bursts[num_bursts];
+ current_burst->queue_flow = qfs[i];
+ current_burst->count = 0;
+ num_bursts++;
+ }
+ current_burst->count++;
+ }
+
+ qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
+
+ return num_bursts;
+}
+
+static bool
+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
+ int16_t load_limit)
+{
+ bool below_limit = false;
+ uint16_t i;
+
+ for (i = 0; i < dsw->num_ports; i++) {
+ int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+ if (load < load_limit)
+ below_limit = true;
+ port_loads[i] = load;
+ }
+ return below_limit;
+}
+
+static bool
+dsw_select_migration_target(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, int16_t *port_loads,
+ int16_t max_load, struct dsw_queue_flow *target_qf,
+ uint8_t *target_port_id)
+{
+ uint16_t source_load = port_loads[source_port->id];
+ uint16_t i;
+
+ for (i = 0; i < num_bursts; i++) {
+ struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+
+ if (dsw_port_is_flow_paused(source_port, qf->queue_id,
+ qf->flow_hash))
+ continue;
+
+ struct dsw_queue *queue = &dsw->queues[qf->queue_id];
+ int16_t target_load;
+
+ dsw_find_lowest_load_port(queue->serving_ports,
+ queue->num_serving_ports,
+ source_port->id, port_loads,
+ target_port_id, &target_load);
+
+ if (target_load < source_load &&
+ target_load < max_load) {
+ *target_qf = *qf;
+ return true;
+ }
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
+ "no target port found with load less than %d.\n",
+ num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+
+ return false;
+}
+
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
@@ -197,6 +462,14 @@ dsw_port_get_parallel_flow_id(struct dsw_port *port)
return flow_id;
}
+static void
+dsw_port_buffer_paused(struct dsw_port *port,
+ const struct rte_event *paused_event)
+{
+ port->paused_events[port->paused_events_len] = *paused_event;
+ port->paused_events_len++;
+}
+
static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
uint8_t dest_port_id, const struct rte_event *event)
@@ -256,11 +529,401 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
flow_hash = dsw_flow_id_hash(event->flow_id);
+ if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
+ flow_hash))) {
+ dsw_port_buffer_paused(source_port, event);
+ return;
+ }
+
dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint8_t queue_id, uint16_t paused_flow_hash)
+{
+ uint16_t paused_events_len = source_port->paused_events_len;
+ struct rte_event paused_events[paused_events_len];
+ uint8_t dest_port_id;
+ uint16_t i;
+
+ if (paused_events_len == 0)
+ return;
+
+ if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+ return;
+
+ rte_memcpy(paused_events, source_port->paused_events,
+ paused_events_len * sizeof(struct rte_event));
+
+ source_port->paused_events_len = 0;
+
+ dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+
+ for (i = 0; i < paused_events_len; i++) {
+ struct rte_event *event = &paused_events[i];
+ uint16_t flow_hash;
+
+ flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ if (event->queue_id == queue_id &&
+ flow_hash == paused_flow_hash)
+ dsw_port_buffer_non_paused(dsw, source_port,
+ dest_port_id, event);
+ else
+ dsw_port_buffer_paused(source_port, event);
+ }
+}
+
+static void
+dsw_port_migration_stats(struct dsw_port *port)
+{
+ uint64_t migration_latency;
+
+ migration_latency = (rte_get_timer_cycles() - port->migration_start);
+ port->migration_latency += migration_latency;
+ port->migrations++;
+}
+
+static void
+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ uint8_t queue_id = port->migration_target_qf.queue_id;
+ uint16_t flow_hash = port->migration_target_qf.flow_hash;
+
+ port->migration_state = DSW_MIGRATION_STATE_IDLE;
+ port->seen_events_len = 0;
+
+ dsw_port_migration_stats(port);
+
+ if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
+ dsw_port_remove_paused_flow(port, queue_id, flow_hash);
+ dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
+ "%d flow_hash %d.\n", queue_id, flow_hash);
+}
+
+static void
+dsw_port_consider_migration(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint64_t now)
+{
+ bool any_port_below_limit;
+ struct dsw_queue_flow *seen_events = source_port->seen_events;
+ uint16_t seen_events_len = source_port->seen_events_len;
+ struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
+ uint16_t num_bursts;
+ int16_t source_port_load;
+ int16_t port_loads[dsw->num_ports];
+
+ if (now < source_port->next_migration)
+ return;
+
+ if (dsw->num_ports == 1)
+ return;
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+
+ /* Randomize interval to avoid having all threads considering
+ * migration at the same in point in time, which might lead to
+ * all choosing the same target port.
+ */
+ source_port->next_migration = now +
+ source_port->migration_interval / 2 +
+ rte_rand() % source_port->migration_interval;
+
+ if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Migration already in progress.\n");
+ return;
+ }
+
+ /* For simplicity, avoid migration in the unlikely case there
+ * is still events to consume in the in_buffer (from the last
+ * migration).
+ */
+ if (source_port->in_buffer_len > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+ "events in the input buffer.\n");
+ return;
+ }
+
+ source_port_load = rte_atomic16_read(&source_port->load);
+ if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Load %d is below threshold level %d.\n",
+ DSW_LOAD_TO_PERCENT(source_port_load),
+ DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+ return;
+ }
+
+ /* Avoid starting any expensive operations (sorting etc), in
+ * case of a scenario with all ports above the load limit.
+ */
+ any_port_below_limit =
+ dsw_retrieve_port_loads(dsw, port_loads,
+ DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
+ if (!any_port_below_limit) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Candidate target ports are all too highly "
+ "loaded.\n");
+ return;
+ }
+
+ /* Sort flows into 'bursts' to allow attempting to migrating
+ * small (but still active) flows first - this it to avoid
+ * having large flows moving around the worker cores too much
+ * (to avoid cache misses, among other things). Of course, the
+ * number of recorded events (queue+flow ids) are limited, and
+ * provides only a snapshot, so only so many conclusions can
+ * be drawn from this data.
+ */
+ num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
+ bursts);
+ /* For non-big-little systems, there's no point in moving the
+ * only (known) flow.
+ */
+ if (num_bursts < 2) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
+ "queue_id %d flow_hash %d has been seen.\n",
+ bursts[0].queue_flow.queue_id,
+ bursts[0].queue_flow.flow_hash);
+ return;
+ }
+
+ /* The strategy is to first try to find a flow to move to a
+ * port with low load (below the migration-attempt
+ * threshold). If that fails, we try to find a port which is
+ * below the max threshold, and also less loaded than this
+ * port is.
+ */
+ if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+ port_loads,
+ DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
+ &source_port->migration_target_qf,
+ &source_port->migration_target_port_id)
+ &&
+ !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+ port_loads,
+ DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
+ &source_port->migration_target_qf,
+ &source_port->migration_target_port_id))
+ return;
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
+ "flow_hash %d from port %d to port %d.\n",
+ source_port->migration_target_qf.queue_id,
+ source_port->migration_target_qf.flow_hash,
+ source_port->id, source_port->migration_target_port_id);
+
+ /* We have a winner. */
+
+ source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+ source_port->migration_start = rte_get_timer_cycles();
+
+ /* No need to go through the whole pause procedure for
+ * parallel queues, since atomic/ordered semantics need not to
+ * be maintained.
+ */
+
+ if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
+ == RTE_SCHED_TYPE_PARALLEL) {
+ uint8_t queue_id = source_port->migration_target_qf.queue_id;
+ uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+ uint8_t dest_port_id = source_port->migration_target_port_id;
+
+ /* Single byte-sized stores are always atomic. */
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ dest_port_id;
+ rte_smp_wmb();
+
+ dsw_port_end_migration(dsw, source_port);
+
+ return;
+ }
+
+ /* There might be 'loopback' events already scheduled in the
+ * output buffers.
+ */
+ dsw_port_flush_out_buffers(dsw, source_port);
+
+ dsw_port_add_paused_flow(source_port,
+ source_port->migration_target_qf.queue_id,
+ source_port->migration_target_qf.flow_hash);
+
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+ source_port->migration_target_qf.queue_id,
+ source_port->migration_target_qf.flow_hash);
+ source_port->cfm_cnt = 0;
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint8_t queue_id, uint16_t paused_flow_hash);
+
+static void
+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ struct dsw_ctl_msg cfm = {
+ .type = DSW_CTL_CFM,
+ .originating_port_id = port->id,
+ .queue_id = queue_id,
+ .flow_hash = paused_flow_hash
+ };
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
+ queue_id, paused_flow_hash);
+
+ dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+
+ rte_smp_rmb();
+
+ dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+
+ dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+}
+
+#define FORWARD_BURST_SIZE (32)
+
+static void
+dsw_port_forward_migrated_flow(struct dsw_port *source_port,
+ struct rte_event_ring *dest_ring,
+ uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ uint16_t events_left;
+
+ /* Control ring message should been seen before the ring count
+ * is read on the port's in_ring.
+ */
+ rte_smp_rmb();
+
+ events_left = rte_event_ring_count(source_port->in_ring);
+
+ while (events_left > 0) {
+ uint16_t in_burst_size =
+ RTE_MIN(FORWARD_BURST_SIZE, events_left);
+ struct rte_event in_burst[in_burst_size];
+ uint16_t in_len;
+ uint16_t i;
+
+ in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
+ in_burst,
+ in_burst_size, NULL);
+ /* No need to care about bursting forwarded events (to
+ * the destination port's in_ring), since migration
+ * doesn't happen very often, and also the majority of
+ * the dequeued events will likely *not* be forwarded.
+ */
+ for (i = 0; i < in_len; i++) {
+ struct rte_event *e = &in_burst[i];
+ if (e->queue_id == queue_id &&
+ dsw_flow_id_hash(e->flow_id) == flow_hash) {
+ while (rte_event_ring_enqueue_burst(dest_ring,
+ e, 1,
+ NULL) != 1)
+ rte_pause();
+ } else {
+ uint16_t last_idx = source_port->in_buffer_len;
+ source_port->in_buffer[last_idx] = *e;
+ source_port->in_buffer_len++;
+ }
+ }
+
+ events_left -= in_len;
+ }
+}
+
+static void
+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
+{
+ uint8_t queue_id = source_port->migration_target_qf.queue_id;
+ uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+ uint8_t dest_port_id = source_port->migration_target_port_id;
+ struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+ dsw_port_flush_out_buffers(dsw, source_port);
+
+ rte_smp_wmb();
+
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ dest_port_id;
+
+ dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
+ queue_id, flow_hash);
+
+ /* Flow table update and migration destination port's enqueues
+ * must be seen before the control message.
+ */
+ rte_smp_wmb();
+
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
+ flow_hash);
+ source_port->cfm_cnt = 0;
+ source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
+}
+
+static void
+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ port->cfm_cnt++;
+
+ if (port->cfm_cnt == (dsw->num_ports-1)) {
+ switch (port->migration_state) {
+ case DSW_MIGRATION_STATE_PAUSING:
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
+ "migration state.\n");
+ port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+ break;
+ case DSW_MIGRATION_STATE_UNPAUSING:
+ dsw_port_end_migration(dsw, port);
+ break;
+ default:
+ RTE_ASSERT(0);
+ break;
+ }
+ }
+}
+
+static void
+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ struct dsw_ctl_msg msg;
+
+ /* So any table loads happens before the ring dequeue, in the
+ * case of a 'paus' message.
+ */
+ rte_smp_rmb();
+
+ if (dsw_port_ctl_dequeue(port, &msg) == 0) {
+ switch (msg.type) {
+ case DSW_CTL_PAUS_REQ:
+ dsw_port_handle_pause_flow(dsw, port,
+ msg.originating_port_id,
+ msg.queue_id, msg.flow_hash);
+ break;
+ case DSW_CTL_UNPAUS_REQ:
+ dsw_port_handle_unpause_flow(dsw, port,
+ msg.originating_port_id,
+ msg.queue_id,
+ msg.flow_hash);
+ break;
+ case DSW_CTL_CFM:
+ dsw_port_handle_confirm(dsw, port);
+ break;
+ }
+ }
+}
+
static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
@@ -270,12 +933,24 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
port->ops_since_bg_task += (num_events+1);
}
-static void
-dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
-
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
+ if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
+ port->pending_releases == 0))
+ dsw_port_move_migrating_flow(dsw, port);
+
+ /* Polling the control ring is relatively inexpensive, and
+ * polling it often helps bringing down migration latency, so
+ * do this for every iteration.
+ */
+ dsw_port_ctl_process(dsw, port);
+
+ /* To avoid considering migration and flushing output buffers
+ * on every dequeue/enqueue call, the scheduler only performs
+ * such 'background' tasks every nth
+ * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
+ */
if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
uint64_t now;
@@ -290,6 +965,8 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
dsw_port_consider_load_update(port, now);
+ dsw_port_consider_migration(dsw, port, now);
+
port->ops_since_bg_task = 0;
}
}
@@ -339,7 +1016,6 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
*/
if (unlikely(events_len == 0)) {
dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
- dsw_port_flush_out_buffers(dsw, source_port);
return 0;
}
@@ -423,10 +1099,52 @@ dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
return dsw_event_dequeue_burst(port, events, 1, wait);
}
+static void
+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
+ uint16_t num)
+{
+ uint16_t i;
+
+ for (i = 0; i < num; i++) {
+ uint16_t l_idx = port->seen_events_idx;
+ struct dsw_queue_flow *qf = &port->seen_events[l_idx];
+ struct rte_event *event = &events[i];
+ qf->queue_id = event->queue_id;
+ qf->flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+ }
+
+ if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
+ port->seen_events_len =
+ RTE_MIN(port->seen_events_len + num,
+ DSW_MAX_EVENTS_RECORDED);
+}
+
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
{
+ struct dsw_port *source_port = port;
+ struct dsw_evdev *dsw = source_port->dsw;
+
+ dsw_port_ctl_process(dsw, source_port);
+
+ if (unlikely(port->in_buffer_len > 0)) {
+ uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+
+ rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
+ dequeued * sizeof(struct rte_event));
+
+ port->in_buffer_start += dequeued;
+ port->in_buffer_len -= dequeued;
+
+ if (port->in_buffer_len == 0)
+ port->in_buffer_start = 0;
+
+ return dequeued;
+ }
+
return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}
@@ -458,6 +1176,15 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
dequeued);
dsw_port_return_credits(dsw, source_port, dequeued);
+
+ /* One potential optimization one might think of is to
+ * add a migration state (prior to 'pausing'), and
+ * only record seen events when the port is in this
+ * state (and transit to 'pausing' when enough events
+ * have been gathered). However, that schema doesn't
+ * seem to improve performance.
+ */
+ dsw_port_record_seen_events(port, events, dequeued);
}
/* XXX: Assuming the port can't produce any more work,
* consider flushing the output buffer, on dequeued ==
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 08/10] event/dsw: let DSW event device sort events on dequeue
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (6 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 07/10] event/dsw: add load balancing to the DSW event device Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 09/10] event/dsw: implement eventdev 'xstats' counters in DSW Mattias Rönnblom
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation Mattias Rönnblom
9 siblings, 0 replies; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
With this patch, the DSW event device will (optionally) sort the event
burst before giving it to the application. The sorting will primarily
be on queue id, and secondary on flow id.
The sorting is an attempt to optimize data and instruction cache usage
for the application, at the cost of additional event device overhead.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.h | 11 ++++++++
drivers/event/dsw/dsw_event.c | 23 +++++++++++++++++
drivers/event/dsw/dsw_sort.h | 48 +++++++++++++++++++++++++++++++++++
3 files changed, 82 insertions(+)
create mode 100644 drivers/event/dsw/dsw_sort.h
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 783c418bf..f6f8f0454 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -93,6 +93,17 @@
*/
#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
+/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of
+ * dequeue(), arrange events so that events with the same flow id on
+ * the same queue forms a back-to-back "burst", and also so that such
+ * bursts of different flow ids, but on the same queue, also come
+ * consecutively. All this in an attempt to improve data and
+ * instruction cache usage for the application, at the cost of a
+ * scheduler overhead increase.
+ */
+
+/* #define DSW_SORT_DEQUEUED */
+
struct dsw_queue_flow {
uint8_t queue_id;
uint16_t flow_hash;
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index f0347592d..a84b19c33 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -4,6 +4,10 @@
#include "dsw_evdev.h"
+#ifdef DSW_SORT_DEQUEUED
+#include "dsw_sort.h"
+#endif
+
#include <stdbool.h>
#include <string.h>
@@ -1121,6 +1125,21 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
DSW_MAX_EVENTS_RECORDED);
}
+#ifdef DSW_SORT_DEQUEUED
+
+#define DSW_EVENT_TO_INT(_event) \
+ ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
+
+static inline int
+dsw_cmp_event(const void *v_event_a, const void *v_event_b)
+{
+ const struct rte_event *event_a = v_event_a;
+ const struct rte_event *event_b = v_event_b;
+
+ return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
+}
+#endif
+
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
@@ -1191,5 +1210,9 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
* 0.
*/
+#ifdef DSW_SORT_DEQUEUED
+ dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
+#endif
+
return dequeued;
}
diff --git a/drivers/event/dsw/dsw_sort.h b/drivers/event/dsw/dsw_sort.h
new file mode 100644
index 000000000..609767fdf
--- /dev/null
+++ b/drivers/event/dsw/dsw_sort.h
@@ -0,0 +1,48 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_SORT_
+#define _DSW_SORT_
+
+#include <string.h>
+
+#include <rte_common.h>
+
+#define DSW_ARY_ELEM_PTR(_ary, _idx, _elem_size) \
+ RTE_PTR_ADD(_ary, (_idx) * (_elem_size))
+
+#define DSW_ARY_ELEM_SWAP(_ary, _a_idx, _b_idx, _elem_size) \
+ do { \
+ char tmp[_elem_size]; \
+ void *_a_ptr = DSW_ARY_ELEM_PTR(_ary, _a_idx, _elem_size); \
+ void *_b_ptr = DSW_ARY_ELEM_PTR(_ary, _b_idx, _elem_size); \
+ memcpy(tmp, _a_ptr, _elem_size); \
+ memcpy(_a_ptr, _b_ptr, _elem_size); \
+ memcpy(_b_ptr, tmp, _elem_size); \
+ } while (0)
+
+static inline void
+dsw_insertion_sort(void *ary, uint16_t len, uint16_t elem_size,
+ int (*cmp_fn)(const void *, const void *))
+{
+ uint16_t i;
+
+ for (i = 1; i < len; i++) {
+ uint16_t j;
+ for (j = i; j > 0 &&
+ cmp_fn(DSW_ARY_ELEM_PTR(ary, j-1, elem_size),
+ DSW_ARY_ELEM_PTR(ary, j, elem_size)) > 0;
+ j--)
+ DSW_ARY_ELEM_SWAP(ary, j, j-1, elem_size);
+ }
+}
+
+static inline void
+dsw_stable_sort(void *ary, uint16_t len, uint16_t elem_size,
+ int (*cmp_fn)(const void *, const void *))
+{
+ dsw_insertion_sort(ary, len, elem_size, cmp_fn);
+}
+
+#endif
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 09/10] event/dsw: implement eventdev 'xstats' counters in DSW
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (7 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 08/10] event/dsw: let DSW event device sort events on dequeue Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-17 12:23 ` Jerin Jacob
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation Mattias Rönnblom
9 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
The DSW event device now implements the 'xstats' interface and a
number of port- and device-level counters.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/Makefile | 3 +-
drivers/event/dsw/dsw_evdev.c | 5 +-
drivers/event/dsw/dsw_evdev.h | 19 +++
drivers/event/dsw/dsw_event.c | 35 ++++
drivers/event/dsw/dsw_xstats.c | 288 +++++++++++++++++++++++++++++++++
drivers/event/dsw/meson.build | 2 +-
6 files changed, 349 insertions(+), 3 deletions(-)
create mode 100644 drivers/event/dsw/dsw_xstats.c
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
index 6374a454e..ea1e5259a 100644
--- a/drivers/event/dsw/Makefile
+++ b/drivers/event/dsw/Makefile
@@ -21,6 +21,7 @@ LIBABIVER := 1
EXPORT_MAP := rte_pmd_dsw_event_version.map
-SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c dsw_event.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += \
+ dsw_evdev.c dsw_event.c dsw_xstats.c
include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 2ecb365ba..33ba13647 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -378,7 +378,10 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
.dev_configure = dsw_configure,
.dev_start = dsw_start,
.dev_stop = dsw_stop,
- .dev_close = dsw_close
+ .dev_close = dsw_close,
+ .xstats_get = dsw_xstats_get,
+ .xstats_get_names = dsw_xstats_get_names,
+ .xstats_get_by_name = dsw_xstats_get_by_name
};
static int
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index f6f8f0454..dc28ab125 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -176,6 +176,14 @@ struct dsw_port {
uint16_t seen_events_idx;
struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+ uint64_t new_enqueued;
+ uint64_t forward_enqueued;
+ uint64_t release_enqueued;
+ uint64_t queue_enqueued[DSW_MAX_QUEUES];
+
+ uint64_t dequeued;
+ uint64_t queue_dequeued[DSW_MAX_QUEUES];
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
@@ -243,6 +251,17 @@ uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
uint16_t num, uint64_t wait);
+int dsw_xstats_get_names(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode,
+ uint8_t queue_port_id,
+ struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size);
+int dsw_xstats_get(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+ const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+ const char *name, unsigned int *id);
+
static inline struct dsw_evdev *
dsw_pmd_priv(const struct rte_eventdev *eventdev)
{
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index a84b19c33..61a66fabf 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -82,6 +82,33 @@ dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
}
}
+static void
+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
+ uint16_t num_forward, uint16_t num_release)
+{
+ port->new_enqueued += num_new;
+ port->forward_enqueued += num_forward;
+ port->release_enqueued += num_release;
+}
+
+static void
+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+ source_port->queue_enqueued[queue_id]++;
+}
+
+static void
+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
+{
+ port->dequeued += num;
+}
+
+static void
+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+ source_port->queue_dequeued[queue_id]++;
+}
+
static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
@@ -1059,12 +1086,16 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
source_port->pending_releases -= num_release;
+ dsw_port_enqueue_stats(source_port, num_new,
+ num_non_release-num_new, num_release);
+
for (i = 0; i < events_len; i++) {
const struct rte_event *event = &events[i];
if (likely(num_release == 0 ||
event->op != RTE_EVENT_OP_RELEASE))
dsw_port_buffer_event(dsw, source_port, event);
+ dsw_port_queue_enqueue_stats(source_port, event->queue_id);
}
DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
@@ -1109,6 +1140,8 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
{
uint16_t i;
+ dsw_port_dequeue_stats(port, num);
+
for (i = 0; i < num; i++) {
uint16_t l_idx = port->seen_events_idx;
struct dsw_queue_flow *qf = &port->seen_events[l_idx];
@@ -1117,6 +1150,8 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
qf->flow_hash = dsw_flow_id_hash(event->flow_id);
port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+
+ dsw_port_queue_dequeued_stats(port, event->queue_id);
}
if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
diff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c
new file mode 100644
index 000000000..bf2eec527
--- /dev/null
+++ b/drivers/event/dsw/dsw_xstats.c
@@ -0,0 +1,288 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include "dsw_evdev.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <rte_debug.h>
+
+/* The high bits in the xstats id is used to store an additional
+ * parameter (beyond the queue or port id already in the xstats
+ * interface).
+ */
+#define DSW_XSTATS_ID_PARAM_BITS (8)
+#define DSW_XSTATS_ID_STAT_BITS \
+ (sizeof(unsigned int)*CHAR_BIT - DSW_XSTATS_ID_PARAM_BITS)
+#define DSW_XSTATS_ID_STAT_MASK ((1 << DSW_XSTATS_ID_STAT_BITS) - 1)
+
+#define DSW_XSTATS_ID_GET_PARAM(id) \
+ ((id)>>DSW_XSTATS_ID_STAT_BITS)
+
+#define DSW_XSTATS_ID_GET_STAT(id) \
+ ((id) & DSW_XSTATS_ID_STAT_MASK)
+
+#define DSW_XSTATS_ID_CREATE(id, param_value) \
+ (((param_value) << DSW_XSTATS_ID_STAT_BITS) | id)
+
+typedef
+uint64_t (*dsw_xstats_dev_get_value_fn)(struct dsw_evdev *dsw);
+
+struct dsw_xstat_dev {
+ const char *name;
+ dsw_xstats_dev_get_value_fn get_value_fn;
+};
+
+typedef
+uint64_t (*dsw_xstats_port_get_value_fn)(struct dsw_evdev *dsw,
+ uint8_t port_id, uint8_t queue_id);
+
+struct dsw_xstats_port {
+ const char *name_fmt;
+ dsw_xstats_port_get_value_fn get_value_fn;
+ bool per_queue;
+};
+
+static uint64_t
+dsw_xstats_dev_credits_on_loan(struct dsw_evdev *dsw)
+{
+ return rte_atomic32_read(&dsw->credits_on_loan);
+}
+
+static struct dsw_xstat_dev dsw_dev_xstats[] = {
+ { "dev_credits_on_loan", dsw_xstats_dev_credits_on_loan }
+};
+
+#define DSW_GEN_PORT_ACCESS_FN(_variable) \
+ static uint64_t \
+ dsw_xstats_port_get_ ## _variable(struct dsw_evdev *dsw, \
+ uint8_t port_id, \
+ uint8_t queue_id __rte_unused) \
+ { \
+ return dsw->ports[port_id]._variable; \
+ }
+
+DSW_GEN_PORT_ACCESS_FN(new_enqueued)
+DSW_GEN_PORT_ACCESS_FN(forward_enqueued)
+DSW_GEN_PORT_ACCESS_FN(release_enqueued)
+
+static uint64_t
+dsw_xstats_port_get_queue_enqueued(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id)
+{
+ return dsw->ports[port_id].queue_enqueued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(dequeued)
+
+static uint64_t
+dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id)
+{
+ return dsw->ports[port_id].queue_dequeued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(migrations)
+
+static uint64_t
+dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id __rte_unused)
+{
+ uint64_t total_latency = dsw->ports[port_id].migration_latency;
+ uint64_t num_migrations = dsw->ports[port_id].migrations;
+
+ return num_migrations > 0 ? total_latency / num_migrations : 0;
+}
+
+static uint64_t
+dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id __rte_unused)
+{
+ uint64_t total_busy_cycles =
+ dsw->ports[port_id].total_busy_cycles;
+ uint64_t dequeued =
+ dsw->ports[port_id].dequeued;
+
+ return dequeued > 0 ? total_busy_cycles / dequeued : 0;
+}
+
+DSW_GEN_PORT_ACCESS_FN(inflight_credits)
+
+static uint64_t
+dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id __rte_unused)
+{
+ int16_t load;
+
+ load = rte_atomic16_read(&dsw->ports[port_id].load);
+
+ return DSW_LOAD_TO_PERCENT(load);
+}
+
+DSW_GEN_PORT_ACCESS_FN(last_bg)
+
+static struct dsw_xstats_port dsw_port_xstats[] = {
+ { "port_%u_new_enqueued", dsw_xstats_port_get_new_enqueued,
+ false },
+ { "port_%u_forward_enqueued", dsw_xstats_port_get_forward_enqueued,
+ false },
+ { "port_%u_release_enqueued", dsw_xstats_port_get_release_enqueued,
+ false },
+ { "port_%u_queue_%u_enqueued", dsw_xstats_port_get_queue_enqueued,
+ true },
+ { "port_%u_dequeued", dsw_xstats_port_get_dequeued,
+ false },
+ { "port_%u_queue_%u_dequeued", dsw_xstats_port_get_queue_dequeued,
+ true },
+ { "port_%u_migrations", dsw_xstats_port_get_migrations,
+ false },
+ { "port_%u_migration_latency", dsw_xstats_port_get_migration_latency,
+ false },
+ { "port_%u_event_proc_latency", dsw_xstats_port_get_event_proc_latency,
+ false },
+ { "port_%u_inflight_credits", dsw_xstats_port_get_inflight_credits,
+ false },
+ { "port_%u_load", dsw_xstats_port_get_load,
+ false },
+ { "port_%u_last_bg", dsw_xstats_port_get_last_bg,
+ false }
+};
+
+static int
+dsw_xstats_dev_get_names(struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size)
+{
+ unsigned int i;
+
+ for (i = 0; i < RTE_DIM(dsw_dev_xstats) && i < size; i++) {
+ ids[i] = i;
+ strcpy(xstats_names[i].name, dsw_dev_xstats[i].name);
+ }
+
+ return i;
+}
+
+static int
+dsw_xstats_port_get_names(struct dsw_evdev *dsw, uint8_t port_id,
+ struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size)
+{
+ uint8_t queue_id = 0;
+ unsigned int id_idx;
+ unsigned int stat_idx;
+
+ for (id_idx = 0, stat_idx = 0;
+ id_idx < size && stat_idx < RTE_DIM(dsw_port_xstats);
+ id_idx++) {
+ struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+
+ if (xstat->per_queue) {
+ ids[id_idx] = DSW_XSTATS_ID_CREATE(stat_idx, queue_id);
+ snprintf(xstats_names[id_idx].name,
+ RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+ dsw_port_xstats[stat_idx].name_fmt, port_id,
+ queue_id);
+ queue_id++;
+ } else {
+ ids[id_idx] = stat_idx;
+ snprintf(xstats_names[id_idx].name,
+ RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+ dsw_port_xstats[stat_idx].name_fmt, port_id);
+ }
+
+ if (!(xstat->per_queue && queue_id < dsw->num_queues)) {
+ stat_idx++;
+ queue_id = 0;
+ }
+ }
+ return id_idx;
+}
+
+int
+dsw_xstats_get_names(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode,
+ uint8_t queue_port_id,
+ struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+ switch (mode) {
+ case RTE_EVENT_DEV_XSTATS_DEVICE:
+ return dsw_xstats_dev_get_names(xstats_names, ids, size);
+ case RTE_EVENT_DEV_XSTATS_PORT:
+ return dsw_xstats_port_get_names(dsw, queue_port_id,
+ xstats_names, ids, size);
+ case RTE_EVENT_DEV_XSTATS_QUEUE:
+ return 0;
+ default:
+ RTE_ASSERT(false);
+ return -1;
+ }
+}
+
+static int
+dsw_xstats_dev_get(const struct rte_eventdev *dev,
+ const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ unsigned int i;
+
+ for (i = 0; i < n; i++) {
+ unsigned int id = ids[i];
+ struct dsw_xstat_dev *xstat = &dsw_dev_xstats[id];
+ values[i] = xstat->get_value_fn(dsw);
+ }
+ return n;
+}
+
+static int
+dsw_xstats_port_get(const struct rte_eventdev *dev, uint8_t port_id,
+ const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ unsigned int i;
+
+ for (i = 0; i < n; i++) {
+ unsigned int id = ids[i];
+ unsigned int stat_idx = DSW_XSTATS_ID_GET_STAT(id);
+ struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+ uint8_t queue_id = 0;
+
+ if (xstat->per_queue)
+ queue_id = DSW_XSTATS_ID_GET_PARAM(id);
+
+ values[i] = xstat->get_value_fn(dsw, port_id, queue_id);
+ }
+ return n;
+}
+
+int
+dsw_xstats_get(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+ const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+ switch (mode) {
+ case RTE_EVENT_DEV_XSTATS_DEVICE:
+ return dsw_xstats_dev_get(dev, ids, values, n);
+ case RTE_EVENT_DEV_XSTATS_PORT:
+ return dsw_xstats_port_get(dev, queue_port_id, ids, values, n);
+ case RTE_EVENT_DEV_XSTATS_QUEUE:
+ return 0;
+ default:
+ RTE_ASSERT(false);
+ return -1;
+ }
+ return 0;
+}
+
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+ const char *name, unsigned int *id)
+{
+ RTE_SET_USED(dev);
+ RTE_SET_USED(name);
+ RTE_SET_USED(id);
+ return 0;
+}
diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build
index bd2e4c809..a6b7bfa59 100644
--- a/drivers/event/dsw/meson.build
+++ b/drivers/event/dsw/meson.build
@@ -3,4 +3,4 @@
allow_experimental_apis = true
deps += ['bus_vdev']
-sources = files('dsw_evdev.c', 'dsw_event.c')
+sources = files('dsw_evdev.c', 'dsw_event.c', 'dsw_xstats.c')
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 09/10] event/dsw: implement eventdev 'xstats' counters in DSW
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 09/10] event/dsw: implement eventdev 'xstats' counters in DSW Mattias Rönnblom
@ 2018-09-17 12:23 ` Jerin Jacob
0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-09-17 12:23 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Tue, 11 Sep 2018 10:02:15 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: jerin.jacob@caviumnetworks.com
> CC: bruce.richardson@intel.com, dev@dpdk.org, Mattias Rönnblom
> <mattias.ronnblom@ericsson.com>
> Subject: [PATCH v3 09/10] event/dsw: implement eventdev 'xstats' counters
> in DSW
> X-Mailer: git-send-email 2.17.1
>
> External Email
>
> The DSW event device now implements the 'xstats' interface and a
> number of port- and device-level counters.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
^ permalink raw reply [flat|nested] 19+ messages in thread
* [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation
2018-09-11 8:02 [dpdk-dev] [PATCH v3 00/10] A Distributed Software Event Device Mattias Rönnblom
` (8 preceding siblings ...)
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 09/10] event/dsw: implement eventdev 'xstats' counters in DSW Mattias Rönnblom
@ 2018-09-11 8:02 ` Mattias Rönnblom
2018-09-17 12:29 ` Jerin Jacob
9 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-11 8:02 UTC (permalink / raw)
To: jerin.jacob; +Cc: bruce.richardson, dev, Mattias Rönnblom
The DSW event device is documented in DPDK Programmer's Guide.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
doc/guides/eventdevs/dsw.rst | 97 ++++++++++++++++++++++++++++++++++
doc/guides/eventdevs/index.rst | 1 +
2 files changed, 98 insertions(+)
create mode 100644 doc/guides/eventdevs/dsw.rst
diff --git a/doc/guides/eventdevs/dsw.rst b/doc/guides/eventdevs/dsw.rst
new file mode 100644
index 000000000..de41ae9d3
--- /dev/null
+++ b/doc/guides/eventdevs/dsw.rst
@@ -0,0 +1,97 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+ Copyright(c) 2017 Intel Corporation.
+ Copyright(c) 2018 Ericsson AB
+
+Distributed Software Eventdev Poll Mode Driver
+==============================================
+
+The distributed software eventdev is a parallel implementation of the
+eventdev API, which distributes the task of scheduling events among
+all the eventdev ports and the lcore threads using them.
+
+Features
+--------
+
+Queues
+ * Atomic
+ * Parallel
+ * Single-Link
+
+Ports
+ * Load balanced (for Atomic, Ordered, Parallel queues)
+ * Single Link (for single-link queues)
+
+Configuration and Options
+-------------------------
+
+The distributed software eventdev is a vdev device, and as such can be
+created from the application code, or from the EAL command line:
+
+* Call ``rte_vdev_init("event_dsw0")`` from the application
+
+* Use ``--vdev="event_dsw0"`` in the EAL options, which will call
+ rte_vdev_init() internally
+
+Example:
+
+.. code-block:: console
+
+ ./your_eventdev_application --vdev="event_dsw0"
+
+Limitations
+-----------
+
+Unattended Ports
+~~~~~~~~~~~~~~~~
+
+The distributed software eventdev uses an internal signaling schema
+between the ports to achieve load balancing. In order for this to
+work, the application must perform enqueue and/or dequeue operations
+on all ports.
+
+Producer-only ports which currently have no events to enqueue should
+periodically call rte_event_enqueue_burst() with a zero-sized burst.
+
+Ports left unattended for longer periods of time will prevent load
+balancing, and also cause traffic interruptions on the flows which
+are in the process of being migrated.
+
+Output Buffering
+~~~~~~~~~~~~~~~~
+
+For efficiency reasons, the distributed software eventdev might not
+send enqueued events immediately to the destination port, but instead
+store them in an internal buffer in the source port.
+
+In case no more events are enqueued on a port with buffered events,
+these events will be sent after the application has performed a number
+of enqueue and/or dequeue operations.
+
+For explicit flushing, an application may call
+rte_event_enqueue_burst() with a zero-sized burst.
+
+
+Priorities
+~~~~~~~~~~
+
+The distributed software eventdev does not support event priorities.
+
+Ordered Queues
+~~~~~~~~~~~~~~
+
+The distributed software eventdev does not support the ordered queue type.
+
+
+"All Types" Queues
+~~~~~~~~~~~~~~~~~~
+
+The distributed software eventdev does not support queues of type
+RTE_EVENT_QUEUE_CFG_ALL_TYPES, which allow both atomic, ordered, and
+parallel events on the same queue.
+
+Dynamic Link/Unlink
+~~~~~~~~~~~~~~~~~~~
+
+The distributed software eventdev does not support calls to
+rte_event_port_link() or rte_event_port_unlink() after
+rte_event_dev_start() has been called.
diff --git a/doc/guides/eventdevs/index.rst b/doc/guides/eventdevs/index.rst
index 18ec8e462..984eea5f4 100644
--- a/doc/guides/eventdevs/index.rst
+++ b/doc/guides/eventdevs/index.rst
@@ -14,5 +14,6 @@ application trough the eventdev API.
dpaa
dpaa2
sw
+ dsw
octeontx
opdl
--
2.17.1
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation
2018-09-11 8:02 ` [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation Mattias Rönnblom
@ 2018-09-17 12:29 ` Jerin Jacob
2018-09-17 19:17 ` Mattias Rönnblom
0 siblings, 1 reply; 19+ messages in thread
From: Jerin Jacob @ 2018-09-17 12:29 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Tue, 11 Sep 2018 10:02:16 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: jerin.jacob@caviumnetworks.com
> CC: bruce.richardson@intel.com, dev@dpdk.org, Mattias Rönnblom
> <mattias.ronnblom@ericsson.com>
> Subject: [PATCH v3 10/10] event/dsw: include DSW event device documentation
> X-Mailer: git-send-email 2.17.1
>
>
> The DSW event device is documented in DPDK Programmer's Guide.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> ---
> doc/guides/eventdevs/dsw.rst | 97 ++++++++++++++++++++++++++++++++++
> doc/guides/eventdevs/index.rst | 1 +
> 2 files changed, 98 insertions(+)
> create mode 100644 doc/guides/eventdevs/dsw.rst
>
> diff --git a/doc/guides/eventdevs/dsw.rst b/doc/guides/eventdevs/dsw.rst
> new file mode 100644
> index 000000000..de41ae9d3
> --- /dev/null
> +++ b/doc/guides/eventdevs/dsw.rst
> @@ -0,0 +1,97 @@
> +.. SPDX-License-Identifier: BSD-3-Clause
> + Copyright(c) 2017 Intel Corporation.
Is adding Intel copyright by intention?
> + Copyright(c) 2018 Ericsson AB
> +
> +Distributed Software Eventdev Poll Mode Driver
> +==============================================
> +
> +The distributed software eventdev is a parallel implementation of the
s/parallel/driver??
> +eventdev API, which distributes the task of scheduling events among
> +all the eventdev ports and the lcore threads using them.
> +
Please update MAINTAINERS and doc/guides/rel_notes/release_18_11.rst
files.
With above changes:
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
if there are no other review comments then I will push next revision v4 to next-eventdev.
Thanks,
Jerin
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation
2018-09-17 12:29 ` Jerin Jacob
@ 2018-09-17 19:17 ` Mattias Rönnblom
2018-09-18 5:28 ` Jerin Jacob
0 siblings, 1 reply; 19+ messages in thread
From: Mattias Rönnblom @ 2018-09-17 19:17 UTC (permalink / raw)
To: Jerin Jacob; +Cc: bruce.richardson, dev
On 2018-09-17 14:29, Jerin Jacob wrote:
>> diff --git a/doc/guides/eventdevs/dsw.rst b/doc/guides/eventdevs/dsw.rst
>> new file mode 100644
>> index 000000000..de41ae9d3
>> --- /dev/null
>> +++ b/doc/guides/eventdevs/dsw.rst
>> @@ -0,0 +1,97 @@
>> +.. SPDX-License-Identifier: BSD-3-Clause
>> + Copyright(c) 2017 Intel Corporation.
>
> Is adding Intel copyright by intention?
>
dsw.rst is a derived work of sw.rst, so I figured I should leave the
Intel copyright. But maybe that's not needed?
>> + Copyright(c) 2018 Ericsson AB
>> +
>> +Distributed Software Eventdev Poll Mode Driver
>> +==============================================
>> +
>> +The distributed software eventdev is a parallel implementation of the
>
> s/parallel/driver??
>
I'm not sure I follow. What do you mean?
"Parallel" here is as in executing on multiple CPU cores.
I could always add "driver".
Regards,
Mattias
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event device documentation
2018-09-17 19:17 ` Mattias Rönnblom
@ 2018-09-18 5:28 ` Jerin Jacob
0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-09-18 5:28 UTC (permalink / raw)
To: Mattias Rönnblom; +Cc: bruce.richardson, dev
-----Original Message-----
> Date: Mon, 17 Sep 2018 21:17:34 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> CC: bruce.richardson@intel.com, dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3 10/10] event/dsw: include DSW event
> device documentation
> User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101
> Thunderbird/52.9.1
>
> External Email
>
> On 2018-09-17 14:29, Jerin Jacob wrote:
>
> > > diff --git a/doc/guides/eventdevs/dsw.rst b/doc/guides/eventdevs/dsw.rst
> > > new file mode 100644
> > > index 000000000..de41ae9d3
> > > --- /dev/null
> > > +++ b/doc/guides/eventdevs/dsw.rst
> > > @@ -0,0 +1,97 @@
> > > +.. SPDX-License-Identifier: BSD-3-Clause
> > > + Copyright(c) 2017 Intel Corporation.
> >
> > Is adding Intel copyright by intention?
> >
>
> dsw.rst is a derived work of sw.rst, so I figured I should leave the
> Intel copyright. But maybe that's not needed?
Looks so.
>
> > > + Copyright(c) 2018 Ericsson AB
> > > +
> > > +Distributed Software Eventdev Poll Mode Driver
> > > +==============================================
> > > +
> > > +The distributed software eventdev is a parallel implementation of the
> >
> > s/parallel/driver??
> >
>
> I'm not sure I follow. What do you mean?
>
> "Parallel" here is as in executing on multiple CPU cores.
I read "Parallel" as "alternative" on that context. So I think you can
elaborate a bit, like ending sentence "where scheduler is executing on
multiple CPU cores" or something similar what you like to add more
clarity.
>
> I could always add "driver".
OK.
>
> Regards,
> Mattias
^ permalink raw reply [flat|nested] 19+ messages in thread