* [dpdk-dev] [PATCH 3/8] event/dsw: extend statistics
2020-03-09 6:50 [dpdk-dev] [PATCH 0/8] DSW performance and statistics improvements Mattias Rönnblom
2020-03-09 6:50 ` [dpdk-dev] [PATCH 1/8] event/dsw: reduce latency in low-load situations Mattias Rönnblom
2020-03-09 6:51 ` [dpdk-dev] [PATCH 2/8] event/dsw: reduce max flows to speed up load balancing Mattias Rönnblom
@ 2020-03-09 6:51 ` Mattias Rönnblom
2020-03-09 6:51 ` [dpdk-dev] [PATCH 4/8] event/dsw: improve migration mechanism Mattias Rönnblom
` (5 subsequent siblings)
8 siblings, 0 replies; 17+ messages in thread
From: Mattias Rönnblom @ 2020-03-09 6:51 UTC (permalink / raw)
To: jerinj; +Cc: dev, stefan.sundkvist, Ola.Liljedahl, Mattias Rönnblom
Extend DSW xstats.
To allow visualization of migrations, track the number of flow
immigrations in "port_<N>_immigrations". The "port_<N>_migrations"
statistic retains its legacy semantics, but is renamed "port_<N>_emigrations".
Expose the number of events currently undergoing processing
(i.e. pending releases) at a particular port.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.h | 16 ++--
drivers/event/dsw/dsw_event.c | 131 +++++++++++++++++----------------
drivers/event/dsw/dsw_xstats.c | 17 +++--
3 files changed, 91 insertions(+), 73 deletions(-)
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index dc44bce81..2c7f9efa3 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -162,18 +162,20 @@ struct dsw_port {
uint64_t total_busy_cycles;
/* For the ctl interface and flow migration mechanism. */
- uint64_t next_migration;
+ uint64_t next_emigration;
uint64_t migration_interval;
enum dsw_migration_state migration_state;
- uint64_t migration_start;
- uint64_t migrations;
- uint64_t migration_latency;
+ uint64_t emigration_start;
+ uint64_t emigrations;
+ uint64_t emigration_latency;
- uint8_t migration_target_port_id;
- struct dsw_queue_flow migration_target_qf;
+ uint8_t emigration_target_port_id;
+ struct dsw_queue_flow emigration_target_qf;
uint8_t cfm_cnt;
+ uint64_t immigrations;
+
uint16_t paused_flows_len;
struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];
@@ -187,11 +189,13 @@ struct dsw_port {
uint16_t seen_events_idx;
struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+ uint64_t enqueue_calls;
uint64_t new_enqueued;
uint64_t forward_enqueued;
uint64_t release_enqueued;
uint64_t queue_enqueued[DSW_MAX_QUEUES];
+ uint64_t dequeue_calls;
uint64_t dequeued;
uint64_t queue_dequeued[DSW_MAX_QUEUES];
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index 7f1f29218..69cff7aa2 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -385,12 +385,12 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
}
static bool
-dsw_select_migration_target(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- struct dsw_queue_flow_burst *bursts,
- uint16_t num_bursts, int16_t *port_loads,
- int16_t max_load, struct dsw_queue_flow *target_qf,
- uint8_t *target_port_id)
+dsw_select_emigration_target(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, int16_t *port_loads,
+ int16_t max_load, struct dsw_queue_flow *target_qf,
+ uint8_t *target_port_id)
{
uint16_t source_load = port_loads[source_port->id];
uint16_t i;
@@ -598,39 +598,39 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
}
static void
-dsw_port_migration_stats(struct dsw_port *port)
+dsw_port_emigration_stats(struct dsw_port *port)
{
- uint64_t migration_latency;
+ uint64_t emigration_latency;
- migration_latency = (rte_get_timer_cycles() - port->migration_start);
- port->migration_latency += migration_latency;
- port->migrations++;
+ emigration_latency = (rte_get_timer_cycles() - port->emigration_start);
+ port->emigration_latency += emigration_latency;
+ port->emigrations++;
}
static void
-dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port)
{
- uint8_t queue_id = port->migration_target_qf.queue_id;
- uint16_t flow_hash = port->migration_target_qf.flow_hash;
+ uint8_t queue_id = port->emigration_target_qf.queue_id;
+ uint16_t flow_hash = port->emigration_target_qf.flow_hash;
port->migration_state = DSW_MIGRATION_STATE_IDLE;
port->seen_events_len = 0;
- dsw_port_migration_stats(port);
+ dsw_port_emigration_stats(port);
if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
dsw_port_remove_paused_flow(port, queue_id, flow_hash);
dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
}
- DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Emigration completed for queue_id "
"%d flow_hash %d.\n", queue_id, flow_hash);
}
static void
-dsw_port_consider_migration(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- uint64_t now)
+dsw_port_consider_emigration(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint64_t now)
{
bool any_port_below_limit;
struct dsw_queue_flow *seen_events = source_port->seen_events;
@@ -640,7 +640,7 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
int16_t source_port_load;
int16_t port_loads[dsw->num_ports];
- if (now < source_port->next_migration)
+ if (now < source_port->next_emigration)
return;
if (dsw->num_ports == 1)
@@ -649,25 +649,25 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
return;
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
/* Randomize interval to avoid having all threads considering
- * migration at the same in point in time, which might lead to
- * all choosing the same target port.
+ * emigration at the same in point in time, which might lead
+ * to all choosing the same target port.
*/
- source_port->next_migration = now +
+ source_port->next_emigration = now +
source_port->migration_interval / 2 +
rte_rand() % source_port->migration_interval;
if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
DSW_LOG_DP_PORT(DEBUG, source_port->id,
- "Migration already in progress.\n");
+ "Emigration already in progress.\n");
return;
}
/* For simplicity, avoid migration in the unlikely case there
* is still events to consume in the in_buffer (from the last
- * migration).
+ * emigration).
*/
if (source_port->in_buffer_len > 0) {
DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
@@ -719,52 +719,56 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
}
/* The strategy is to first try to find a flow to move to a
- * port with low load (below the migration-attempt
+ * port with low load (below the emigration-attempt
* threshold). If that fails, we try to find a port which is
* below the max threshold, and also less loaded than this
* port is.
*/
- if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
- port_loads,
- DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
- &source_port->migration_target_qf,
- &source_port->migration_target_port_id)
+ if (!dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
+ port_loads,
+ DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
+ &source_port->emigration_target_qf,
+ &source_port->emigration_target_port_id)
&&
- !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
- port_loads,
- DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
- &source_port->migration_target_qf,
- &source_port->migration_target_port_id))
+ !dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
+ port_loads,
+ DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
+ &source_port->emigration_target_qf,
+ &source_port->emigration_target_port_id))
return;
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
"flow_hash %d from port %d to port %d.\n",
- source_port->migration_target_qf.queue_id,
- source_port->migration_target_qf.flow_hash,
- source_port->id, source_port->migration_target_port_id);
+ source_port->emigration_target_qf.queue_id,
+ source_port->emigration_target_qf.flow_hash,
+ source_port->id,
+ source_port->emigration_target_port_id);
/* We have a winner. */
source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
- source_port->migration_start = rte_get_timer_cycles();
+ source_port->emigration_start = rte_get_timer_cycles();
/* No need to go through the whole pause procedure for
* parallel queues, since atomic/ordered semantics need not to
* be maintained.
*/
- if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
- == RTE_SCHED_TYPE_PARALLEL) {
- uint8_t queue_id = source_port->migration_target_qf.queue_id;
- uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
- uint8_t dest_port_id = source_port->migration_target_port_id;
+ if (dsw->queues[source_port->emigration_target_qf.queue_id].
+ schedule_type == RTE_SCHED_TYPE_PARALLEL) {
+ uint8_t queue_id =
+ source_port->emigration_target_qf.queue_id;
+ uint16_t flow_hash =
+ source_port->emigration_target_qf.flow_hash;
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_id;
/* Single byte-sized stores are always atomic. */
dsw->queues[queue_id].flow_to_port_map[flow_hash] =
dest_port_id;
rte_smp_wmb();
- dsw_port_end_migration(dsw, source_port);
+ dsw_port_end_emigration(dsw, source_port);
return;
}
@@ -775,12 +779,12 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
dsw_port_flush_out_buffers(dsw, source_port);
dsw_port_add_paused_flow(source_port,
- source_port->migration_target_qf.queue_id,
- source_port->migration_target_qf.flow_hash);
+ source_port->emigration_target_qf.queue_id,
+ source_port->emigration_target_qf.flow_hash);
dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
- source_port->migration_target_qf.queue_id,
- source_port->migration_target_qf.flow_hash);
+ source_port->emigration_target_qf.queue_id,
+ source_port->emigration_target_qf.flow_hash);
source_port->cfm_cnt = 0;
}
@@ -808,6 +812,9 @@ dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
rte_smp_rmb();
+ if (dsw_schedule(dsw, queue_id, paused_flow_hash) == port->id)
+ port->immigrations++;
+
dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
@@ -816,10 +823,10 @@ dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
#define FORWARD_BURST_SIZE (32)
static void
-dsw_port_forward_migrated_flow(struct dsw_port *source_port,
- struct rte_event_ring *dest_ring,
- uint8_t queue_id,
- uint16_t flow_hash)
+dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
+ struct rte_event_ring *dest_ring,
+ uint8_t queue_id,
+ uint16_t flow_hash)
{
uint16_t events_left;
@@ -868,9 +875,9 @@ static void
dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
struct dsw_port *source_port)
{
- uint8_t queue_id = source_port->migration_target_qf.queue_id;
- uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
- uint8_t dest_port_id = source_port->migration_target_port_id;
+ uint8_t queue_id = source_port->emigration_target_qf.queue_id;
+ uint16_t flow_hash = source_port->emigration_target_qf.flow_hash;
+ uint8_t dest_port_id = source_port->emigration_target_port_id;
struct dsw_port *dest_port = &dsw->ports[dest_port_id];
dsw_port_flush_out_buffers(dsw, source_port);
@@ -880,8 +887,8 @@ dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
dsw->queues[queue_id].flow_to_port_map[flow_hash] =
dest_port_id;
- dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
- queue_id, flow_hash);
+ dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
+ queue_id, flow_hash);
/* Flow table update and migration destination port's enqueues
* must be seen before the control message.
@@ -907,7 +914,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
break;
case DSW_MIGRATION_STATE_UNPAUSING:
- dsw_port_end_migration(dsw, port);
+ dsw_port_end_emigration(dsw, port);
break;
default:
RTE_ASSERT(0);
@@ -987,7 +994,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
dsw_port_consider_load_update(port, now);
- dsw_port_consider_migration(dsw, port, now);
+ dsw_port_consider_emigration(dsw, port, now);
port->ops_since_bg_task = 0;
}
diff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c
index c3f5db89c..d332a57b6 100644
--- a/drivers/event/dsw/dsw_xstats.c
+++ b/drivers/event/dsw/dsw_xstats.c
@@ -84,16 +84,17 @@ dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id,
return dsw->ports[port_id].queue_dequeued[queue_id];
}
-DSW_GEN_PORT_ACCESS_FN(migrations)
+DSW_GEN_PORT_ACCESS_FN(emigrations)
+DSW_GEN_PORT_ACCESS_FN(immigrations)
static uint64_t
dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id,
uint8_t queue_id __rte_unused)
{
- uint64_t total_latency = dsw->ports[port_id].migration_latency;
- uint64_t num_migrations = dsw->ports[port_id].migrations;
+ uint64_t total_latency = dsw->ports[port_id].emigration_latency;
+ uint64_t num_emigrations = dsw->ports[port_id].emigrations;
- return num_migrations > 0 ? total_latency / num_migrations : 0;
+ return num_emigrations > 0 ? total_latency / num_emigrations : 0;
}
static uint64_t
@@ -110,6 +111,8 @@ dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id,
DSW_GEN_PORT_ACCESS_FN(inflight_credits)
+DSW_GEN_PORT_ACCESS_FN(pending_releases)
+
static uint64_t
dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id,
uint8_t queue_id __rte_unused)
@@ -136,14 +139,18 @@ static struct dsw_xstats_port dsw_port_xstats[] = {
false },
{ "port_%u_queue_%u_dequeued", dsw_xstats_port_get_queue_dequeued,
true },
- { "port_%u_migrations", dsw_xstats_port_get_migrations,
+ { "port_%u_emigrations", dsw_xstats_port_get_emigrations,
false },
{ "port_%u_migration_latency", dsw_xstats_port_get_migration_latency,
false },
+ { "port_%u_immigrations", dsw_xstats_port_get_immigrations,
+ false },
{ "port_%u_event_proc_latency", dsw_xstats_port_get_event_proc_latency,
false },
{ "port_%u_inflight_credits", dsw_xstats_port_get_inflight_credits,
false },
+ { "port_%u_pending_releases", dsw_xstats_port_get_pending_releases,
+ false },
{ "port_%u_load", dsw_xstats_port_get_load,
false },
{ "port_%u_last_bg", dsw_xstats_port_get_last_bg,
--
2.17.1
^ permalink raw reply [flat|nested] 17+ messages in thread
* [dpdk-dev] [PATCH 4/8] event/dsw: improve migration mechanism
2020-03-09 6:50 [dpdk-dev] [PATCH 0/8] DSW performance and statistics improvements Mattias Rönnblom
` (2 preceding siblings ...)
2020-03-09 6:51 ` [dpdk-dev] [PATCH 3/8] event/dsw: extend statistics Mattias Rönnblom
@ 2020-03-09 6:51 ` Mattias Rönnblom
2020-03-09 6:51 ` [dpdk-dev] [PATCH 5/8] event/dsw: avoid migration waves in large systems Mattias Rönnblom
` (4 subsequent siblings)
8 siblings, 0 replies; 17+ messages in thread
From: Mattias Rönnblom @ 2020-03-09 6:51 UTC (permalink / raw)
To: jerinj; +Cc: dev, stefan.sundkvist, Ola.Liljedahl, Mattias Rönnblom
Allow moving multiple flows in one migration transaction, to
rebalance load more quickly.
Introduce a threshold to avoid migrating flows between ports with very
similar load.
Simplify logic for selecting which flow to migrate. The aim is now to
move flows in such a way that the receiving port is as lightly-loaded
as possible (after receiving the flow), while still migrating enough
flows from the source port to reduce its load. This is essentially how
the legacy strategy worked as well, but the code is more readable.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
drivers/event/dsw/dsw_evdev.h | 15 +-
drivers/event/dsw/dsw_event.c | 541 +++++++++++++++++++++-------------
2 files changed, 343 insertions(+), 213 deletions(-)
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 2c7f9efa3..ced40ef8d 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -93,11 +93,14 @@
#define DSW_MIGRATION_INTERVAL (1000)
#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))
#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))
+#define DSW_REBALANCE_THRESHOLD (DSW_LOAD_FROM_PERCENT(3))
#define DSW_MAX_EVENTS_RECORDED (128)
+#define DSW_MAX_FLOWS_PER_MIGRATION (8)
+
/* Only one outstanding migration per port is allowed */
-#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS*DSW_MAX_FLOWS_PER_MIGRATION)
/* Enough room for paus request/confirm and unpaus request/confirm for
* all possible senders.
@@ -170,8 +173,10 @@ struct dsw_port {
uint64_t emigrations;
uint64_t emigration_latency;
- uint8_t emigration_target_port_id;
- struct dsw_queue_flow emigration_target_qf;
+ uint8_t emigration_target_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
+ struct dsw_queue_flow
+ emigration_target_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
+ uint8_t emigration_targets_len;
uint8_t cfm_cnt;
uint64_t immigrations;
@@ -244,8 +249,8 @@ struct dsw_evdev {
struct dsw_ctl_msg {
uint8_t type;
uint8_t originating_port_id;
- uint8_t queue_id;
- uint16_t flow_hash;
+ uint8_t qfs_len;
+ struct dsw_queue_flow qfs[DSW_MAX_FLOWS_PER_MIGRATION];
} __rte_aligned(4);
uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index 69cff7aa2..21c102275 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -189,58 +189,75 @@ dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
static void
dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
- uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+ uint8_t type, struct dsw_queue_flow *qfs,
+ uint8_t qfs_len)
{
uint16_t port_id;
struct dsw_ctl_msg msg = {
.type = type,
.originating_port_id = source_port->id,
- .queue_id = queue_id,
- .flow_hash = flow_hash
+ .qfs_len = qfs_len
};
+ memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
+
for (port_id = 0; port_id < dsw->num_ports; port_id++)
if (port_id != source_port->id)
dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
}
-static bool
-dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
- uint16_t flow_hash)
+static __rte_always_inline bool
+dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
+ uint8_t queue_id, uint16_t flow_hash)
{
uint16_t i;
- for (i = 0; i < port->paused_flows_len; i++) {
- struct dsw_queue_flow *qf = &port->paused_flows[i];
- if (qf->queue_id == queue_id &&
- qf->flow_hash == flow_hash)
+ for (i = 0; i < qfs_len; i++)
+ if (qfs[i].queue_id == queue_id &&
+ qfs[i].flow_hash == flow_hash)
return true;
- }
+
return false;
}
+static __rte_always_inline bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ return dsw_is_queue_flow_in_ary(port->paused_flows,
+ port->paused_flows_len,
+ queue_id, flow_hash);
+}
+
static void
-dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
+ uint8_t qfs_len)
{
- port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
- .queue_id = queue_id,
- .flow_hash = paused_flow_hash
+ uint8_t i;
+
+ for (i = 0; i < qfs_len; i++) {
+ struct dsw_queue_flow *qf = &qfs[i];
+
+ DSW_LOG_DP_PORT(DEBUG, port->id,
+ "Pausing queue_id %d flow_hash %d.\n",
+ qf->queue_id, qf->flow_hash);
+
+ port->paused_flows[port->paused_flows_len] = *qf;
+ port->paused_flows_len++;
};
- port->paused_flows_len++;
}
static void
-dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_remove_paused_flow(struct dsw_port *port,
+ struct dsw_queue_flow *target_qf)
{
uint16_t i;
for (i = 0; i < port->paused_flows_len; i++) {
struct dsw_queue_flow *qf = &port->paused_flows[i];
- if (qf->queue_id == queue_id &&
- qf->flow_hash == paused_flow_hash) {
+ if (qf->queue_id == target_qf->queue_id &&
+ qf->flow_hash == target_qf->flow_hash) {
uint16_t last_idx = port->paused_flows_len-1;
if (i != last_idx)
port->paused_flows[i] =
@@ -251,30 +268,37 @@ dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
}
}
+static void
+dsw_port_remove_paused_flows(struct dsw_port *port,
+ struct dsw_queue_flow *qfs, uint8_t qfs_len)
+{
+ uint8_t i;
+
+ for (i = 0; i < qfs_len; i++)
+ dsw_port_remove_paused_flow(port, &qfs[i]);
+
+}
+
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
static void
-dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
- uint8_t originating_port_id, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id,
+ struct dsw_queue_flow *paused_qfs,
+ uint8_t qfs_len)
{
struct dsw_ctl_msg cfm = {
.type = DSW_CTL_CFM,
- .originating_port_id = port->id,
- .queue_id = queue_id,
- .flow_hash = paused_flow_hash
+ .originating_port_id = port->id
};
- DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
- queue_id, paused_flow_hash);
-
/* There might be already-scheduled events belonging to the
* paused flow in the output buffers.
*/
dsw_port_flush_out_buffers(dsw, port);
- dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+ dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
/* Make sure any stores to the original port's in_ring is seen
* before the ctl message.
@@ -284,47 +308,11 @@ dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
}
-static void
-dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
- uint8_t exclude_port_id, int16_t *port_loads,
- uint8_t *target_port_id, int16_t *target_load)
-{
- int16_t candidate_port_id = -1;
- int16_t candidate_load = DSW_MAX_LOAD;
- uint16_t i;
-
- for (i = 0; i < num_port_ids; i++) {
- uint8_t port_id = port_ids[i];
- if (port_id != exclude_port_id) {
- int16_t load = port_loads[port_id];
- if (candidate_port_id == -1 ||
- load < candidate_load) {
- candidate_port_id = port_id;
- candidate_load = load;
- }
- }
- }
- *target_port_id = candidate_port_id;
- *target_load = candidate_load;
-}
-
struct dsw_queue_flow_burst {
struct dsw_queue_flow queue_flow;
uint16_t count;
};
-static inline int
-dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
-{
- const struct dsw_queue_flow_burst *burst_a = v_burst_a;
- const struct dsw_queue_flow_burst *burst_b = v_burst_b;
-
- int a_count = burst_a->count;
- int b_count = burst_b->count;
-
- return a_count - b_count;
-}
-
#define DSW_QF_TO_INT(_qf) \
((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
@@ -363,8 +351,6 @@ dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
current_burst->count++;
}
- qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
-
return num_bursts;
}
@@ -384,44 +370,158 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
return below_limit;
}
+static int16_t
+dsw_flow_load(uint16_t num_events, int16_t port_load)
+{
+ return ((int32_t)port_load * (int32_t)num_events) /
+ DSW_MAX_EVENTS_RECORDED;
+}
+
+static int16_t
+dsw_evaluate_migration(int16_t source_load, int16_t target_load,
+ int16_t flow_load)
+{
+ int32_t res_target_load;
+ int32_t imbalance;
+
+ if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
+ return -1;
+
+ imbalance = source_load - target_load;
+
+ if (imbalance < DSW_REBALANCE_THRESHOLD)
+ return -1;
+
+ res_target_load = target_load + flow_load;
+
+ /* If the estimated load of the target port will be higher
+ * than the source port's load, it doesn't make sense to move
+ * the flow.
+ */
+ if (res_target_load > source_load)
+ return -1;
+
+ /* The more idle the target will be, the better. This will
+ * make migration prefer moving smaller flows, and flows to
+ * lightly loaded ports.
+ */
+ return DSW_MAX_LOAD - res_target_load;
+}
+
+static bool
+dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
+{
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+ uint16_t i;
+
+ for (i = 0; i < queue->num_serving_ports; i++)
+ if (queue->serving_ports[i] == port_id)
+ return true;
+
+ return false;
+}
+
static bool
dsw_select_emigration_target(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- struct dsw_queue_flow_burst *bursts,
- uint16_t num_bursts, int16_t *port_loads,
- int16_t max_load, struct dsw_queue_flow *target_qf,
- uint8_t *target_port_id)
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, uint8_t source_port_id,
+ int16_t *port_loads, uint16_t num_ports,
+ uint8_t *target_port_ids,
+ struct dsw_queue_flow *target_qfs,
+ uint8_t *targets_len)
{
- uint16_t source_load = port_loads[source_port->id];
+ int16_t source_port_load = port_loads[source_port_id];
+ struct dsw_queue_flow *candidate_qf;
+ uint8_t candidate_port_id;
+ int16_t candidate_weight = -1;
+ int16_t candidate_flow_load;
uint16_t i;
+ if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
+ return false;
+
for (i = 0; i < num_bursts; i++) {
- struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+ struct dsw_queue_flow_burst *burst = &bursts[i];
+ struct dsw_queue_flow *qf = &burst->queue_flow;
+ int16_t flow_load;
+ uint16_t port_id;
- if (dsw_port_is_flow_paused(source_port, qf->queue_id,
- qf->flow_hash))
+ if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
+ qf->queue_id, qf->flow_hash))
continue;
- struct dsw_queue *queue = &dsw->queues[qf->queue_id];
- int16_t target_load;
+ flow_load = dsw_flow_load(burst->count, source_port_load);
- dsw_find_lowest_load_port(queue->serving_ports,
- queue->num_serving_ports,
- source_port->id, port_loads,
- target_port_id, &target_load);
+ for (port_id = 0; port_id < num_ports; port_id++) {
+ int16_t weight;
- if (target_load < source_load &&
- target_load < max_load) {
- *target_qf = *qf;
- return true;
+ if (port_id == source_port_id)
+ continue;
+
+ if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
+ continue;
+
+ weight = dsw_evaluate_migration(source_port_load,
+ port_loads[port_id],
+ flow_load);
+
+ if (weight > candidate_weight) {
+ candidate_qf = qf;
+ candidate_port_id = port_id;
+ candidate_weight = weight;
+ candidate_flow_load = flow_load;
+ }
}
}
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
- "no target port found with load less than %d.\n",
- num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+ if (candidate_weight < 0)
+ return false;
- return false;
+ DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+ "flow_hash %d (with flow load %d) for migration "
+ "to port %d.\n", candidate_qf->queue_id,
+ candidate_qf->flow_hash,
+ DSW_LOAD_TO_PERCENT(candidate_flow_load),
+ candidate_port_id);
+
+ port_loads[candidate_port_id] += candidate_flow_load;
+ port_loads[source_port_id] -= candidate_flow_load;
+
+ target_port_ids[*targets_len] = candidate_port_id;
+ target_qfs[*targets_len] = *candidate_qf;
+ (*targets_len)++;
+
+ return true;
+}
+
+static void
+dsw_select_emigration_targets(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, int16_t *port_loads)
+{
+ struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
+ uint8_t *target_port_ids = source_port->emigration_target_port_ids;
+ uint8_t *targets_len = &source_port->emigration_targets_len;
+ uint8_t i;
+
+ for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
+ bool found;
+
+ found = dsw_select_emigration_target(dsw, bursts, num_bursts,
+ source_port->id,
+ port_loads, dsw->num_ports,
+ target_port_ids,
+ target_qfs,
+ targets_len);
+ if (!found)
+ break;
+ }
+
+ if (*targets_len == 0)
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "For the %d flows considered, no target port "
+ "was found.\n", num_bursts);
}
static uint8_t
@@ -562,7 +662,7 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
struct dsw_port *source_port,
- uint8_t queue_id, uint16_t paused_flow_hash)
+ const struct dsw_queue_flow *qf)
{
uint16_t paused_events_len = source_port->paused_events_len;
struct rte_event paused_events[paused_events_len];
@@ -572,7 +672,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
if (paused_events_len == 0)
return;
- if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+ if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
return;
rte_memcpy(paused_events, source_port->paused_events,
@@ -580,7 +680,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
source_port->paused_events_len = 0;
- dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+ dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
for (i = 0; i < paused_events_len; i++) {
struct rte_event *event = &paused_events[i];
@@ -588,8 +688,8 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
flow_hash = dsw_flow_id_hash(event->flow_id);
- if (event->queue_id == queue_id &&
- flow_hash == paused_flow_hash)
+ if (event->queue_id == qf->queue_id &&
+ flow_hash == qf->flow_hash)
dsw_port_buffer_non_paused(dsw, source_port,
dest_port_id, event);
else
@@ -598,33 +698,94 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
}
static void
-dsw_port_emigration_stats(struct dsw_port *port)
+dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
{
- uint64_t emigration_latency;
+ uint64_t flow_migration_latency;
- emigration_latency = (rte_get_timer_cycles() - port->emigration_start);
- port->emigration_latency += emigration_latency;
- port->emigrations++;
+ flow_migration_latency =
+ (rte_get_timer_cycles() - port->emigration_start);
+ port->emigration_latency += (flow_migration_latency * finished);
+ port->emigrations += finished;
}
static void
-dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port)
+dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t schedule_type)
{
- uint8_t queue_id = port->emigration_target_qf.queue_id;
- uint16_t flow_hash = port->emigration_target_qf.flow_hash;
+ uint8_t i;
+ struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
+ uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
+ uint8_t left_qfs_len = 0;
+ uint8_t finished;
+
+ for (i = 0; i < port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
+ uint8_t queue_id = qf->queue_id;
+ uint8_t queue_schedule_type =
+ dsw->queues[queue_id].schedule_type;
+ uint16_t flow_hash = qf->flow_hash;
+
+ if (queue_schedule_type != schedule_type) {
+ left_port_ids[left_qfs_len] =
+ port->emigration_target_port_ids[i];
+ left_qfs[left_qfs_len] = *qf;
+ left_qfs_len++;
+ continue;
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
+ "queue_id %d flow_hash %d.\n", queue_id,
+ flow_hash);
+
+ if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
+ dsw_port_remove_paused_flow(port, qf);
+ dsw_port_flush_paused_events(dsw, port, qf);
+ }
+ }
- port->migration_state = DSW_MIGRATION_STATE_IDLE;
- port->seen_events_len = 0;
+ finished = port->emigration_targets_len - left_qfs_len;
- dsw_port_emigration_stats(port);
+ if (finished > 0)
+ dsw_port_emigration_stats(port, finished);
- if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
- dsw_port_remove_paused_flow(port, queue_id, flow_hash);
- dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+ for (i = 0; i < left_qfs_len; i++) {
+ port->emigration_target_port_ids[i] = left_port_ids[i];
+ port->emigration_target_qfs[i] = left_qfs[i];
}
+ port->emigration_targets_len = left_qfs_len;
- DSW_LOG_DP_PORT(DEBUG, port->id, "Emigration completed for queue_id "
- "%d flow_hash %d.\n", queue_id, flow_hash);
+ if (port->emigration_targets_len == 0) {
+ port->migration_state = DSW_MIGRATION_STATE_IDLE;
+ port->seen_events_len = 0;
+ }
+}
+
+static void
+dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
+{
+ uint8_t i;
+
+ for (i = 0; i < source_port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf =
+ &source_port->emigration_target_qfs[i];
+ uint8_t queue_id = qf->queue_id;
+
+ if (dsw->queues[queue_id].schedule_type ==
+ RTE_SCHED_TYPE_PARALLEL) {
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_ids[i];
+ uint16_t flow_hash = qf->flow_hash;
+
+ /* Single byte-sized stores are always atomic. */
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ dest_port_id;
+ }
+ }
+
+ rte_smp_wmb();
+
+ dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
}
static void
@@ -678,9 +839,9 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
source_port_load = rte_atomic16_read(&source_port->load);
if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
DSW_LOG_DP_PORT(DEBUG, source_port->id,
- "Load %d is below threshold level %d.\n",
- DSW_LOAD_TO_PERCENT(source_port_load),
- DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+ "Load %d is below threshold level %d.\n",
+ DSW_LOAD_TO_PERCENT(source_port_load),
+ DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
return;
}
@@ -697,16 +858,9 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
return;
}
- /* Sort flows into 'bursts' to allow attempting to migrating
- * small (but still active) flows first - this it to avoid
- * having large flows moving around the worker cores too much
- * (to avoid cache misses, among other things). Of course, the
- * number of recorded events (queue+flow ids) are limited, and
- * provides only a snapshot, so only so many conclusions can
- * be drawn from this data.
- */
num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
bursts);
+
/* For non-big-little systems, there's no point in moving the
* only (known) flow.
*/
@@ -718,33 +872,11 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
return;
}
- /* The strategy is to first try to find a flow to move to a
- * port with low load (below the emigration-attempt
- * threshold). If that fails, we try to find a port which is
- * below the max threshold, and also less loaded than this
- * port is.
- */
- if (!dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
- port_loads,
- DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
- &source_port->emigration_target_qf,
- &source_port->emigration_target_port_id)
- &&
- !dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
- port_loads,
- DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
- &source_port->emigration_target_qf,
- &source_port->emigration_target_port_id))
- return;
-
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
- "flow_hash %d from port %d to port %d.\n",
- source_port->emigration_target_qf.queue_id,
- source_port->emigration_target_qf.flow_hash,
- source_port->id,
- source_port->emigration_target_port_id);
+ dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
+ port_loads);
- /* We have a winner. */
+ if (source_port->emigration_targets_len == 0)
+ return;
source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
source_port->emigration_start = rte_get_timer_cycles();
@@ -753,71 +885,58 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
* parallel queues, since atomic/ordered semantics need not to
* be maintained.
*/
+ dsw_port_move_parallel_flows(dsw, source_port);
- if (dsw->queues[source_port->emigration_target_qf.queue_id].
- schedule_type == RTE_SCHED_TYPE_PARALLEL) {
- uint8_t queue_id =
- source_port->emigration_target_qf.queue_id;
- uint16_t flow_hash =
- source_port->emigration_target_qf.flow_hash;
- uint8_t dest_port_id =
- source_port->emigration_target_port_id;
-
- /* Single byte-sized stores are always atomic. */
- dsw->queues[queue_id].flow_to_port_map[flow_hash] =
- dest_port_id;
- rte_smp_wmb();
-
- dsw_port_end_emigration(dsw, source_port);
-
+ /* All flows were on PARALLEL queues. */
+ if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
return;
- }
/* There might be 'loopback' events already scheduled in the
* output buffers.
*/
dsw_port_flush_out_buffers(dsw, source_port);
- dsw_port_add_paused_flow(source_port,
- source_port->emigration_target_qf.queue_id,
- source_port->emigration_target_qf.flow_hash);
+ dsw_port_add_paused_flows(source_port,
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
- source_port->emigration_target_qf.queue_id,
- source_port->emigration_target_qf.flow_hash);
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
source_port->cfm_cnt = 0;
}
static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
struct dsw_port *source_port,
- uint8_t queue_id, uint16_t paused_flow_hash);
+ const struct dsw_queue_flow *qf);
static void
-dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
- uint8_t originating_port_id, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id,
+ struct dsw_queue_flow *paused_qfs,
+ uint8_t qfs_len)
{
+ uint16_t i;
struct dsw_ctl_msg cfm = {
.type = DSW_CTL_CFM,
- .originating_port_id = port->id,
- .queue_id = queue_id,
- .flow_hash = paused_flow_hash
+ .originating_port_id = port->id
};
- DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
- queue_id, paused_flow_hash);
-
- dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+ dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
rte_smp_rmb();
- if (dsw_schedule(dsw, queue_id, paused_flow_hash) == port->id)
- port->immigrations++;
-
dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
- dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+ for (i = 0; i < qfs_len; i++) {
+ struct dsw_queue_flow *qf = &paused_qfs[i];
+
+ if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
+ port->immigrations++;
+
+ dsw_port_flush_paused_events(dsw, port, qf);
+ }
}
#define FORWARD_BURST_SIZE (32)
@@ -872,31 +991,37 @@ dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
}
static void
-dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
- struct dsw_port *source_port)
+dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
{
- uint8_t queue_id = source_port->emigration_target_qf.queue_id;
- uint16_t flow_hash = source_port->emigration_target_qf.flow_hash;
- uint8_t dest_port_id = source_port->emigration_target_port_id;
- struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+ uint8_t i;
dsw_port_flush_out_buffers(dsw, source_port);
rte_smp_wmb();
- dsw->queues[queue_id].flow_to_port_map[flow_hash] =
- dest_port_id;
+ for (i = 0; i < source_port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf =
+ &source_port->emigration_target_qfs[i];
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_ids[i];
+ struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+ dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
+ dest_port_id;
- dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
- queue_id, flow_hash);
+ dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
+ qf->queue_id, qf->flow_hash);
+ }
/* Flow table update and migration destination port's enqueues
* must be seen before the control message.
*/
rte_smp_wmb();
- dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
- flow_hash);
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
source_port->cfm_cnt = 0;
source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
}
@@ -914,7 +1039,8 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
break;
case DSW_MIGRATION_STATE_UNPAUSING:
- dsw_port_end_emigration(dsw, port);
+ dsw_port_end_emigration(dsw, port,
+ RTE_SCHED_TYPE_ATOMIC);
break;
default:
RTE_ASSERT(0);
@@ -936,15 +1062,14 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
if (dsw_port_ctl_dequeue(port, &msg) == 0) {
switch (msg.type) {
case DSW_CTL_PAUS_REQ:
- dsw_port_handle_pause_flow(dsw, port,
- msg.originating_port_id,
- msg.queue_id, msg.flow_hash);
+ dsw_port_handle_pause_flows(dsw, port,
+ msg.originating_port_id,
+ msg.qfs, msg.qfs_len);
break;
case DSW_CTL_UNPAUS_REQ:
- dsw_port_handle_unpause_flow(dsw, port,
- msg.originating_port_id,
- msg.queue_id,
- msg.flow_hash);
+ dsw_port_handle_unpause_flows(dsw, port,
+ msg.originating_port_id,
+ msg.qfs, msg.qfs_len);
break;
case DSW_CTL_CFM:
dsw_port_handle_confirm(dsw, port);
@@ -967,7 +1092,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
port->pending_releases == 0))
- dsw_port_move_migrating_flow(dsw, port);
+ dsw_port_move_emigrating_flows(dsw, port);
/* Polling the control ring is relatively inexpensive, and
* polling it often helps bringing down migration latency, so
--
2.17.1
^ permalink raw reply [flat|nested] 17+ messages in thread