DPDK patches and discussions
 help / color / mirror / Atom feed
* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
       [not found] ` <20180711210844.5467-2-mattias.ronnblom@ericsson.com>
@ 2018-07-22 11:32   ` Jerin Jacob
  2018-08-19  6:11     ` Jerin Jacob
  2018-08-27  9:23     ` Mattias Rönnblom
  0 siblings, 2 replies; 9+ messages in thread
From: Jerin Jacob @ 2018-07-22 11:32 UTC (permalink / raw)
  To: Mattias Rönnblom; +Cc: dev, bruce.richardson

-----Original Message-----
> Date: Wed, 11 Jul 2018 23:08:44 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: dev@dpdk.org
> CC: jerin.jacob@caviumnetworks.com, bruce.richardson@intel.com, Mattias
>  Rönnblom <mattias.ronnblom@ericsson.com>
> Subject: [RFC 1/1] eventdev: add distributed software (DSW) event device
> X-Mailer: git-send-email 2.17.1
> 
> 
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>

Thanks Mattias for a yet another eventdev driver.
Since it can work as distributed scheduler, it is complementary to existing SW scheduler.

A few comments:
1) There is compilation error in my setup
/export/dpdk-next-eventdev/drivers/event/dsw/dsw_event.c: In function
‘dsw_port_consider_migration’:
/export/dpdk-next-eventdev/drivers/event/dsw/dsw_event.c:346:39: error:
‘current_burst’ may be used uninitialized in this function
[-Werror=maybe-uninitialized]
  ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))

2) Please split the patch as more logical one.s You see the git history
of driver/event/sw to get idea on the patch split.

3) Please update the documentation @ doc/guides/eventdevs/

> ---
>  config/common_base                            |    5 +
>  drivers/event/Makefile                        |    1 +

meson build support is missing.

>  drivers/event/dsw/Makefile                    |   28 +
>  drivers/event/dsw/dsw_evdev.c                 |  361 +++++
>  drivers/event/dsw/dsw_evdev.h                 |  296 ++++
>  drivers/event/dsw/dsw_event.c                 | 1285 +++++++++++++++++
>  drivers/event/dsw/dsw_sort.h                  |   47 +
>  drivers/event/dsw/dsw_xstats.c                |  284 ++++
>  .../event/dsw/rte_pmd_evdev_dsw_version.map   |    3 +
>  mk/rte.app.mk                                 |    1 +
>  10 files changed, 2311 insertions(+)
> index 000000000..39008f7eb
> --- /dev/null
> +++ b/drivers/event/dsw/dsw_evdev.c
> @@ -0,0 +1,361 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2018 Ericsson AB
> + */
> +
> +#include <rte_eventdev_pmd.h>
> +#include <rte_eventdev_pmd_vdev.h>
> +#include <rte_cycles.h>
> +#include <rte_random.h>

sort it in alphabetical order.

> +
> +#include "dsw_evdev.h"
> +
> +static int
> +dsw_start(struct rte_eventdev *dev)
> +{
> +       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
> +       uint16_t i;
> +       uint64_t now;
> +
> +       rte_atomic32_init(&dsw->credits_on_loan);
> +
> +       initial_flow_to_port_assignment(dsw);
> +
> +       now = rte_get_timer_cycles();
> +       for (i = 0; i < dsw->num_ports; i++) {
> +               dsw->ports[i].measurement_start = now;
> +               dsw->ports[i].busy_start = now;
> +       }
> +
> +       return 0;
> +}
> +
> +static void
> +dsw_stop(struct rte_eventdev *dev __rte_unused)
> +{

You may implement, eventdev_stop_flush_t callback to free up the
outstanding events in the eventdev.


> +}
> +
> +static int
> +dsw_close(struct rte_eventdev *dev)
> +{
> +       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
> +
> +       dsw->num_ports = 0;
> +       dsw->num_queues = 0;
> +
> +       return 0;
> +}
> +
> +static struct rte_eventdev_ops dsw_evdev_ops = {
> +       .dev_infos_get = dsw_info_get,
> +       .dev_configure = dsw_configure,
> +       .dev_start = dsw_start,
> +       .dev_stop = dsw_stop,
> +       .dev_close = dsw_close,
> +       .port_setup = dsw_port_setup,
> +       .port_def_conf = dsw_port_def_conf,
> +       .port_release = dsw_port_release,
> +       .queue_setup = dsw_queue_setup,
> +       .queue_def_conf = dsw_queue_def_conf,
> +       .queue_release = dsw_queue_release,
> +       .port_link = dsw_port_link,
> +       .port_unlink = dsw_port_unlink,
> +#ifdef DSW_XSTATS

Instead of creating a lot of "DSW_XSTATS" defines in this file, How about
creating NOP version of xstats functions when ifndef DSW_XSTATS as
different file? So that #ifdef clutter will go way.


> +       .xstats_get = dsw_xstats_get,
> +       .xstats_get_names = dsw_xstats_get_names,
> +       .xstats_get_by_name = dsw_xstats_get_by_name
> +#endif
> +};
> +
> +static int
> +dsw_probe(struct rte_vdev_device *vdev)
> +{
> +       const char *name;
> +       struct rte_eventdev *dev;
> +       struct dsw_evdev *dsw;
> +
> +       name = rte_vdev_device_name(vdev);
> +

Please add secondary process check. See SW driver.

> +RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
> diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
> new file mode 100644
> index 000000000..e6b34c013
> --- /dev/null
> +++ b/drivers/event/dsw/dsw_evdev.h
> @@ -0,0 +1,296 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2018 Ericsson AB
> + */
> +
> +#ifndef _DSW_EVDEV_H_
> +#define _DSW_EVDEV_H_
> +
> +#include <inttypes.h>
> +#include <stdbool.h>
> +#include <rte_eventdev.h>
> +#include <rte_event_ring.h>
> +#include <rte_ring.h>
> +#include <rte_event_ring.h>
> +#include <rte_config.h>

sort it in alphabetical order.

> +
> +#define DSW_PMD_NAME RTE_STR(event_dsw)
> +
> +
> +/* Only one outstanding migration per port is allowed */
> +#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
> +
> +/* Enough room for paus request/confirm and unpaus request/confirm for
> + * all possible senders.
> + */
> +#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
> +
> +/* Statistics is configurable at this point, mostly to make it easy to
> + * measure their performance impact.
> + */
> +#define DSW_XSTATS

It can be moved to main config area.

> +
> +/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of
> + * dequeue(), arrange events so that events with the same flow id on
> + * the same queue forms a back-to-back "burst", and also so that such
> + * bursts of different flow ids, but on the same queue, also come
> + * consecutively. All this in an attempt to improve data and
> + * instruction cache usage for the application, at the cost of a
> + * scheduler overhead increase.
> + */
> +
> +//#define DSW_SORT_DEQUEUED

Checkpatch will complain about cpp style of comments.

> +
> +struct dsw_queue_flow {
> +       uint8_t queue_id;
> +       uint16_t flow_hash;
> +};
> +
> +enum dsw_migration_state {
> +       DSW_MIGRATION_STATE_IDLE = 0,

zero assignment is unnecessary.

> +       DSW_MIGRATION_STATE_PAUSING,
> +       DSW_MIGRATION_STATE_FORWARDING,
> +       DSW_MIGRATION_STATE_UNPAUSING
> +};
> +
> +
> +       if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
> +                                        port_loads,
> +                                        DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
> +                                        &source_port->migration_target_qf,
> +                                        &source_port->migration_target_port_id)
> +           &&
> +           !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
> +                                        port_loads,
> +                                        DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
> +                                        &source_port->migration_target_qf,
> +                                      &source_port->migration_target_port_id))
> +               return;
> +
> +       DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
> +                       "flow_hash %d from port %d to port %d.\n",
> +                       source_port->migration_target_qf.queue_id,
> +                       source_port->migration_target_qf.flow_hash,
> +                       source_port->id, source_port->migration_target_port_id);
> +#if 0

Spotted #if 0

> +#endif
> diff --git a/drivers/event/dsw/rte_pmd_evdev_dsw_version.map b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
> new file mode 100644
> index 000000000..ad6e191e4
> --- /dev/null
> +++ b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
> @@ -0,0 +1,3 @@
> +DPDK_18.08 {

18.11

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-07-22 11:32   ` [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device Jerin Jacob
@ 2018-08-19  6:11     ` Jerin Jacob
  2018-08-23 13:08       ` Mattias Rönnblom
  2018-08-27  9:23     ` Mattias Rönnblom
  1 sibling, 1 reply; 9+ messages in thread
From: Jerin Jacob @ 2018-08-19  6:11 UTC (permalink / raw)
  To: Mattias Rönnblom; +Cc: dev, bruce.richardson

-----Original Message-----
> Date: Sun, 22 Jul 2018 17:02:54 +0530
> From: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> To: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> Cc: dev@dpdk.org, bruce.richardson@intel.com
> Subject: Re: [RFC 1/1] eventdev: add distributed software (DSW) event device
> User-Agent: Mutt/1.10.1 (2018-07-13)
> 
> -----Original Message-----
> > Date: Wed, 11 Jul 2018 23:08:44 +0200
> > From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> > To: dev@dpdk.org
> > CC: jerin.jacob@caviumnetworks.com, bruce.richardson@intel.com, Mattias
> >  Rönnblom <mattias.ronnblom@ericsson.com>
> > Subject: [RFC 1/1] eventdev: add distributed software (DSW) event device
> > X-Mailer: git-send-email 2.17.1
> > 
> > 
> > Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> 
> Thanks Mattias for a yet another eventdev driver.
> Since it can work as distributed scheduler, it is complementary to existing SW scheduler.
> 
> A few comments:
> 1) There is compilation error in my setup
> /export/dpdk-next-eventdev/drivers/event/dsw/dsw_event.c: In function
> ‘dsw_port_consider_migration’:
> /export/dpdk-next-eventdev/drivers/event/dsw/dsw_event.c:346:39: error:
> ‘current_burst’ may be used uninitialized in this function
> [-Werror=maybe-uninitialized]
>   ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
> 
> 2) Please split the patch as more logical one.s You see the git history
> of driver/event/sw to get idea on the patch split.
> 
> 3) Please update the documentation @ doc/guides/eventdevs/


I would like to include this driver for v18.08. Please send the next
the version in order to complete review before giving RC1 pull request.


> 
> > ---
> >  config/common_base                            |    5 +
> >  drivers/event/Makefile                        |    1 +
> 
> meson build support is missing.
> 
> >  drivers/event/dsw/Makefile                    |   28 +
> >  drivers/event/dsw/dsw_evdev.c                 |  361 +++++
> >  drivers/event/dsw/dsw_evdev.h                 |  296 ++++
> >  drivers/event/dsw/dsw_event.c                 | 1285 +++++++++++++++++
> >  drivers/event/dsw/dsw_sort.h                  |   47 +
> >  drivers/event/dsw/dsw_xstats.c                |  284 ++++
> >  .../event/dsw/rte_pmd_evdev_dsw_version.map   |    3 +
> >  mk/rte.app.mk                                 |    1 +
> >  10 files changed, 2311 insertions(+)
> > index 000000000..39008f7eb
> > --- /dev/null
> > +++ b/drivers/event/dsw/dsw_evdev.c
> > @@ -0,0 +1,361 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2018 Ericsson AB
> > + */
> > +
> > +#include <rte_eventdev_pmd.h>
> > +#include <rte_eventdev_pmd_vdev.h>
> > +#include <rte_cycles.h>
> > +#include <rte_random.h>
> 
> sort it in alphabetical order.
> 
> > +
> > +#include "dsw_evdev.h"
> > +
> > +static int
> > +dsw_start(struct rte_eventdev *dev)
> > +{
> > +       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
> > +       uint16_t i;
> > +       uint64_t now;
> > +
> > +       rte_atomic32_init(&dsw->credits_on_loan);
> > +
> > +       initial_flow_to_port_assignment(dsw);
> > +
> > +       now = rte_get_timer_cycles();
> > +       for (i = 0; i < dsw->num_ports; i++) {
> > +               dsw->ports[i].measurement_start = now;
> > +               dsw->ports[i].busy_start = now;
> > +       }
> > +
> > +       return 0;
> > +}
> > +
> > +static void
> > +dsw_stop(struct rte_eventdev *dev __rte_unused)
> > +{
> 
> You may implement, eventdev_stop_flush_t callback to free up the
> outstanding events in the eventdev.
> 
> 
> > +}
> > +
> > +static int
> > +dsw_close(struct rte_eventdev *dev)
> > +{
> > +       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
> > +
> > +       dsw->num_ports = 0;
> > +       dsw->num_queues = 0;
> > +
> > +       return 0;
> > +}
> > +
> > +static struct rte_eventdev_ops dsw_evdev_ops = {
> > +       .dev_infos_get = dsw_info_get,
> > +       .dev_configure = dsw_configure,
> > +       .dev_start = dsw_start,
> > +       .dev_stop = dsw_stop,
> > +       .dev_close = dsw_close,
> > +       .port_setup = dsw_port_setup,
> > +       .port_def_conf = dsw_port_def_conf,
> > +       .port_release = dsw_port_release,
> > +       .queue_setup = dsw_queue_setup,
> > +       .queue_def_conf = dsw_queue_def_conf,
> > +       .queue_release = dsw_queue_release,
> > +       .port_link = dsw_port_link,
> > +       .port_unlink = dsw_port_unlink,
> > +#ifdef DSW_XSTATS
> 
> Instead of creating a lot of "DSW_XSTATS" defines in this file, How about
> creating NOP version of xstats functions when ifndef DSW_XSTATS as
> different file? So that #ifdef clutter will go way.
> 
> 
> > +       .xstats_get = dsw_xstats_get,
> > +       .xstats_get_names = dsw_xstats_get_names,
> > +       .xstats_get_by_name = dsw_xstats_get_by_name
> > +#endif
> > +};
> > +
> > +static int
> > +dsw_probe(struct rte_vdev_device *vdev)
> > +{
> > +       const char *name;
> > +       struct rte_eventdev *dev;
> > +       struct dsw_evdev *dsw;
> > +
> > +       name = rte_vdev_device_name(vdev);
> > +
> 
> Please add secondary process check. See SW driver.
> 
> > +RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
> > diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
> > new file mode 100644
> > index 000000000..e6b34c013
> > --- /dev/null
> > +++ b/drivers/event/dsw/dsw_evdev.h
> > @@ -0,0 +1,296 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2018 Ericsson AB
> > + */
> > +
> > +#ifndef _DSW_EVDEV_H_
> > +#define _DSW_EVDEV_H_
> > +
> > +#include <inttypes.h>
> > +#include <stdbool.h>
> > +#include <rte_eventdev.h>
> > +#include <rte_event_ring.h>
> > +#include <rte_ring.h>
> > +#include <rte_event_ring.h>
> > +#include <rte_config.h>
> 
> sort it in alphabetical order.
> 
> > +
> > +#define DSW_PMD_NAME RTE_STR(event_dsw)
> > +
> > +
> > +/* Only one outstanding migration per port is allowed */
> > +#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
> > +
> > +/* Enough room for paus request/confirm and unpaus request/confirm for
> > + * all possible senders.
> > + */
> > +#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
> > +
> > +/* Statistics is configurable at this point, mostly to make it easy to
> > + * measure their performance impact.
> > + */
> > +#define DSW_XSTATS
> 
> It can be moved to main config area.
> 
> > +
> > +/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of
> > + * dequeue(), arrange events so that events with the same flow id on
> > + * the same queue forms a back-to-back "burst", and also so that such
> > + * bursts of different flow ids, but on the same queue, also come
> > + * consecutively. All this in an attempt to improve data and
> > + * instruction cache usage for the application, at the cost of a
> > + * scheduler overhead increase.
> > + */
> > +
> > +//#define DSW_SORT_DEQUEUED
> 
> Checkpatch will complain about cpp style of comments.
> 
> > +
> > +struct dsw_queue_flow {
> > +       uint8_t queue_id;
> > +       uint16_t flow_hash;
> > +};
> > +
> > +enum dsw_migration_state {
> > +       DSW_MIGRATION_STATE_IDLE = 0,
> 
> zero assignment is unnecessary.
> 
> > +       DSW_MIGRATION_STATE_PAUSING,
> > +       DSW_MIGRATION_STATE_FORWARDING,
> > +       DSW_MIGRATION_STATE_UNPAUSING
> > +};
> > +
> > +
> > +       if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
> > +                                        port_loads,
> > +                                        DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
> > +                                        &source_port->migration_target_qf,
> > +                                        &source_port->migration_target_port_id)
> > +           &&
> > +           !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
> > +                                        port_loads,
> > +                                        DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
> > +                                        &source_port->migration_target_qf,
> > +                                      &source_port->migration_target_port_id))
> > +               return;
> > +
> > +       DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
> > +                       "flow_hash %d from port %d to port %d.\n",
> > +                       source_port->migration_target_qf.queue_id,
> > +                       source_port->migration_target_qf.flow_hash,
> > +                       source_port->id, source_port->migration_target_port_id);
> > +#if 0
> 
> Spotted #if 0
> 
> > +#endif
> > diff --git a/drivers/event/dsw/rte_pmd_evdev_dsw_version.map b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
> > new file mode 100644
> > index 000000000..ad6e191e4
> > --- /dev/null
> > +++ b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
> > @@ -0,0 +1,3 @@
> > +DPDK_18.08 {
> 
> 18.11
> 

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-08-19  6:11     ` Jerin Jacob
@ 2018-08-23 13:08       ` Mattias Rönnblom
  2018-08-23 13:44         ` Jerin Jacob
  0 siblings, 1 reply; 9+ messages in thread
From: Mattias Rönnblom @ 2018-08-23 13:08 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: dev, bruce.richardson

On 2018-08-19 08:11, Jerin Jacob wrote:
 > I would like to include this driver for v18.08. Please send the next
 > the version in order to complete review before giving RC1 pull request.
 >

I've been working on a test bench with real packet I/O and some more 
reasonable load to verify load balancing behavior and tune some 
parameters to let it have a chance to perform in a real-world application.

Currently, the only experience comes from a pipeline simulator without 
real I/O, and where new packets are always available, plus some very 
limited experience from a prototype integration in a real product.

All this to feel somewhat comfortable it will perform in a product.

I think 18.08 will be difficult. When would you need to have something 
ready to meet that deadline?

Best regards,
	Mattias

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-08-23 13:08       ` Mattias Rönnblom
@ 2018-08-23 13:44         ` Jerin Jacob
  2018-08-23 20:08           ` Mattias Rönnblom
  0 siblings, 1 reply; 9+ messages in thread
From: Jerin Jacob @ 2018-08-23 13:44 UTC (permalink / raw)
  To: Mattias Rönnblom; +Cc: dev, bruce.richardson

-----Original Message-----
> Date: Thu, 23 Aug 2018 15:08:29 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> CC: dev@dpdk.org, bruce.richardson@intel.com
> Subject: Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW)
>  event device
> User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101
>  Thunderbird/52.9.1
> 
> 
> On 2018-08-19 08:11, Jerin Jacob wrote:
> > I would like to include this driver for v18.08. Please send the next
> > the version in order to complete review before giving RC1 pull request.
> >
> 
> I've been working on a test bench with real packet I/O and some more
> reasonable load to verify load balancing behavior and tune some
> parameters to let it have a chance to perform in a real-world application.
> 
> Currently, the only experience comes from a pipeline simulator without
> real I/O, and where new packets are always available, plus some very
> limited experience from a prototype integration in a real product.
> 
> All this to feel somewhat comfortable it will perform in a product.
> 
> I think 18.08 will be difficult. When would you need to have something
> ready to meet that deadline?

It is OK to postpone to v19.02 if it is difficult.

According to https://core.dpdk.org/roadmap/, The integration deadline for v18.11 is October 5, 2018.
Considering it only driver change, two/three weeks would be enough to review it. So September mid
would be a reasonable target.

> 
> Best regards,
>        Mattias

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-08-23 13:44         ` Jerin Jacob
@ 2018-08-23 20:08           ` Mattias Rönnblom
  0 siblings, 0 replies; 9+ messages in thread
From: Mattias Rönnblom @ 2018-08-23 20:08 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: dev, bruce.richardson

On 2018-08-23 15:44, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Thu, 23 Aug 2018 15:08:29 +0200
>> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
>> CC: dev@dpdk.org, bruce.richardson@intel.com
>> Subject: Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW)
>>   event device
>> User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101
>>   Thunderbird/52.9.1
>>
>>
>> On 2018-08-19 08:11, Jerin Jacob wrote:
>>> I would like to include this driver for v18.08. Please send the next
>>> the version in order to complete review before giving RC1 pull request.
>>>
>>
>> I've been working on a test bench with real packet I/O and some more
>> reasonable load to verify load balancing behavior and tune some
>> parameters to let it have a chance to perform in a real-world application.
>>
>> Currently, the only experience comes from a pipeline simulator without
>> real I/O, and where new packets are always available, plus some very
>> limited experience from a prototype integration in a real product.
>>
>> All this to feel somewhat comfortable it will perform in a product.
>>
>> I think 18.08 will be difficult. When would you need to have something
>> ready to meet that deadline?
> 
> It is OK to postpone to v19.02 if it is difficult.
> 
> According to https://core.dpdk.org/roadmap/, The integration deadline for v18.11 is October 5, 2018.
> Considering it only driver change, two/three weeks would be enough to review it. So September mid
> would be a reasonable target.

OK, sounds like 18.11 might be doable. Thanks. Do you mark drivers 
experimental, just to warn people there might be dragons? Or is that 
just APIs?

I've been planning to run it on a system with a weak memory model, too. 
If someone want to help out here, that would be appreciated.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-07-22 11:32   ` [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device Jerin Jacob
  2018-08-19  6:11     ` Jerin Jacob
@ 2018-08-27  9:23     ` Mattias Rönnblom
  2018-08-27  9:40       ` Jerin Jacob
  1 sibling, 1 reply; 9+ messages in thread
From: Mattias Rönnblom @ 2018-08-27  9:23 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: dev, bruce.richardson

On 2018-07-22 13:32, Jerin Jacob wrote:

>> +static void
>> +dsw_stop(struct rte_eventdev *dev __rte_unused)
>> +{
> 
> You may implement, eventdev_stop_flush_t callback to free up the
> outstanding events in the eventdev.
> 

Is this support mandatory, or is it OK to leave it to the user to empty 
the machinery before calling stop in the initial driver version?

I can't find any other event device supporting the callback.

In DSW, the events can be a little here-and-there - in the output 
buffers, in the pause buffer, and on the input rings.

That said, assuming the worker lcore threads have stopped using the 
device and issued the appropriate barriers, it should be possible to 
round up the events from the thread running 'rte_event_dev_stop'.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-08-27  9:23     ` Mattias Rönnblom
@ 2018-08-27  9:40       ` Jerin Jacob
  2018-08-27 10:24         ` Mattias Rönnblom
  0 siblings, 1 reply; 9+ messages in thread
From: Jerin Jacob @ 2018-08-27  9:40 UTC (permalink / raw)
  To: Mattias Rönnblom; +Cc: dev, bruce.richardson

-----Original Message-----
> Date: Mon, 27 Aug 2018 11:23:59 +0200
> From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> CC: dev@dpdk.org, bruce.richardson@intel.com
> Subject: Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW)
>  event device
> User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101
>  Thunderbird/52.9.1
> 
> External Email
> 
> On 2018-07-22 13:32, Jerin Jacob wrote:
> 
> > > +static void
> > > +dsw_stop(struct rte_eventdev *dev __rte_unused)
> > > +{
> > 
> > You may implement, eventdev_stop_flush_t callback to free up the
> > outstanding events in the eventdev.
> > 
> 
> Is this support mandatory, or is it OK to leave it to the user to empty
> the machinery before calling stop in the initial driver version?

This is useful to avoid event buffer leak. The application may call stop()
abruptly or some reason event device cannot provide the event on
dequeue().

Any trivial implementation could be doing dequeue() in the driver on
stop().

> 
> I can't find any other event device supporting the callback.

drivers/event/sw and drivers/event/octeontx/ supports it.

$ grep -r "dev_stop_flush" drivers/event/
drivers/event/octeontx/ssovf_evdev.c:	if (dev->dev_ops->dev_stop_flush != NULL)
drivers/event/octeontx/ssovf_evdev.c: dev->dev_ops->dev_stop_flush(dev->data->dev_id, event,
dev->data->dev_stop_flush_arg);
drivers/event/sw/sw_evdev_selftest.c:dev_stop_flush(struct test *t) /*
test to check we can properly flush events */
drivers/event/sw/sw_evdev_selftest.c:	if
(rte_event_dev_stop_flush_callback_register(evdev, flush, &count)) {
drivers/event/sw/sw_evdev_selftest.c:	if
(rte_event_dev_stop_flush_callback_register(evdev, NULL, NULL)) {
drivers/event/sw/sw_evdev_selftest.c:	ret = dev_stop_flush(t);
drivers/event/sw/sw_evdev.c:	eventdev_stop_flush_t flush;
drivers/event/sw/sw_evdev.c:	flush = dev->dev_ops->dev_stop_flush;
drivers/event/sw/sw_evdev.c:	arg = dev->data->dev_stop_flush_arg;
drivers/event/sw/sw_evdev.c:	eventdev_stop_flush_t flush;
drivers/event/sw/sw_evdev.c:	flush = dev->dev_ops->dev_stop_flush;
drivers/event/sw/sw_evdev.c:	arg = dev->data->dev_stop_flush_arg;

> 
> In DSW, the events can be a little here-and-there - in the output
> buffers, in the pause buffer, and on the input rings.

Any trivial implementation could be doing dequeue() in the driver on
stop().

> 
> That said, assuming the worker lcore threads have stopped using the
> device and issued the appropriate barriers, it should be possible to
> round up the events from the thread running 'rte_event_dev_stop'.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-08-27  9:40       ` Jerin Jacob
@ 2018-08-27 10:24         ` Mattias Rönnblom
  0 siblings, 0 replies; 9+ messages in thread
From: Mattias Rönnblom @ 2018-08-27 10:24 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: dev, bruce.richardson

On 2018-08-27 11:40, Jerin Jacob wrote:
>> On 2018-07-22 13:32, Jerin Jacob wrote:
>>
>>>> +static void
>>>> +dsw_stop(struct rte_eventdev *dev __rte_unused)
>>>> +{
>>>
>>> You may implement, eventdev_stop_flush_t callback to free up the
>>> outstanding events in the eventdev.
>>>
>>
>> Is this support mandatory, or is it OK to leave it to the user to empty
>> the machinery before calling stop in the initial driver version?
> 
> This is useful to avoid event buffer leak. The application may call stop()
> abruptly or some reason event device cannot provide the event on
> dequeue().
> 

I see it being useful.

I'll implement it.

> Any trivial implementation could be doing dequeue() in the driver on
> stop().
> 
>>
>> I can't find any other event device supporting the callback.
> 
> drivers/event/sw and drivers/event/octeontx/ supports it.
> 
> $ grep -r "dev_stop_flush" drivers/event/
> drivers/event/octeontx/ssovf_evdev.c:	if (dev->dev_ops->dev_stop_flush != NULL)
> drivers/event/octeontx/ssovf_evdev.c: dev->dev_ops->dev_stop_flush(dev->data->dev_id, event,
> dev->data->dev_stop_flush_arg);
> drivers/event/sw/sw_evdev_selftest.c:dev_stop_flush(struct test *t) /*
> test to check we can properly flush events */
> drivers/event/sw/sw_evdev_selftest.c:	if
> (rte_event_dev_stop_flush_callback_register(evdev, flush, &count)) {
> drivers/event/sw/sw_evdev_selftest.c:	if
> (rte_event_dev_stop_flush_callback_register(evdev, NULL, NULL)) {
> drivers/event/sw/sw_evdev_selftest.c:	ret = dev_stop_flush(t);
> drivers/event/sw/sw_evdev.c:	eventdev_stop_flush_t flush;
> drivers/event/sw/sw_evdev.c:	flush = dev->dev_ops->dev_stop_flush;
> drivers/event/sw/sw_evdev.c:	arg = dev->data->dev_stop_flush_arg;
> drivers/event/sw/sw_evdev.c:	eventdev_stop_flush_t flush;
> drivers/event/sw/sw_evdev.c:	flush = dev->dev_ops->dev_stop_flush;
> drivers/event/sw/sw_evdev.c:	arg = dev->data->dev_stop_flush_arg;
> 

I needed to rebase against master. Sorry.

>>
>> In DSW, the events can be a little here-and-there - in the output
>> buffers, in the pause buffer, and on the input rings.
> 
> Any trivial implementation could be doing dequeue() in the driver on
> stop().
> 

Sure, but how many times? One zero-dequeue is not enough. A migration 
might be in progress, and the signaling needs to finish before the 
events in the pause buffer is flushed to the destination in_ring.

I'll traverse the port-internal output/pause buffers instead.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device
  2018-07-11 21:21 [dpdk-dev] [RFC 0/1] A Distributed Software Event Device Mattias Rönnblom
@ 2018-07-11 21:21 ` Mattias Rönnblom
  0 siblings, 0 replies; 9+ messages in thread
From: Mattias Rönnblom @ 2018-07-11 21:21 UTC (permalink / raw)
  To: dev; +Cc: jerin.jacob, bruce.richardson, Mattias Rönnblom

Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
 config/common_base                            |    5 +
 drivers/event/Makefile                        |    1 +
 drivers/event/dsw/Makefile                    |   28 +
 drivers/event/dsw/dsw_evdev.c                 |  361 +++++
 drivers/event/dsw/dsw_evdev.h                 |  296 ++++
 drivers/event/dsw/dsw_event.c                 | 1285 +++++++++++++++++
 drivers/event/dsw/dsw_sort.h                  |   47 +
 drivers/event/dsw/dsw_xstats.c                |  284 ++++
 .../event/dsw/rte_pmd_evdev_dsw_version.map   |    3 +
 mk/rte.app.mk                                 |    1 +
 10 files changed, 2311 insertions(+)
 create mode 100644 drivers/event/dsw/Makefile
 create mode 100644 drivers/event/dsw/dsw_evdev.c
 create mode 100644 drivers/event/dsw/dsw_evdev.h
 create mode 100644 drivers/event/dsw/dsw_event.c
 create mode 100644 drivers/event/dsw/dsw_sort.h
 create mode 100644 drivers/event/dsw/dsw_xstats.c
 create mode 100644 drivers/event/dsw/rte_pmd_evdev_dsw_version.map

diff --git a/config/common_base b/config/common_base
index e4241db16..c07d340d9 100644
--- a/config/common_base
+++ b/config/common_base
@@ -588,6 +588,11 @@ CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV_DEBUG=n
 #
 CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV=y
 
+#
+# Compile PMD for distributed software event device
+#
+CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV=y
+
 #
 # Compile PMD for octeontx sso event device
 #
diff --git a/drivers/event/Makefile b/drivers/event/Makefile
index f301d8dc2..03ad1b6cb 100644
--- a/drivers/event/Makefile
+++ b/drivers/event/Makefile
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
 
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += dpaa
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
new file mode 100644
index 000000000..8f358eaa3
--- /dev/null
+++ b/drivers/event/dsw/Makefile
@@ -0,0 +1,28 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+LIB = librte_pmd_dsw_event.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -Wno-format-nonliteral
+
+LDLIBS += -lrte_eal
+LDLIBS += -lrte_mbuf
+LDLIBS += -lrte_mempool
+LDLIBS += -lrte_ring
+LDLIBS += -lrte_eventdev
+LDLIBS += -lrte_bus_vdev
+
+LIBABIVER := 1
+
+EXPORT_MAP := rte_pmd_evdev_dsw_version.map
+
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_event.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_xstats.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
new file mode 100644
index 000000000..39008f7eb
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -0,0 +1,361 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include <rte_eventdev_pmd.h>
+#include <rte_eventdev_pmd_vdev.h>
+#include <rte_cycles.h>
+#include <rte_random.h>
+
+#include "dsw_evdev.h"
+
+#define EVENTDEV_NAME_DSW_PMD event_dsw
+
+static int
+dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
+	       const struct rte_event_port_conf *conf)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	struct dsw_port *port;
+	struct rte_event_ring *in_ring;
+	struct rte_ring *ctl_in_ring;
+	char ring_name[RTE_RING_NAMESIZE];
+
+	port = &dsw->ports[port_id];
+
+	*port = (struct dsw_port) {
+		.id = port_id,
+		.dsw = dsw,
+		.dequeue_depth = conf->dequeue_depth,
+		.enqueue_depth = conf->enqueue_depth,
+		.new_event_threshold = conf->new_event_threshold
+	};
+
+	snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
+		 port_id);
+
+	in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
+					dev->data->socket_id,
+					RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+	if (in_ring == NULL)
+		return -ENOMEM;
+
+	snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
+		 dev->data->dev_id, port_id);
+
+	ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
+				      dev->data->socket_id,
+				      RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+	if (in_ring == NULL) {
+		rte_event_ring_free(in_ring);
+		return -ENOMEM;
+	}
+
+	port->in_ring = in_ring;
+	port->ctl_in_ring = ctl_in_ring;
+
+	rte_atomic16_init(&port->load);
+
+	port->load_update_interval =
+		(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
+	port->migration_interval =
+		(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
+	dev->data->ports[port_id] = port;
+
+	return 0;
+}
+
+static void
+dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
+		  uint8_t port_id __rte_unused,
+		  struct rte_event_port_conf *port_conf)
+{
+	*port_conf = (struct rte_event_port_conf) {
+		.new_event_threshold = 1024,
+		.dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
+		.enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
+	};
+}
+
+static void
+dsw_port_release(void *p)
+{
+	struct dsw_port *port = p;
+
+	rte_event_ring_free(port->in_ring);
+	rte_ring_free(port->ctl_in_ring);
+}
+
+static int
+dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
+		const struct rte_event_queue_conf *conf)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	struct dsw_queue *queue = &dsw->queues[queue_id];
+
+	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
+		return -ENOTSUP;
+
+	if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
+		return -ENOTSUP;
+
+	/* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
+	 * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since
+	 * the queue will only have a single serving port, no
+	 * migration will ever happen, so the extra TYPE_ATOMIC
+	 * migration overhead is avoided.
+	 */
+	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
+		queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
+	else /* atomic or parallel */
+		queue->schedule_type = conf->schedule_type;
+
+	queue->num_serving_ports = 0;
+
+	return 0;
+}
+
+static void
+dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
+		   uint8_t queue_id __rte_unused,
+		   struct rte_event_queue_conf *queue_conf)
+{
+	*queue_conf = (struct rte_event_queue_conf) {
+		.nb_atomic_flows = 4096,
+		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL
+	};
+}
+
+static void
+dsw_queue_release(struct rte_eventdev *dev __rte_unused,
+		  uint8_t queue_id __rte_unused)
+{
+}
+
+static void
+queue_add_port(struct dsw_queue *queue, uint16_t port_id)
+{
+	queue->serving_ports[queue->num_serving_ports] = port_id;
+	queue->num_serving_ports++;
+}
+
+static bool
+queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
+{
+	uint16_t i;
+	for (i = 0; i < queue->num_serving_ports; i++)
+		if (queue->serving_ports[i] == port_id) {
+			uint16_t last_idx = queue->num_serving_ports - 1;
+			if (i != last_idx)
+				queue->serving_ports[i] =
+					queue->serving_ports[last_idx];
+			queue->num_serving_ports--;
+			return true;
+		}
+	return false;
+}
+
+static int
+dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
+		     const uint8_t queues[], uint16_t num, bool link)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	struct dsw_port *p = port;
+	uint16_t i;
+	uint16_t count = 0;
+
+	for (i = 0; i < num; i++) {
+		uint8_t qid = queues[i];
+		struct dsw_queue *q = &dsw->queues[qid];
+		if (link) {
+			queue_add_port(q, p->id);
+			count++;
+		} else {
+			bool removed = queue_remove_port(q, p->id);
+			if (removed)
+				count++;
+		}
+	}
+
+	return count;
+}
+
+static int
+dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
+	      const uint8_t priorities[] __rte_unused, uint16_t num)
+{
+	return dsw_port_link_unlink(dev, port, queues, num, true);
+}
+
+static int
+dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
+		uint16_t num)
+{
+	return dsw_port_link_unlink(dev, port, queues, num, false);
+}
+
+static void
+dsw_info_get(struct rte_eventdev *dev __rte_unused,
+	     struct rte_event_dev_info *info)
+{
+	*info = (struct rte_event_dev_info) {
+		.driver_name = DSW_PMD_NAME,
+		.max_event_queues = DSW_MAX_QUEUES,
+		.max_event_queue_flows = DSW_MAX_FLOWS,
+		.max_event_queue_priority_levels = 1,
+		.max_event_priority_levels = 1,
+		.max_event_ports = DSW_MAX_PORTS,
+		.max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
+		.max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
+		.max_num_events = DSW_MAX_EVENTS,
+		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
+	};
+}
+
+static int
+dsw_configure(const struct rte_eventdev *dev)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+	int32_t min_max_in_flight;
+
+	dsw->num_queues = conf->nb_event_queues;
+	dsw->num_ports = conf->nb_event_ports;
+
+	/* Avoid a situation where consumer ports are holding all the
+	 * credits, without making use of them.
+	 */
+	min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;
+
+	dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);
+
+	return 0;
+}
+
+static void
+initial_flow_to_port_assignment(struct dsw_evdev *dsw)
+{
+	uint8_t queue_id;
+	for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
+		struct dsw_queue *queue = &dsw->queues[queue_id];
+		uint16_t flow_hash;
+		for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
+			uint8_t port_idx =
+				rte_rand() % queue->num_serving_ports;
+			uint8_t port_id =
+				queue->serving_ports[port_idx];
+			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+				port_id;
+		}
+	}
+}
+
+static int
+dsw_start(struct rte_eventdev *dev)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	uint16_t i;
+	uint64_t now;
+
+	rte_atomic32_init(&dsw->credits_on_loan);
+
+	initial_flow_to_port_assignment(dsw);
+
+	now = rte_get_timer_cycles();
+	for (i = 0; i < dsw->num_ports; i++) {
+		dsw->ports[i].measurement_start = now;
+		dsw->ports[i].busy_start = now;
+	}
+
+	return 0;
+}
+
+static void
+dsw_stop(struct rte_eventdev *dev __rte_unused)
+{
+}
+
+static int
+dsw_close(struct rte_eventdev *dev)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+	dsw->num_ports = 0;
+	dsw->num_queues = 0;
+
+	return 0;
+}
+
+static struct rte_eventdev_ops dsw_evdev_ops = {
+	.dev_infos_get = dsw_info_get,
+	.dev_configure = dsw_configure,
+	.dev_start = dsw_start,
+	.dev_stop = dsw_stop,
+	.dev_close = dsw_close,
+	.port_setup = dsw_port_setup,
+	.port_def_conf = dsw_port_def_conf,
+	.port_release = dsw_port_release,
+	.queue_setup = dsw_queue_setup,
+	.queue_def_conf = dsw_queue_def_conf,
+	.queue_release = dsw_queue_release,
+	.port_link = dsw_port_link,
+	.port_unlink = dsw_port_unlink,
+#ifdef DSW_XSTATS
+	.xstats_get = dsw_xstats_get,
+	.xstats_get_names = dsw_xstats_get_names,
+	.xstats_get_by_name = dsw_xstats_get_by_name
+#endif
+};
+
+static int
+dsw_probe(struct rte_vdev_device *vdev)
+{
+	const char *name;
+	struct rte_eventdev *dev;
+	struct dsw_evdev *dsw;
+
+	name = rte_vdev_device_name(vdev);
+
+	dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
+				      rte_socket_id());
+	if (dev == NULL)
+		return -EFAULT;
+
+	dev->dev_ops = &dsw_evdev_ops;
+	dev->enqueue = dsw_event_enqueue;
+	dev->enqueue_burst = dsw_event_enqueue_burst;
+	dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
+	dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
+	dev->dequeue = dsw_event_dequeue;
+	dev->dequeue_burst = dsw_event_dequeue_burst;
+
+	dsw = dev->data->dev_private;
+	dsw->data = dev->data;
+
+	return 0;
+}
+
+static int
+dsw_remove(struct rte_vdev_device *vdev)
+{
+	const char *name;
+
+	name = rte_vdev_device_name(vdev);
+	if (name == NULL)
+		return -EINVAL;
+
+	return rte_event_pmd_vdev_uninit(name);
+}
+
+static struct rte_vdev_driver evdev_dsw_pmd_drv = {
+	.probe = dsw_probe,
+	.remove = dsw_remove
+};
+
+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
new file mode 100644
index 000000000..e6b34c013
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -0,0 +1,296 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_EVDEV_H_
+#define _DSW_EVDEV_H_
+
+#include <inttypes.h>
+#include <stdbool.h>
+#include <rte_eventdev.h>
+#include <rte_event_ring.h>
+#include <rte_ring.h>
+#include <rte_event_ring.h>
+#include <rte_config.h>
+
+#define DSW_PMD_NAME RTE_STR(event_dsw)
+
+/* Code changes are required to allow more ports. */
+#define DSW_MAX_PORTS (64)
+#define DSW_MAX_PORT_OUT_BUFFER (32)
+#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
+#define DSW_MAX_PORT_ENQUEUE_DEPTH (DSW_MAX_PORT_OUT_BUFFER)
+
+#define DSW_MAX_QUEUES (16)
+
+#define DSW_MAX_EVENTS (16384)
+
+/* Code changes are required to allow more flows than 32k. */
+#define DSW_MAX_FLOWS_BITS (15)
+#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
+#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)
+
+/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,
+ * but the 'dsw' scheduler (more or less) randomly assign flow id to
+ * events on parallel queues, to be able to reuse some of the
+ * migration mechanism and scheduling logic from
+ * RTE_SCHED_TYPE_ATOMIC. By moving one of the parallel "flows" from a
+ * particular port, the likely-hood of events being scheduled to this
+ * port is reduced, and thus a kind of statistical load balancing is
+ * achieved.
+ */
+#define DSW_PARALLEL_FLOWS (1024)
+
+/* 'Background tasks' are polling the control rings for *
+ *  migration-related messages, or flush the output buffer (so
+ *  buffered events doesn't linger too long). Shouldn't be too low,
+ *  since the system won't benefit from the 'batching' effects from
+ *  the output buffer, and shouldn't be too high, since it will make
+ *  buffered events linger too long in case the port goes idle.
+ */
+#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)
+
+/* Avoid making small 'loans' from the central in-flight event credit
+ * pool, to improve efficiency.
+ */
+#define DSW_MIN_CREDIT_LOAN (64)
+#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)
+#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)
+
+/* The rings are dimensioned so that all in-flight events can reside
+ * on only one of the port rings, to avoid the trouble of having to
+ * care about the case of an event not fitting on the receiver port's
+ * ring.
+ */
+#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)
+
+#define DSW_MAX_LOAD (INT16_MAX)
+#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))
+#define DSW_LOAD_TO_PERCENT(x) ((100*x)/DSW_MAX_LOAD)
+
+/* The thought behind keeping the load update interval shorter than
+ * the migration interval is that the load from newly migrated flows
+ * should 'show up' on the load measurement before new migrations are
+ * considered. This is to avoid having too many flows, from too many
+ * source ports, to be migrated too quickly to a lightly loaded port -
+ * in particular since this might cause the system to oscillate.
+ */
+#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
+#define DSW_OLD_LOAD_WEIGHT (1)
+
+/* The minimum time (in us) between two flow migrations. What puts an
+ * upper limit on the actual migration rate is primarily the pace in
+ * which the ports send and receive control messages, which in turn is
+ * largely a function of how much cycles are spent the processing of
+ * an event burst.
+ */
+#define DSW_MIGRATION_INTERVAL (1000)
+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))
+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))
+
+#define DSW_MAX_EVENTS_RECORDED (128)
+
+/* Only one outstanding migration per port is allowed */
+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
+
+/* Enough room for paus request/confirm and unpaus request/confirm for
+ * all possible senders.
+ */
+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
+
+/* Statistics is configurable at this point, mostly to make it easy to
+ * measure their performance impact.
+ */
+#define DSW_XSTATS
+
+/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of
+ * dequeue(), arrange events so that events with the same flow id on
+ * the same queue forms a back-to-back "burst", and also so that such
+ * bursts of different flow ids, but on the same queue, also come
+ * consecutively. All this in an attempt to improve data and
+ * instruction cache usage for the application, at the cost of a
+ * scheduler overhead increase.
+ */
+
+//#define DSW_SORT_DEQUEUED
+
+struct dsw_queue_flow {
+	uint8_t queue_id;
+	uint16_t flow_hash;
+};
+
+enum dsw_migration_state {
+	DSW_MIGRATION_STATE_IDLE = 0,
+	DSW_MIGRATION_STATE_PAUSING,
+	DSW_MIGRATION_STATE_FORWARDING,
+	DSW_MIGRATION_STATE_UNPAUSING
+};
+
+struct dsw_port {
+	uint16_t id;
+
+	/* Keeping a pointer here to avoid container_of() calls, which
+	 * are expensive since they are very frequent and will result
+	 * in an integer multiplication (since the port id is an index
+	 * into the dsw_evdev port array).
+	 */
+	struct dsw_evdev *dsw;
+
+	uint16_t dequeue_depth;
+	uint16_t enqueue_depth;
+
+	int32_t inflight_credits;
+
+	int32_t new_event_threshold;
+
+	uint16_t pending_releases;
+
+	uint16_t next_parallel_flow_id;
+
+	uint16_t ops_since_bg_task;
+
+	/* For port load measurement. */
+	uint64_t next_load_update;
+	uint64_t load_update_interval;
+	uint64_t measurement_start;
+	uint64_t busy_start;
+	uint64_t busy_cycles;
+
+	/* For the ctl interface and flow migration mechanism. */
+	uint64_t next_migration;
+	uint64_t migration_interval;
+	enum dsw_migration_state migration_state;
+
+#ifdef DSW_XSTATS
+
+	uint64_t new_enqueued;
+	uint64_t forward_enqueued;
+	uint64_t release_enqueued;
+	uint64_t queue_enqueued[DSW_MAX_QUEUES];
+
+	uint64_t dequeued;
+	uint64_t queue_dequeued[DSW_MAX_QUEUES];
+
+	uint64_t migration_start;
+	uint64_t migrations;
+	uint64_t migration_latency;
+
+	uint64_t total_busy_cycles;
+
+	uint64_t last_bg;
+#endif
+
+	uint8_t migration_target_port_id;
+	struct dsw_queue_flow migration_target_qf;
+	uint8_t cfm_cnt;
+
+	uint16_t paused_flows_len;
+	struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];
+
+	/* In a very contrived worst case all inflight events can be
+	 * laying around paused here.
+	 */
+	uint16_t paused_events_len;
+	struct rte_event paused_events[DSW_MAX_EVENTS];
+
+	uint16_t seen_events_len;
+	uint16_t seen_events_idx;
+	struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+
+	uint16_t out_buffer_len[DSW_MAX_PORTS];
+	struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
+
+	uint16_t in_buffer_len;
+	uint16_t in_buffer_start;
+	/* This buffer may contain events that were read up from the
+	 * in_ring during the flow migration process.
+	 */
+	struct rte_event in_buffer[DSW_MAX_EVENTS];
+
+	struct rte_event_ring *in_ring __rte_cache_aligned;
+
+	struct rte_ring *ctl_in_ring __rte_cache_aligned;
+
+	/* Estimate of current port load. */
+	rte_atomic16_t load __rte_cache_aligned;
+} __rte_cache_aligned;
+
+struct dsw_queue {
+	uint8_t schedule_type;
+	uint8_t serving_ports[DSW_MAX_PORTS];
+	uint16_t num_serving_ports;
+
+	uint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;
+};
+
+struct dsw_evdev {
+	struct rte_eventdev_data *data;
+
+	struct dsw_port ports[DSW_MAX_PORTS];
+	uint16_t num_ports;
+	struct dsw_queue queues[DSW_MAX_QUEUES];
+	uint8_t num_queues;
+	int32_t max_inflight;
+
+	rte_atomic32_t credits_on_loan __rte_cache_aligned;
+};
+
+#define DSW_CTL_PAUS_REQ (0)
+#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_CFM (2)
+
+/* sizeof(struct dsw_ctl_msg) must be equal or less than
+ * sizeof(void *), to fit on the control ring.
+ */
+struct dsw_ctl_msg {
+	uint8_t type:2;
+	uint8_t originating_port_id:6;
+	uint8_t queue_id;
+	uint16_t flow_hash;
+} __rte_packed;
+
+uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
+uint16_t dsw_event_enqueue_burst(void *port,
+				 const struct rte_event events[],
+				 uint16_t events_len);
+uint16_t dsw_event_enqueue_new_burst(void *port,
+				     const struct rte_event events[],
+				     uint16_t events_len);
+uint16_t dsw_event_enqueue_forward_burst(void *port,
+					 const struct rte_event events[],
+					 uint16_t events_len);
+
+uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
+				 uint16_t num, uint64_t wait);
+
+void dsw_event_schedule(struct rte_eventdev *dev);
+
+#ifdef DSW_XSTATS
+int dsw_xstats_get_names(const struct rte_eventdev *dev,
+			 enum rte_event_dev_xstats_mode mode,
+			 uint8_t queue_port_id,
+			 struct rte_event_dev_xstats_name *xstats_names,
+			 unsigned int *ids, unsigned int size);
+int dsw_xstats_get(const struct rte_eventdev *dev,
+		   enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		   const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+				const char *name, unsigned int *id);
+#endif
+
+static inline struct dsw_evdev *
+dsw_pmd_priv(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+#define DSW_LOG_DP(level, fmt, args...)					\
+	RTE_LOG_DP(level, EVENTDEV, "[%s] %s() line %u: " fmt,		\
+		   DSW_PMD_NAME,					\
+		   __func__, __LINE__, ## args)
+
+#define DSW_LOG_DP_PORT(level, port_id, fmt, args...)		\
+	DSW_LOG_DP(level, "<Port %d> " fmt, port_id, ## args)
+
+#endif
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
new file mode 100644
index 000000000..5295e84c0
--- /dev/null
+++ b/drivers/event/dsw/dsw_event.c
@@ -0,0 +1,1285 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifdef DSW_SORT_DEQUEUED
+#include "dsw_sort.h"
+#endif
+
+#include "dsw_evdev.h"
+
+#include <string.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <rte_cycles.h>
+#include <rte_atomic.h>
+#include <rte_random.h>
+#include <rte_branch_prediction.h>
+#include <rte_memcpy.h>
+
+static bool
+dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+			 int32_t credits)
+{
+	int32_t inflight_credits = port->inflight_credits;
+	int32_t missing_credits = credits - inflight_credits;
+	int32_t total_on_loan;
+	int32_t available;
+	int32_t acquired_credits;
+	int32_t new_total_on_loan;
+
+	if (likely(missing_credits <= 0)) {
+		port->inflight_credits -= credits;
+		return true;
+	}
+
+	total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
+	available = dsw->max_inflight - total_on_loan;
+	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
+
+	if (available < acquired_credits)
+		return false;
+
+	/* This is a race, no locks are involved, and thus some other
+	 * thread can allocate tokens in between the check and the
+	 * allocation.
+	 */
+	new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
+						    acquired_credits);
+
+	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
+		/* Some other port took the last credits */
+		rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
+		return false;
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
+			acquired_credits);
+
+	port->inflight_credits += acquired_credits;
+	port->inflight_credits -= credits;
+
+	return true;
+}
+
+static void
+dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+			int32_t credits)
+{
+	port->inflight_credits += credits;
+
+	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
+		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
+		int32_t return_credits =
+			port->inflight_credits - leave_credits;
+
+		port->inflight_credits = leave_credits;
+
+		rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
+
+		DSW_LOG_DP_PORT(DEBUG, port->id,
+				"Returned %d tokens to pool.\n",
+				return_credits);
+	}
+}
+
+#ifdef DSW_XSTATS
+
+static void
+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
+		       uint16_t num_forward, uint16_t num_release)
+{
+	port->new_enqueued += num_new;
+	port->forward_enqueued += num_forward;
+	port->release_enqueued += num_release;
+}
+
+static void
+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+	source_port->queue_enqueued[queue_id]++;
+}
+
+static void
+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
+{
+	port->dequeued += num;
+}
+
+static void
+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+	source_port->queue_dequeued[queue_id]++;
+}
+#endif
+
+static void
+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
+{
+	if (dequeued > 0 && port->busy_start == 0)
+		/* work period begins */
+		port->busy_start = rte_get_timer_cycles();
+	else if (dequeued == 0 && port->busy_start > 0) {
+		/* work period ends */
+		uint64_t work_period =
+			rte_get_timer_cycles() - port->busy_start;
+		port->busy_cycles += work_period;
+		port->busy_start = 0;
+	}
+}
+
+static int16_t
+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
+{
+	uint64_t passed = now - port->measurement_start;
+	uint64_t busy_cycles = port->busy_cycles;
+
+	if (port->busy_start > 0) {
+		busy_cycles += (now - port->busy_start);
+		port->busy_start = now;
+	}
+
+	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
+
+	port->measurement_start = now;
+	port->busy_cycles = 0;
+
+#ifdef DSW_XSTATS
+	port->total_busy_cycles += busy_cycles;
+#endif
+
+	return load;
+}
+
+static void
+dsw_port_load_update(struct dsw_port *port, uint64_t now)
+{
+	int16_t old_load;
+	int16_t period_load;
+	int16_t new_load;
+
+	old_load = rte_atomic16_read(&port->load);
+
+	period_load = dsw_port_load_close_period(port, now);
+
+	new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
+		(DSW_OLD_LOAD_WEIGHT+1);
+
+	rte_atomic16_set(&port->load, new_load);
+}
+
+static void
+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
+{
+	if (now < port->next_load_update)
+		return;
+
+	port->next_load_update = now + port->load_update_interval;
+
+	dsw_port_load_update(port, now);
+}
+
+static void
+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+	void *raw_msg;
+	memcpy(&raw_msg, msg, sizeof(*msg));
+
+	/* there's always room on the ring */
+	while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+		rte_pause();
+}
+
+static int
+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+	void *raw_msg;
+	int rc;
+
+	rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
+
+	if (rc == 0)
+		memcpy(msg, &raw_msg, sizeof(*msg));
+
+	return rc;
+}
+
+static void
+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
+		       uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+{
+	uint16_t port_id;
+	struct dsw_ctl_msg msg = {
+		.type = type,
+		.originating_port_id = source_port->id,
+		.queue_id = queue_id,
+		.flow_hash = flow_hash
+	};
+
+	for (port_id = 0; port_id < dsw->num_ports; port_id++)
+		if (port_id != source_port->id)
+			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
+}
+
+static bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+			uint16_t flow_hash)
+{
+	uint16_t i;
+	for (i = 0; i < port->paused_flows_len; i++) {
+		struct dsw_queue_flow *qf = &port->paused_flows[i];
+		if (qf->queue_id == queue_id &&
+		    qf->flow_hash == flow_hash)
+			return true;
+	}
+	return false;
+}
+
+static void
+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
+			 uint16_t paused_flow_hash)
+{
+	port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+	port->paused_flows_len++;
+}
+
+static void
+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
+			    uint16_t paused_flow_hash)
+{
+	uint16_t i;
+
+	for (i = 0; i < port->paused_flows_len; i++) {
+		struct dsw_queue_flow *qf = &port->paused_flows[i];
+
+		if (qf->queue_id == queue_id &&
+		    qf->flow_hash == paused_flow_hash) {
+			uint16_t last_idx = port->paused_flows_len-1;
+			if (i != last_idx)
+				port->paused_flows[i] =
+					port->paused_flows[last_idx];
+			port->paused_flows_len--;
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+			   uint8_t originating_port_id, uint8_t queue_id,
+			   uint16_t paused_flow_hash)
+{
+	struct dsw_ctl_msg cfm = {
+		.type = DSW_CTL_CFM,
+		.originating_port_id = port->id,
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
+			queue_id, paused_flow_hash);
+
+	/* There might be already-scheduled events belonging to the
+	 * paused flow in the output buffers.
+	 */
+	dsw_port_flush_out_buffers(dsw, port);
+
+	dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+
+	/* Make sure any stores to the original port's in_ring is seen
+	 * before the ctl message.
+	 */
+	rte_smp_wmb();
+
+	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+}
+
+static void
+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
+			  uint8_t exclude_port_id, int16_t *port_loads,
+			  uint8_t *target_port_id, int16_t *target_load)
+{
+	int16_t candidate_port_id = -1;
+	int16_t candidate_load = DSW_MAX_LOAD;
+	uint16_t i;
+
+	for (i = 0; i < num_port_ids; i++) {
+		uint8_t port_id = port_ids[i];
+		if (port_id != exclude_port_id) {
+			int16_t load = port_loads[port_id];
+			if (candidate_port_id == -1 ||
+			    load < candidate_load) {
+				candidate_port_id = port_id;
+				candidate_load = load;
+			}
+		}
+	}
+	*target_port_id = candidate_port_id;
+	*target_load = candidate_load;
+}
+
+struct dsw_queue_flow_burst {
+	struct dsw_queue_flow queue_flow;
+	uint16_t count;
+};
+
+static inline int
+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
+{
+	const struct dsw_queue_flow_burst *burst_a = v_burst_a;
+	const struct dsw_queue_flow_burst *burst_b = v_burst_b;
+
+	int a_count = burst_a->count;
+	int b_count = burst_b->count;
+
+	return a_count - b_count;
+}
+
+#define DSW_QF_TO_INT(_qf)					\
+	((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
+
+static inline int
+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
+{
+	const struct dsw_queue_flow *qf_a = v_qf_a;
+	const struct dsw_queue_flow *qf_b = v_qf_b;
+
+	return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
+}
+
+static uint16_t
+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
+		       struct dsw_queue_flow_burst *bursts)
+{
+	uint16_t i;
+	struct dsw_queue_flow_burst *current_burst;
+	uint16_t num_bursts = 0;
+
+	/* We don't need the stable property, and the list is likely
+	 * large enough for qsort() to outperform dsw_stable_sort(),
+	 * so we use qsort() here.
+	 */
+	qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
+
+	/* arrange the (now-consecutive) events into bursts */
+	for (i = 0; i < qfs_len; i++) {
+		if (i == 0 ||
+		    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
+			current_burst = &bursts[num_bursts];
+			current_burst->queue_flow = qfs[i];
+			current_burst->count = 0;
+			num_bursts++;
+		}
+		current_burst->count++;
+	}
+
+	qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
+
+	return num_bursts;
+}
+
+static bool
+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
+			int16_t load_limit)
+{
+	bool below_limit = false;
+	uint16_t i;
+	for (i = 0; i < dsw->num_ports; i++) {
+		int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+		if (load < load_limit)
+			below_limit = true;
+		port_loads[i] = load;
+	}
+	return below_limit;
+}
+
+static bool
+dsw_select_migration_target(struct dsw_evdev *dsw,
+			    struct dsw_port *source_port,
+			    struct dsw_queue_flow_burst *bursts,
+			    uint16_t num_bursts, int16_t *port_loads,
+			    int16_t max_load, struct dsw_queue_flow *target_qf,
+			    uint8_t *target_port_id)
+{
+	uint16_t source_load = port_loads[source_port->id];
+	uint16_t i;
+
+	for (i = 0; i < num_bursts; i++) {
+		struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+
+		if (dsw_port_is_flow_paused(source_port, qf->queue_id,
+					    qf->flow_hash))
+			continue;
+
+		struct dsw_queue *queue = &dsw->queues[qf->queue_id];
+		int16_t target_load;
+
+		dsw_find_lowest_load_port(queue->serving_ports,
+					  queue->num_serving_ports,
+					  source_port->id, port_loads,
+					  target_port_id, &target_load);
+
+		if (target_load < source_load &&
+		    target_load < max_load) {
+			*target_qf = *qf;
+			return true;
+		}
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
+			"no target port found with load less than %d.\n",
+			num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+
+	return false;
+}
+
+static uint8_t
+dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
+{
+	struct dsw_queue *queue = &dsw->queues[queue_id];
+	uint8_t port_id;
+
+	if (queue->num_serving_ports > 1)
+		port_id = queue->flow_to_port_map[flow_hash];
+	else
+		/* A single-link queue, or atomic/ordered/parallel but
+		 * with just a single serving port.
+		 */
+		port_id = queue->serving_ports[0];
+
+	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
+		   "to port %d.\n", queue_id, flow_hash, port_id);
+
+	return port_id;
+}
+
+static void
+dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
+			   uint8_t dest_port_id)
+{
+	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
+	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+	uint16_t enqueued = 0;
+
+	if (*buffer_len == 0)
+		return;
+
+	/* The rings are dimensioned to fit all in-flight events (even
+	 * on a single ring), so looping will work.
+	 */
+	do {
+		enqueued +=
+			rte_event_ring_enqueue_burst(dest_port->in_ring,
+						     buffer+enqueued,
+						     *buffer_len-enqueued,
+						     NULL);
+	} while (unlikely(enqueued != *buffer_len));
+
+	(*buffer_len) = 0;
+}
+
+static uint16_t
+dsw_port_get_parallel_flow_id(struct dsw_port *port)
+{
+	uint16_t flow_id = port->next_parallel_flow_id;
+
+	port->next_parallel_flow_id =
+		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
+
+	return flow_id;
+}
+
+static void
+dsw_port_buffer_paused(struct dsw_port *port,
+		       const struct rte_event *paused_event)
+{
+	port->paused_events[port->paused_events_len] = *paused_event;
+	port->paused_events_len++;
+}
+
+static void
+dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
+			   uint8_t dest_port_id, const struct rte_event *event)
+{
+	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+
+	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
+		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+
+	buffer[*buffer_len] = *event;
+
+	(*buffer_len)++;
+}
+
+#define DSW_FLOW_ID_BITS (24)
+static uint16_t
+dsw_flow_id_hash(uint32_t flow_id)
+{
+	uint16_t hash = 0;
+	uint16_t offset = 0;
+
+	do {
+		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
+		offset += DSW_MAX_FLOWS_BITS;
+	} while (offset < DSW_FLOW_ID_BITS);
+
+	return hash;
+}
+
+static void
+dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
+			 struct rte_event event)
+{
+	uint8_t dest_port_id;
+
+	event.flow_id = dsw_port_get_parallel_flow_id(source_port);
+
+	dest_port_id = dsw_schedule(dsw, event.queue_id,
+				    dsw_flow_id_hash(event.flow_id));
+
+	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
+}
+
+static void
+dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
+		      const struct rte_event *event)
+{
+	uint16_t flow_hash;
+	uint8_t dest_port_id;
+
+	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
+		     RTE_SCHED_TYPE_PARALLEL)) {
+		dsw_port_buffer_parallel(dsw, source_port, *event);
+		return;
+	}
+
+	flow_hash = dsw_flow_id_hash(event->flow_id);
+
+	if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
+					     flow_hash))) {
+		dsw_port_buffer_paused(source_port, event);
+		return;
+	}
+
+	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
+
+	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint8_t queue_id, uint16_t paused_flow_hash)
+{
+	uint16_t paused_events_len = source_port->paused_events_len;
+	struct rte_event paused_events[paused_events_len];
+	uint8_t dest_port_id;
+	uint16_t i;
+
+	if (paused_events_len == 0)
+		return;
+
+	if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+		return;
+
+	rte_memcpy(paused_events, source_port->paused_events,
+		   paused_events_len * sizeof(struct rte_event));
+
+	source_port->paused_events_len = 0;
+
+	dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+
+	for (i = 0; i < paused_events_len; i++) {
+		struct rte_event *event = &paused_events[i];
+		uint16_t flow_hash;
+
+		flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		if (event->queue_id == queue_id &&
+		    flow_hash == paused_flow_hash)
+			dsw_port_buffer_non_paused(dsw, source_port,
+						   dest_port_id, event);
+		else
+			dsw_port_buffer_paused(source_port, event);
+	}
+}
+
+#ifdef DSW_XSTATS
+static void
+dsw_port_migration_stats(struct dsw_port *port)
+{
+	uint64_t migration_latency;
+
+	migration_latency = (rte_get_timer_cycles() - port->migration_start);
+	port->migration_latency += migration_latency;
+	port->migrations++;
+}
+#endif
+
+static void
+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	uint8_t queue_id = port->migration_target_qf.queue_id;
+	uint16_t flow_hash = port->migration_target_qf.flow_hash;
+
+	port->migration_state = DSW_MIGRATION_STATE_IDLE;
+	port->seen_events_len = 0;
+
+#ifdef DSW_XSTATS
+	dsw_port_migration_stats(port);
+#endif
+
+	if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
+		dsw_port_remove_paused_flow(port, queue_id, flow_hash);
+		dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
+			"%d flow_hash %d.\n", queue_id, flow_hash);
+}
+
+static void
+dsw_port_consider_migration(struct dsw_evdev *dsw,
+			    struct dsw_port *source_port,
+			    uint64_t now)
+{
+	bool any_port_below_limit;
+	struct dsw_queue_flow *seen_events = source_port->seen_events;
+	uint16_t seen_events_len = source_port->seen_events_len;
+	struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
+	uint16_t num_bursts;
+	int16_t source_port_load;
+	int16_t port_loads[dsw->num_ports];
+
+	if (now < source_port->next_migration)
+		return;
+
+	if (dsw->num_ports == 1)
+		return;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+
+	/* Randomize interval to avoid having all threads considering
+	 * migration at the same in point in time, which might lead to
+	 * all choosing the same target port.
+	 */
+	source_port->next_migration = now +
+		source_port->migration_interval / 2 +
+		rte_rand() % source_port->migration_interval;
+
+	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Migration already in progress.\n");
+		return;
+	}
+
+	/* For simplicity, avoid migration in the unlikely case there
+	 * is still events to consume in the in_buffer (from the last
+	 * migration).
+	 */
+	if (source_port->in_buffer_len > 0) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+				"events in the input buffer.\n");
+		return;
+	}
+
+	source_port_load = rte_atomic16_read(&source_port->load);
+	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Load %d is below threshold level %d.\n",
+				DSW_LOAD_TO_PERCENT(source_port_load),
+		       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+		return;
+	}
+
+	/* Avoid starting any expensive operations (sorting etc), in
+	 * case of a scenario with all ports above the load limit.
+	 */
+	any_port_below_limit =
+		dsw_retrieve_port_loads(dsw, port_loads,
+					DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
+	if (!any_port_below_limit) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Candidate target ports are all too highly "
+				"loaded.\n");
+		return;
+	}
+
+	/* Sort flows into 'bursts' to allow attempting to migrating
+	 * small (but still active) flows first - this it to avoid
+	 * having large flows moving around the worker cores too much
+	 * (to avoid cache misses, among other things). Of course, the
+	 * number of recorded events (queue+flow ids) are limited, and
+	 * provides only a snapshot, so only so many conclusions can
+	 * be drawn from this data.
+	 */
+	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
+					    bursts);
+	/* For non-big-little systems, there's no point in moving the
+	 * only (known) flow.
+	 */
+	if (num_bursts < 2) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
+				"queue_id %d flow_hash %d has been seen.\n",
+				bursts[0].queue_flow.queue_id,
+				bursts[0].queue_flow.flow_hash);
+		return;
+	}
+
+	/* The strategy is to first try to find a flow to move to a
+	 * port with low load (below the migration-attempt
+	 * threshold). If that fails, we try to find a port which is
+	 * below the max threshold, and also less loaded than this
+	 * port is.
+	 */
+	if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+					 port_loads,
+					 DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
+					 &source_port->migration_target_qf,
+					 &source_port->migration_target_port_id)
+	    &&
+	    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+					 port_loads,
+					 DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
+					 &source_port->migration_target_qf,
+				       &source_port->migration_target_port_id))
+		return;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
+			"flow_hash %d from port %d to port %d.\n",
+			source_port->migration_target_qf.queue_id,
+			source_port->migration_target_qf.flow_hash,
+			source_port->id, source_port->migration_target_port_id);
+#if 0
+	printf("Migrating queue_id %d "
+			"flow_hash %d from port %d to port %d.\n",
+			source_port->migration_target_qf.queue_id,
+			source_port->migration_target_qf.flow_hash,
+			source_port->id, source_port->migration_target_port_id);
+#endif
+
+	/* We have a winner. */
+
+	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+#ifdef DSW_XSTATS
+	source_port->migration_start = rte_get_timer_cycles();
+#endif
+
+	/* No need to go through the whole pause procedure for
+	 * parallel queues, since atomic/ordered semantics need not to
+	 * be maintained.
+	 */
+
+	if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
+	    == RTE_SCHED_TYPE_PARALLEL) {
+		uint8_t queue_id = source_port->migration_target_qf.queue_id;
+		uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+		uint8_t dest_port_id = source_port->migration_target_port_id;
+
+		/* Single byte-sized stores are always atomic. */
+		dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+			dest_port_id;
+		rte_smp_wmb();
+
+		dsw_port_end_migration(dsw, source_port);
+
+		return;
+	}
+
+	/* There might be 'loopback' events already scheduled in the
+	 * output buffers.
+	 */
+	dsw_port_flush_out_buffers(dsw, source_port);
+
+	dsw_port_add_paused_flow(source_port,
+				 source_port->migration_target_qf.queue_id,
+				 source_port->migration_target_qf.flow_hash);
+
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+			       source_port->migration_target_qf.queue_id,
+			       source_port->migration_target_qf.flow_hash);
+	source_port->cfm_cnt = 0;
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint8_t queue_id, uint16_t paused_flow_hash);
+
+static void
+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+			     uint8_t originating_port_id, uint8_t queue_id,
+			     uint16_t paused_flow_hash)
+{
+	struct dsw_ctl_msg cfm = {
+		.type = DSW_CTL_CFM,
+		.originating_port_id = port->id,
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
+			queue_id, paused_flow_hash);
+
+	dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+
+	rte_smp_rmb();
+
+	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+
+	dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+}
+
+#define FORWARD_BURST_SIZE (32)
+
+static void
+dsw_port_forward_migrated_flow(struct dsw_port *source_port,
+			       struct rte_event_ring *dest_ring,
+			       uint8_t queue_id,
+			       uint16_t flow_hash)
+{
+	uint16_t events_left;
+
+	/* Control ring message should been seen before the ring count
+	 * is read on the port's in_ring.
+	 */
+	rte_smp_rmb();
+
+	events_left = rte_event_ring_count(source_port->in_ring);
+
+	while (events_left > 0) {
+		uint16_t in_burst_size =
+			RTE_MIN(FORWARD_BURST_SIZE, events_left);
+		struct rte_event in_burst[in_burst_size];
+		uint16_t in_len;
+		uint16_t i;
+
+		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
+						      in_burst,
+						      in_burst_size, NULL);
+		/* No need to care about bursting forwarded events (to
+		 * the destination port's in_ring), since migration
+		 * doesn't happen very often, and also the majority of
+		 * the dequeued events will likely *not* be forwarded.
+		 */
+		for (i = 0; i < in_len; i++) {
+			struct rte_event *e = &in_burst[i];
+			if (e->queue_id == queue_id &&
+			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
+				while (rte_event_ring_enqueue_burst(dest_ring,
+								    e, 1,
+								    NULL) != 1)
+					rte_pause();
+			} else {
+				uint16_t last_idx = source_port->in_buffer_len;
+				source_port->in_buffer[last_idx] = *e;
+				source_port->in_buffer_len++;
+			}
+		}
+
+		events_left -= in_len;
+	}
+}
+
+static void
+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port)
+{
+	uint8_t queue_id = source_port->migration_target_qf.queue_id;
+	uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+	uint8_t dest_port_id = source_port->migration_target_port_id;
+	struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+	dsw_port_flush_out_buffers(dsw, source_port);
+
+	rte_smp_wmb();
+
+	dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+		dest_port_id;
+
+	dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
+				       queue_id, flow_hash);
+
+	/* Flow table update and migration destination port's enqueues
+	 * must be seen before the control message.
+	 */
+	rte_smp_wmb();
+
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
+			       flow_hash);
+	source_port->cfm_cnt = 0;
+	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
+}
+
+static void
+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	port->cfm_cnt++;
+
+	if (port->cfm_cnt == (dsw->num_ports-1)) {
+		switch (port->migration_state) {
+		case DSW_MIGRATION_STATE_PAUSING:
+			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
+					"migration state.\n");
+			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+			break;
+		case DSW_MIGRATION_STATE_UNPAUSING:
+			dsw_port_end_migration(dsw, port);
+			break;
+		default:
+			RTE_ASSERT(0);
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	struct dsw_ctl_msg msg;
+
+	/* So any table loads happens before the ring dequeue, in the
+	 * case of a 'paus' message.
+	 */
+	rte_smp_rmb();
+
+	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
+		switch (msg.type) {
+		case DSW_CTL_PAUS_REQ:
+			dsw_port_handle_pause_flow(dsw, port,
+						   msg.originating_port_id,
+						   msg.queue_id, msg.flow_hash);
+			break;
+		case DSW_CTL_UNPAUS_REQ:
+			dsw_port_handle_unpause_flow(dsw, port,
+						     msg.originating_port_id,
+						     msg.queue_id,
+						     msg.flow_hash);
+			break;
+		case DSW_CTL_CFM:
+			dsw_port_handle_confirm(dsw, port);
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
+{
+	/* To pull the control ring reasonbly often on busy ports,
+	 * each dequeued/enqueued event is considered an 'op' too.
+	 */
+	port->ops_since_bg_task += (num_events+1);
+}
+
+static void
+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
+		     port->pending_releases == 0))
+		dsw_port_move_migrating_flow(dsw, port);
+
+	/* Polling the control ring is relatively inexpensive, and
+	 * polling it often helps bringing down migration latency, so
+	 * do this for every iteration.
+	 */
+	dsw_port_ctl_process(dsw, port);
+
+	/* To avoid considering migration and flushing output buffers,
+	 * and polling control rings on every dequeue/enqueue call,
+	 * the scheduler only performs such 'background' tasks every
+	 * nth (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
+	 */
+	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
+		uint64_t now;
+
+		now = rte_get_timer_cycles();
+
+#ifdef DSW_XSTATS
+		port->last_bg = now;
+#endif
+
+		/* Logic to avoid having events linger in the output
+		 * buffer too long.
+		 */
+		dsw_port_flush_out_buffers(dsw, port);
+
+		dsw_port_consider_load_update(port, now);
+
+		dsw_port_consider_migration(dsw, port, now);
+
+		port->ops_since_bg_task = 0;
+	}
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+	uint16_t dest_port_id;
+
+	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
+		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+}
+
+uint16_t
+dsw_event_enqueue(void *port, const struct rte_event *ev)
+{
+	return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
+}
+
+static __rte_always_inline uint16_t
+dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
+				uint16_t events_len, bool op_types_known,
+				uint16_t num_new, uint16_t num_release,
+				uint16_t num_non_release)
+{
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+	bool enough_credits;
+	uint16_t i;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
+			"events to port %d.\n", events_len, source_port->id);
+
+	dsw_port_bg_process(dsw, source_port);
+
+	/* XXX: For performance (=ring efficiency) reasons, the
+	 * scheduler relies on internal non-ring buffers instead of
+	 * immediately sending the event to the destination ring. For
+	 * a producer that doesn't intend to produce or consume any
+	 * more events, the scheduler provides a way to flush the
+	 * buffer, by means of doing an enqueue of zero events. In
+	 * addition, a port cannot be left "unattended" (e.g. unused)
+	 * for long periods of time, since that would stall
+	 * migration. Eventdev API extensions to provide a cleaner way
+	 * to archieve both of these functions should be
+	 * considered.
+	 */
+	if (unlikely(events_len == 0)) {
+		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
+		dsw_port_flush_out_buffers(dsw, source_port);
+		return 0;
+	}
+
+	if (unlikely(events_len > source_port->enqueue_depth))
+		events_len = source_port->enqueue_depth;
+
+	dsw_port_note_op(source_port, events_len);
+
+	if (!op_types_known)
+		for (i = 0; i < events_len; i++) {
+			switch (events[i].op) {
+			case RTE_EVENT_OP_RELEASE:
+				num_release++;
+				break;
+			case RTE_EVENT_OP_NEW:
+				num_new++;
+				/* Falls through. */
+			default:
+				num_non_release++;
+				break;
+			}
+		}
+
+	/* Technically, we could allow the non-new events up to the
+	 * first new event in the array into the system, but for
+	 * simplicity reasons, we deny the whole burst if the port is
+	 * above the water mark.
+	 */
+	if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
+		     source_port->new_event_threshold))
+		return 0;
+
+	enough_credits = dsw_port_acquire_credits(dsw, source_port,
+						  num_non_release);
+	if (unlikely(!enough_credits))
+		return 0;
+
+	source_port->pending_releases -= num_release;
+
+#ifdef DSW_XSTATS
+	dsw_port_enqueue_stats(source_port, num_new,
+			       num_non_release-num_new, num_release);
+#endif
+
+	for (i = 0; i < events_len; i++) {
+		const struct rte_event *event = &events[i];
+
+		if (likely(num_release == 0 ||
+			   event->op != RTE_EVENT_OP_RELEASE))
+			dsw_port_buffer_event(dsw, source_port, event);
+#ifdef DSW_XSTATS
+		dsw_port_queue_enqueue_stats(source_port, event->queue_id);
+#endif
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
+			"accepted.\n", num_non_release);
+
+	return num_non_release;
+}
+
+uint16_t
+dsw_event_enqueue_burst(void *port, const struct rte_event events[],
+			uint16_t events_len)
+{
+	return dsw_event_enqueue_burst_generic(port, events, events_len, false,
+					       0, 0, 0);
+}
+
+uint16_t
+dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
+			    uint16_t events_len)
+{
+	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+					       events_len, 0, events_len);
+}
+
+uint16_t
+dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
+				uint16_t events_len)
+{
+	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+					       0, 0, events_len);
+}
+
+uint16_t
+dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
+{
+	return dsw_event_dequeue_burst(port, events, 1, wait);
+}
+
+static void
+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
+			    uint16_t num)
+{
+	uint16_t i;
+
+#ifdef DSW_XSTATS
+	dsw_port_dequeue_stats(port, num);
+#endif
+
+	for (i = 0; i < num; i++) {
+		uint16_t l_idx = port->seen_events_idx;
+		struct dsw_queue_flow *qf = &port->seen_events[l_idx];
+		struct rte_event *event = &events[i];
+		qf->queue_id = event->queue_id;
+		qf->flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+
+#ifdef DSW_XSTATS
+		dsw_port_queue_dequeued_stats(port, event->queue_id);
+#endif
+	}
+
+	if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
+		port->seen_events_len =
+			RTE_MIN(port->seen_events_len + num,
+				DSW_MAX_EVENTS_RECORDED);
+}
+
+#ifdef DSW_SORT_DEQUEUED
+
+#define DSW_EVENT_TO_INT(_event)				\
+	((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
+
+static inline int
+dsw_cmp_event(const void *v_event_a, const void *v_event_b)
+{
+	const struct rte_event *event_a = v_event_a;
+	const struct rte_event *event_b = v_event_b;
+
+	return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
+}
+#endif
+
+static uint16_t
+dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
+		       uint16_t num)
+{
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+
+	dsw_port_ctl_process(dsw, source_port);
+
+	if (unlikely(port->in_buffer_len > 0)) {
+		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+
+		rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
+			   dequeued * sizeof(struct rte_event));
+
+		port->in_buffer_start += dequeued;
+		port->in_buffer_len -= dequeued;
+
+		if (port->in_buffer_len == 0)
+			port->in_buffer_start = 0;
+
+		return dequeued;
+	}
+
+	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
+}
+
+uint16_t
+dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
+			uint64_t wait __rte_unused)
+{
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+	uint16_t dequeued;
+
+	source_port->pending_releases = 0;
+
+	dsw_port_bg_process(dsw, source_port);
+
+	if (unlikely(num > source_port->dequeue_depth))
+		num = source_port->dequeue_depth;
+
+	dequeued = dsw_port_dequeue_burst(source_port, events, num);
+
+	source_port->pending_releases = dequeued;
+
+	dsw_port_load_record(source_port, dequeued);
+
+	dsw_port_note_op(source_port, dequeued);
+
+	if (dequeued > 0) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
+				dequeued);
+
+		dsw_port_return_credits(dsw, source_port, dequeued);
+
+		/* One potential optimization one might think of is to
+		 * add a migration state (prior to 'pausing'), and
+		 * only record seen events when the port is in this
+		 * state (and transit to 'pausing' when enough events
+		 * have been gathered). However, that schema doesn't
+		 * seem to improve performance.
+		 */
+		dsw_port_record_seen_events(port, events, dequeued);
+	}
+	/* XXX: Assuming the port can't produce any more work,
+	 *	consider flushing the output buffer, on dequeued ==
+	 *	0.
+	 */
+
+#ifdef DSW_SORT_DEQUEUED
+	dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
+#endif
+
+	return dequeued;
+}
+
+void dsw_event_schedule(struct rte_eventdev *dev __rte_unused)
+{
+}
diff --git a/drivers/event/dsw/dsw_sort.h b/drivers/event/dsw/dsw_sort.h
new file mode 100644
index 000000000..ff8ef82f2
--- /dev/null
+++ b/drivers/event/dsw/dsw_sort.h
@@ -0,0 +1,47 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_SORT_
+#define _DSW_SORT_
+
+#include <rte_common.h>
+#include <string.h>
+
+#define DSW_ARY_ELEM_PTR(_ary, _idx, _elem_size)	\
+	RTE_PTR_ADD(_ary, (_idx) * (_elem_size))
+
+#define DSW_ARY_ELEM_SWAP(_ary, _a_idx, _b_idx, _elem_size)		\
+	do {								\
+		char tmp[_elem_size];					\
+		void *_a_ptr = DSW_ARY_ELEM_PTR(_ary, _a_idx, _elem_size); \
+		void *_b_ptr = DSW_ARY_ELEM_PTR(_ary, _b_idx, _elem_size); \
+		memcpy(tmp, _a_ptr, _elem_size);			\
+		memcpy(_a_ptr, _b_ptr, _elem_size);			\
+		memcpy(_b_ptr, tmp, _elem_size);			\
+	} while (0)
+
+static inline void
+dsw_insertion_sort(void *ary, uint16_t len, uint16_t elem_size,
+		   int (*cmp_fn)(const void *, const void *))
+{
+	uint16_t i;
+
+	for (i = 1; i < len; i++) {
+		uint16_t j;
+		for (j = i; j > 0 &&
+			     cmp_fn(DSW_ARY_ELEM_PTR(ary, j-1, elem_size),
+				    DSW_ARY_ELEM_PTR(ary, j, elem_size)) > 0;
+		     j--)
+			DSW_ARY_ELEM_SWAP(ary, j, j-1, elem_size);
+	}
+}
+
+static inline void
+dsw_stable_sort(void *ary, uint16_t len, uint16_t elem_size,
+		int (*cmp_fn)(const void *, const void *))
+{
+	dsw_insertion_sort(ary, len, elem_size, cmp_fn);
+}
+
+#endif
diff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c
new file mode 100644
index 000000000..d688ee07e
--- /dev/null
+++ b/drivers/event/dsw/dsw_xstats.c
@@ -0,0 +1,284 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include "dsw_evdev.h"
+#include <rte_debug.h>
+#include <string.h>
+
+#ifdef DSW_XSTATS
+
+/* The high bits in the xstats id is used to store an additional
+ * parameter (beyond the queue or port id already in the xstats
+ * interface).
+ */
+#define DSW_XSTATS_ID_PARAM_BITS (8)
+#define DSW_XSTATS_ID_STAT_BITS					\
+	(sizeof(unsigned int)*CHAR_BIT - DSW_XSTATS_ID_PARAM_BITS)
+#define DSW_XSTATS_ID_STAT_MASK ((1 << DSW_XSTATS_ID_STAT_BITS) - 1)
+
+#define DSW_XSTATS_ID_GET_PARAM(id)		\
+	((id)>>DSW_XSTATS_ID_STAT_BITS)
+
+#define DSW_XSTATS_ID_GET_STAT(id)		\
+	((id) & DSW_XSTATS_ID_STAT_MASK)
+
+#define DSW_XSTATS_ID_CREATE(id, param_value)			\
+	(((param_value) << DSW_XSTATS_ID_STAT_BITS) | id)
+
+typedef
+uint64_t (*dsw_xstats_dev_get_value_fn)(struct dsw_evdev *dsw);
+
+struct dsw_xstat_dev {
+	const char *name;
+	dsw_xstats_dev_get_value_fn get_value_fn;
+};
+
+typedef
+uint64_t (*dsw_xstats_port_get_value_fn)(struct dsw_evdev *dsw,
+					 uint8_t port_id, uint8_t queue_id);
+
+struct dsw_xstats_port {
+	const char *name_fmt;
+	dsw_xstats_port_get_value_fn get_value_fn;
+	bool per_queue;
+};
+
+static uint64_t
+dsw_xstats_dev_credits_on_loan(struct dsw_evdev *dsw)
+{
+	return rte_atomic32_read(&dsw->credits_on_loan);
+}
+
+static struct dsw_xstat_dev dsw_dev_xstats[] = {
+	{ "dev_credits_on_loan", dsw_xstats_dev_credits_on_loan }
+};
+
+#define DSW_GEN_PORT_ACCESS_FN(_variable)				\
+	static uint64_t							\
+	dsw_xstats_port_get_ ## _variable(struct dsw_evdev *dsw,	\
+					  uint8_t port_id,		\
+					  uint8_t queue_id __rte_unused) \
+	{								\
+		return dsw->ports[port_id]._variable;			\
+	}
+
+DSW_GEN_PORT_ACCESS_FN(new_enqueued)
+DSW_GEN_PORT_ACCESS_FN(forward_enqueued)
+DSW_GEN_PORT_ACCESS_FN(release_enqueued)
+
+static uint64_t
+dsw_xstats_port_get_queue_enqueued(struct dsw_evdev *dsw, uint8_t port_id,
+				   uint8_t queue_id)
+{
+	return dsw->ports[port_id].queue_enqueued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(dequeued)
+
+static uint64_t
+dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id,
+				   uint8_t queue_id)
+{
+	return dsw->ports[port_id].queue_dequeued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(migrations)
+
+static uint64_t
+dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id,
+				      uint8_t queue_id __rte_unused)
+{
+	uint64_t total_latency = dsw->ports[port_id].migration_latency;
+	uint64_t num_migrations = dsw->ports[port_id].migrations;
+
+	return num_migrations > 0 ? total_latency / num_migrations : 0;
+}
+
+static uint64_t
+dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id,
+				       uint8_t queue_id __rte_unused)
+{
+	uint64_t total_busy_cycles =
+		dsw->ports[port_id].total_busy_cycles;
+	uint64_t dequeued =
+		dsw->ports[port_id].dequeued;
+
+	return dequeued > 0 ? total_busy_cycles / dequeued : 0;
+}
+
+DSW_GEN_PORT_ACCESS_FN(inflight_credits)
+
+static uint64_t
+dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id,
+			 uint8_t queue_id __rte_unused)
+{
+	int16_t load = rte_atomic16_read(&dsw->ports[port_id].load);
+	return DSW_LOAD_TO_PERCENT(load);
+}
+
+DSW_GEN_PORT_ACCESS_FN(last_bg)
+
+static struct dsw_xstats_port dsw_port_xstats[] = {
+	{ "port_%u_new_enqueued", dsw_xstats_port_get_new_enqueued,
+	  false },
+	{ "port_%u_forward_enqueued", dsw_xstats_port_get_forward_enqueued,
+	  false },
+	{ "port_%u_release_enqueued", dsw_xstats_port_get_release_enqueued,
+	  false },
+	{ "port_%u_queue_%u_enqueued", dsw_xstats_port_get_queue_enqueued,
+	  true },
+	{ "port_%u_dequeued", dsw_xstats_port_get_dequeued,
+	  false },
+	{ "port_%u_queue_%u_dequeued", dsw_xstats_port_get_queue_dequeued,
+	  true },
+	{ "port_%u_migrations", dsw_xstats_port_get_migrations,
+	  false },
+	{ "port_%u_migration_latency", dsw_xstats_port_get_migration_latency,
+	  false },
+	{ "port_%u_event_proc_latency", dsw_xstats_port_get_event_proc_latency,
+	  false },
+	{ "port_%u_inflight_credits", dsw_xstats_port_get_inflight_credits,
+	  false },
+	{ "port_%u_load", dsw_xstats_port_get_load,
+	  false },
+	{ "port_%u_last_bg", dsw_xstats_port_get_last_bg,
+	  false }
+};
+
+static int
+dsw_xstats_dev_get_names(struct rte_event_dev_xstats_name *xstats_names,
+			 unsigned int *ids, unsigned int size)
+{
+	unsigned int i;
+
+	for (i = 0; i < RTE_DIM(dsw_dev_xstats) && i < size; i++) {
+		ids[i] = i;
+		strcpy(xstats_names[i].name, dsw_dev_xstats[i].name);
+	}
+
+	return i;
+}
+
+static int
+dsw_xstats_port_get_names(struct dsw_evdev *dsw, uint8_t port_id,
+			  struct rte_event_dev_xstats_name *xstats_names,
+			  unsigned int *ids, unsigned int size)
+{
+	uint8_t queue_id = 0;
+	unsigned int id_idx;
+	unsigned int stat_idx;
+
+	for (id_idx = 0, stat_idx = 0;
+	     id_idx < size && stat_idx < RTE_DIM(dsw_port_xstats);
+	     id_idx++) {
+		struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+
+		if (xstat->per_queue) {
+			ids[id_idx] = DSW_XSTATS_ID_CREATE(stat_idx, queue_id);
+			snprintf(xstats_names[id_idx].name,
+				 RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+				 dsw_port_xstats[stat_idx].name_fmt, port_id,
+				 queue_id);
+			queue_id++;
+		} else {
+			ids[id_idx] = stat_idx;
+			snprintf(xstats_names[id_idx].name,
+				 RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+				 dsw_port_xstats[stat_idx].name_fmt, port_id);
+		}
+
+		if (!(xstat->per_queue && queue_id < dsw->num_queues)) {
+			stat_idx++;
+			queue_id = 0;
+		}
+	}
+	return id_idx;
+}
+
+int
+dsw_xstats_get_names(const struct rte_eventdev *dev,
+		     enum rte_event_dev_xstats_mode mode,
+		     uint8_t queue_port_id,
+		     struct rte_event_dev_xstats_name *xstats_names,
+		     unsigned int *ids, unsigned int size)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+	switch (mode) {
+	case RTE_EVENT_DEV_XSTATS_DEVICE:
+		return dsw_xstats_dev_get_names(xstats_names, ids, size);
+	case RTE_EVENT_DEV_XSTATS_PORT:
+		return dsw_xstats_port_get_names(dsw, queue_port_id,
+						 xstats_names, ids, size);
+	case RTE_EVENT_DEV_XSTATS_QUEUE:
+		return 0;
+	default:
+		RTE_ASSERT(false);
+		return -1;
+	}
+}
+
+static int
+dsw_xstats_dev_get(const struct rte_eventdev *dev,
+		   const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	unsigned int i;
+	for (i = 0; i < n; i++) {
+		unsigned int id = ids[i];
+		struct dsw_xstat_dev *xstat = &dsw_dev_xstats[id];
+		values[i] = xstat->get_value_fn(dsw);
+	}
+	return n;
+}
+
+static int
+dsw_xstats_port_get(const struct rte_eventdev *dev, uint8_t port_id,
+		    const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+	unsigned int i;
+	for (i = 0; i < n; i++) {
+		unsigned int id = ids[i];
+		unsigned int stat_idx = DSW_XSTATS_ID_GET_STAT(id);
+		struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+		uint8_t queue_id = 0;
+
+		if (xstat->per_queue)
+			queue_id = DSW_XSTATS_ID_GET_PARAM(id);
+
+		values[i] = xstat->get_value_fn(dsw, port_id, queue_id);
+	}
+	return n;
+}
+
+int
+dsw_xstats_get(const struct rte_eventdev *dev,
+	       enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+	       const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+	switch (mode) {
+	case RTE_EVENT_DEV_XSTATS_DEVICE:
+		return dsw_xstats_dev_get(dev, ids, values, n);
+	case RTE_EVENT_DEV_XSTATS_PORT:
+		return dsw_xstats_port_get(dev, queue_port_id, ids, values, n);
+	case RTE_EVENT_DEV_XSTATS_QUEUE:
+		return 0;
+	default:
+		RTE_ASSERT(false);
+		return -1;
+	}
+	return 0;
+}
+
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+				const char *name, unsigned int *id)
+{
+	RTE_SET_USED(dev);
+	RTE_SET_USED(name);
+	RTE_SET_USED(id);
+	return 0;
+}
+
+#endif
diff --git a/drivers/event/dsw/rte_pmd_evdev_dsw_version.map b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
new file mode 100644
index 000000000..ad6e191e4
--- /dev/null
+++ b/drivers/event/dsw/rte_pmd_evdev_dsw_version.map
@@ -0,0 +1,3 @@
+DPDK_18.08 {
+	local: *;
+};
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 7bcf6308d..b14f1e112 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -218,6 +218,7 @@ endif # CONFIG_RTE_LIBRTE_COMPRESSDEV
 ifeq ($(CONFIG_RTE_LIBRTE_EVENTDEV),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += -lrte_pmd_skeleton_event
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += -lrte_pmd_sw_event
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += -lrte_pmd_dsw_event
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += -lrte_pmd_dpaa_event
-- 
2.17.1

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2018-08-27 10:24 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
     [not found] <20180711210844.5467-1-mattias.ronnblom@ericsson.com>
     [not found] ` <20180711210844.5467-2-mattias.ronnblom@ericsson.com>
2018-07-22 11:32   ` [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device Jerin Jacob
2018-08-19  6:11     ` Jerin Jacob
2018-08-23 13:08       ` Mattias Rönnblom
2018-08-23 13:44         ` Jerin Jacob
2018-08-23 20:08           ` Mattias Rönnblom
2018-08-27  9:23     ` Mattias Rönnblom
2018-08-27  9:40       ` Jerin Jacob
2018-08-27 10:24         ` Mattias Rönnblom
2018-07-11 21:21 [dpdk-dev] [RFC 0/1] A Distributed Software Event Device Mattias Rönnblom
2018-07-11 21:21 ` [dpdk-dev] [RFC 1/1] eventdev: add distributed software (DSW) event device Mattias Rönnblom

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).