From: luca.boccassi@gmail.com
To: Mattias Rönnblom
Cc: dpdk stable
Subject: patch 'event/dsw: fix flow migration' has been queued to stable release 20.11.7
Date: Thu, 3 Nov 2022 09:26:36 +0000
Message-Id: <20221103092758.1099402-18-luca.boccassi@gmail.com>
In-Reply-To: <20221103092758.1099402-1-luca.boccassi@gmail.com>
References: <20221103092758.1099402-1-luca.boccassi@gmail.com>

Hi,

FYI, your patch has been queued to stable release 20.11.7

Note it hasn't been pushed to http://dpdk.org/browse/dpdk-stable yet.
It will be pushed if I get no objections before 11/05/22. So please
shout if anyone has objections.
Also note that after the patch there's a diff of the upstream commit vs the
patch applied to the branch. This will indicate if there was any rebasing
needed to apply to the stable branch. If there were code changes for rebasing
(i.e. not only metadata diffs), please double check that the rebase was
correctly done.

Queued patches are on a temporary branch at:
https://github.com/kevintraynor/dpdk-stable

This queued commit can be viewed at:
https://github.com/kevintraynor/dpdk-stable/commit/c69260f49962d781417a0d17256d2386b46b4d3d

Thanks.

Luca Boccassi

---
From c69260f49962d781417a0d17256d2386b46b4d3d Mon Sep 17 00:00:00 2001
From: Mattias Rönnblom
Date: Thu, 7 Jul 2022 13:43:25 +0200
Subject: [PATCH] event/dsw: fix flow migration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

[ upstream commit 70cb0278a4c52a857fb56cda2183e2ee3fa2633a ]

Fix a bug in flow migration which, under certain conditions, causes
reordering and violates atomicity guarantees.

The issue occurs when the processing of a flow (on an atomic queue)
has resulted in events enqueued to a flow currently being migrated,
and the former (producer) flow is also selected for migration. The
events are buffered ("paused") on the originating port, and released
(forwarded) when the migration has completed.

However, at the time the latter (consumer) flow is "unpaused",
processing of the producer flow on the port to which it was migrated
may already have produced events for the same paused flow. This
constitutes a race condition, and depending on which port wins,
reordering may have been introduced.

This patch forbids migration when a port has paused events, since
those events may have been the result of processing a to-be-migrated
flow.

This patch also disallows processing events pertaining to a flow under
migration, for the same reason. A new buffer is introduced, which
holds such not-yet-processed events dequeued from the port's input
ring. Such events are forwarded to the target port as a part of the
migration process.

The 'forwarding' migration state is eliminated; instead, background
processing is only performed if there are no unreleased events on the
port.

The bug is primarily triggered in situations where multiple flows are
migrated as one transaction, but it may occur even if only a single
flow is migrated (e.g., with older DSW versions, which do not support
multi-flow migration).

Fixes: f6257b22e767 ("event/dsw: add load balancing")

Signed-off-by: Mattias Rönnblom
---
 drivers/event/dsw/dsw_evdev.h |   8 +-
 drivers/event/dsw/dsw_event.c | 315 ++++++++++++++++++++++++----------
 2 files changed, 232 insertions(+), 91 deletions(-)

diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 6513d35ee7..ceabf9557d 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -126,7 +126,6 @@ struct dsw_queue_flow {
 enum dsw_migration_state {
 	DSW_MIGRATION_STATE_IDLE,
 	DSW_MIGRATION_STATE_PAUSING,
-	DSW_MIGRATION_STATE_FORWARDING,
 	DSW_MIGRATION_STATE_UNPAUSING
 };
 
@@ -190,6 +189,13 @@ struct dsw_port {
 	uint16_t paused_events_len;
 	struct rte_event paused_events[DSW_MAX_EVENTS];
 
+	uint16_t emigrating_events_len;
+	/* Buffer for not-yet-processed events pertaining to a flow
+	 * emigrating from this port. These events will be forwarded
+	 * to the target port.
+	 */
+	struct rte_event emigrating_events[DSW_MAX_EVENTS];
+
 	uint16_t seen_events_len;
 	uint16_t seen_events_idx;
 	struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index 8b81dc5c56..76c89056a1 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -234,6 +234,15 @@ dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
 					queue_id, flow_hash);
 }
 
+static __rte_always_inline bool
+dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
+			   uint16_t flow_hash)
+{
+	return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
+					port->emigration_targets_len,
+					queue_id, flow_hash);
+}
+
 static void
 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
 			  uint8_t qfs_len)
@@ -268,9 +277,19 @@ dsw_port_remove_paused_flow(struct dsw_port *port,
 			port->paused_flows[i] =
 				port->paused_flows[last_idx];
 			port->paused_flows_len--;
-			break;
+
+			DSW_LOG_DP_PORT(DEBUG, port->id,
+					"Unpausing queue_id %d flow_hash %d.\n",
+					target_qf->queue_id,
+					target_qf->flow_hash);
+
+			return;
 		}
 	}
+
+	DSW_LOG_DP_PORT(ERR, port->id,
+			"Failed to unpause queue_id %d flow_hash %d.\n",
+			target_qf->queue_id, target_qf->flow_hash);
 }
 
 static void
@@ -281,7 +300,6 @@ dsw_port_remove_paused_flows(struct dsw_port *port,
 
 	for (i = 0; i < qfs_len; i++)
 		dsw_port_remove_paused_flow(port, &qfs[i]);
-
 }
 
 static void
@@ -434,14 +452,15 @@ dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
 
 static bool
 dsw_select_emigration_target(struct dsw_evdev *dsw,
-			     struct dsw_queue_flow_burst *bursts,
-			     uint16_t num_bursts, uint8_t source_port_id,
-			     int16_t *port_loads, uint16_t num_ports,
-			     uint8_t *target_port_ids,
-			     struct dsw_queue_flow *target_qfs,
-			     uint8_t *targets_len)
+			     struct dsw_port *source_port,
+			     struct dsw_queue_flow_burst *bursts,
+			     uint16_t num_bursts,
+			     int16_t *port_loads, uint16_t num_ports,
+			     uint8_t *target_port_ids,
+			     struct dsw_queue_flow *target_qfs,
+			     uint8_t *targets_len)
 {
-	int16_t source_port_load = port_loads[source_port_id];
+	int16_t source_port_load = port_loads[source_port->id];
 	struct dsw_queue_flow *candidate_qf = NULL;
 	uint8_t candidate_port_id = 0;
 	int16_t candidate_weight = -1;
@@ -466,7 +485,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
 		for (port_id = 0; port_id < num_ports; port_id++) {
 			int16_t weight;
 
-			if (port_id == source_port_id)
+			if (port_id == source_port->id)
 				continue;
 
 			if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
@@ -488,7 +507,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
 	if (candidate_weight < 0)
 		return false;
 
-	DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Selected queue_id %d "
 			"flow_hash %d (with flow load %d) for migration "
 			"to port %d.\n", candidate_qf->queue_id,
 			candidate_qf->flow_hash,
@@ -496,7 +515,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
 			candidate_port_id);
 
 	port_loads[candidate_port_id] += candidate_flow_load;
-	port_loads[source_port_id] -= candidate_flow_load;
+	port_loads[source_port->id] -= candidate_flow_load;
 
 	target_port_ids[*targets_len] = candidate_port_id;
 	target_qfs[*targets_len] = *candidate_qf;
@@ -522,8 +541,8 @@ dsw_select_emigration_targets(struct dsw_evdev *dsw,
 	for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
 		bool found;
 
-		found = dsw_select_emigration_target(dsw, bursts, num_bursts,
-						     source_port->id,
+		found = dsw_select_emigration_target(dsw, source_port,
+						     bursts, num_bursts,
 						     port_loads,
 						     dsw->num_ports,
 						     target_port_ids,
 						     target_qfs,
@@ -603,6 +622,7 @@ dsw_port_buffer_paused(struct dsw_port *port,
 	port->paused_events_len++;
 }
 
+
 static void
 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
 			   uint8_t dest_port_id, const struct rte_event *event)
@@ -674,40 +694,39 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
 }
 
 static void
-dsw_port_flush_paused_events(struct dsw_evdev *dsw,
-			     struct dsw_port *source_port,
-			     const struct dsw_queue_flow *qf)
+dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
+				       struct dsw_port *source_port)
 {
 	uint16_t paused_events_len = source_port->paused_events_len;
 	struct rte_event paused_events[paused_events_len];
-	uint8_t dest_port_id;
 	uint16_t i;
 
 	if (paused_events_len == 0)
 		return;
 
-	if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
-		return;
-
 	rte_memcpy(paused_events, source_port->paused_events,
 		   paused_events_len * sizeof(struct rte_event));
 
 	source_port->paused_events_len = 0;
 
-	dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
-
 	for (i = 0; i < paused_events_len; i++) {
 		struct rte_event *event = &paused_events[i];
 		uint16_t flow_hash;
 
 		flow_hash = dsw_flow_id_hash(event->flow_id);
 
-		if (event->queue_id == qf->queue_id &&
-		    flow_hash == qf->flow_hash)
+		if (dsw_port_is_flow_paused(source_port, event->queue_id,
+					    flow_hash))
+			dsw_port_buffer_paused(source_port, event);
+		else {
+			uint8_t dest_port_id;
+
+			dest_port_id = dsw_schedule(dsw, event->queue_id,
+						    flow_hash);
+
 			dsw_port_buffer_non_paused(dsw, source_port,
 						   dest_port_id, event);
-		else
-			dsw_port_buffer_paused(source_port, event);
+		}
 	}
 }
 
@@ -750,11 +769,6 @@ dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
 		DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
 				"queue_id %d flow_hash %d.\n", queue_id,
 				flow_hash);
-
-		if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
-			dsw_port_remove_paused_flow(port, qf);
-			dsw_port_flush_paused_events(dsw, port, qf);
-		}
 	}
 
 	finished = port->emigration_targets_len - left_qfs_len;
@@ -821,11 +835,32 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
 	if (dsw->num_ports == 1)
 		return;
 
-	if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
-		return;
-
 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
 
+	if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
+				"are recorded to allow for a migration.\n");
+		return;
+	}
+
+	/* A flow migration cannot be initiated if there are paused
+	 * events, since some/all of those events may have been
+	 * produced as a result of processing the flow(s) selected for
+	 * migration. Moving such a flow would potentially introduce
+	 * reordering, since processing the migrated flow on the
+	 * receiving port may commence before the to-be-enqueued-to
+	 * flows are unpaused, leading to paused events on the second
+	 * port as well, destined for the same paused flow(s). When
+	 * those flows are unpaused, the resulting events are
+	 * delivered to the owning port in an undefined order.
+	 */
+	if (source_port->paused_events_len > 0) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
+				"events in the pause buffer.\n");
+		return;
+	}
+
 	/* Randomize interval to avoid having all threads considering
 	 * emigration at the same in point in time, which might lead
 	 * to all choosing the same target port.
@@ -921,9 +956,8 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
 }
 
 static void
-dsw_port_flush_paused_events(struct dsw_evdev *dsw,
-			     struct dsw_port *source_port,
-			     const struct dsw_queue_flow *qf);
+dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
+				       struct dsw_port *source_port);
 
 static void
 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
@@ -948,62 +982,123 @@ dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
 
 		if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
 			port->immigrations++;
+	}
+
+	dsw_port_flush_no_longer_paused_events(dsw, port);
+}
+
+static void
+dsw_port_buffer_in_buffer(struct dsw_port *port,
+			  const struct rte_event *event)
+
+{
+	RTE_ASSERT(port->in_buffer_start == 0);
+
+	port->in_buffer[port->in_buffer_len] = *event;
+	port->in_buffer_len++;
+}
 
-		dsw_port_flush_paused_events(dsw, port, qf);
+static void
+dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
+				 struct dsw_port *source_port,
+				 struct rte_event *event)
+{
+	uint16_t i;
+
+	for (i = 0; i < source_port->emigration_targets_len; i++) {
+		struct dsw_queue_flow *qf =
+			&source_port->emigration_target_qfs[i];
+		uint8_t dest_port_id =
+			source_port->emigration_target_port_ids[i];
+		struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+		if (event->queue_id == qf->queue_id &&
+		    dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
+			/* No need to care about bursting forwarded
+			 * events (to the destination port's in_ring),
+			 * since migration doesn't happen very often,
+			 * and also the majority of the dequeued
+			 * events will likely *not* be forwarded.
+			 */
+			while (rte_event_ring_enqueue_burst(dest_port->in_ring,
+							    event, 1,
+							    NULL) != 1)
+				rte_pause();
+			return;
+		}
 	}
+
+	/* Event did not belong to the emigrated flows */
+	dsw_port_buffer_in_buffer(source_port, event);
+}
+
+static void
+dsw_port_stash_migrating_event(struct dsw_port *port,
+			       const struct rte_event *event)
+{
+	port->emigrating_events[port->emigrating_events_len] = *event;
+	port->emigrating_events_len++;
 }
 
-#define FORWARD_BURST_SIZE (32)
+#define DRAIN_DEQUEUE_BURST_SIZE (32)
 
 static void
-dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
-				struct rte_event_ring *dest_ring,
-				uint8_t queue_id,
-				uint16_t flow_hash)
+dsw_port_drain_in_ring(struct dsw_port *source_port)
 {
-	uint16_t events_left;
+	uint16_t num_events;
+	uint16_t dequeued;
 
 	/* Control ring message should been seen before the ring count
 	 * is read on the port's in_ring.
 	 */
 	rte_smp_rmb();
 
-	events_left = rte_event_ring_count(source_port->in_ring);
+	num_events = rte_event_ring_count(source_port->in_ring);
 
-	while (events_left > 0) {
-		uint16_t in_burst_size =
-			RTE_MIN(FORWARD_BURST_SIZE, events_left);
-		struct rte_event in_burst[in_burst_size];
-		uint16_t in_len;
+	for (dequeued = 0; dequeued < num_events; ) {
+		uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
+					      num_events - dequeued);
+		struct rte_event events[burst_size];
+		uint16_t len;
 		uint16_t i;
 
-		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
-						      in_burst,
-						      in_burst_size, NULL);
-		/* No need to care about bursting forwarded events (to
-		 * the destination port's in_ring), since migration
-		 * doesn't happen very often, and also the majority of
-		 * the dequeued events will likely *not* be forwarded.
-		 */
-		for (i = 0; i < in_len; i++) {
-			struct rte_event *e = &in_burst[i];
-			if (e->queue_id == queue_id &&
-			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
-				while (rte_event_ring_enqueue_burst(dest_ring,
-								    e, 1,
-								    NULL) != 1)
-					rte_pause();
-			} else {
-				uint16_t last_idx = source_port->in_buffer_len;
-				source_port->in_buffer[last_idx] = *e;
-				source_port->in_buffer_len++;
-			}
+		len = rte_event_ring_dequeue_burst(source_port->in_ring,
+						   events, burst_size,
+						   NULL);
+
+		for (i = 0; i < len; i++) {
+			struct rte_event *event = &events[i];
+			uint16_t flow_hash;
+
+			flow_hash = dsw_flow_id_hash(event->flow_id);
+
+			if (unlikely(dsw_port_is_flow_migrating(source_port,
+								event->queue_id,
+								flow_hash)))
+				dsw_port_stash_migrating_event(source_port,
+							       event);
+			else
+				dsw_port_buffer_in_buffer(source_port, event);
 		}
 
-		events_left -= in_len;
+		dequeued += len;
 	}
 }
 
+static void
+dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
+				 struct dsw_port *source_port)
+{
+	uint16_t i;
+
+	for (i = 0; i < source_port->emigrating_events_len; i++) {
+		struct rte_event *event = &source_port->emigrating_events[i];
+
+		dsw_port_forward_emigrated_event(dsw, source_port, event);
+	}
+	source_port->emigrating_events_len = 0;
+}
+
 static void
 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
 			       struct dsw_port *source_port)
@@ -1012,22 +1107,27 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
 
 	dsw_port_flush_out_buffers(dsw, source_port);
 
-	rte_smp_wmb();
-
 	for (i = 0; i < source_port->emigration_targets_len; i++) {
 		struct dsw_queue_flow *qf =
 			&source_port->emigration_target_qfs[i];
 		uint8_t dest_port_id =
 			source_port->emigration_target_port_ids[i];
-		struct dsw_port *dest_port = &dsw->ports[dest_port_id];
 
 		dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
-			dest_port_id;
-
-		dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
-						qf->queue_id, qf->flow_hash);
+			dest_port_id;
 	}
 
+	rte_smp_wmb();
+
+	dsw_port_drain_in_ring(source_port);
+	dsw_port_forward_emigrated_flows(dsw, source_port);
+
+	dsw_port_remove_paused_flows(source_port,
+				     source_port->emigration_target_qfs,
+				     source_port->emigration_targets_len);
+
+	dsw_port_flush_no_longer_paused_events(dsw, source_port);
+
 	/* Flow table update and migration destination port's enqueues
 	 * must be seen before the control message.
 	 */
@@ -1048,9 +1148,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
 	if (port->cfm_cnt == (dsw->num_ports-1)) {
 		switch (port->migration_state) {
 		case DSW_MIGRATION_STATE_PAUSING:
-			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
-					"migration state.\n");
-			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+			dsw_port_move_emigrating_flows(dsw, port);
 			break;
 		case DSW_MIGRATION_STATE_UNPAUSING:
 			dsw_port_end_emigration(dsw, port,
@@ -1090,18 +1188,18 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
 static void
 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
 {
-	/* To pull the control ring reasonably often on busy ports,
-	 * each dequeued/enqueued event is considered an 'op' too.
-	 */
 	port->ops_since_bg_task += (num_events+1);
 }
 
 static void
 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
-	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
-		     port->pending_releases == 0))
-		dsw_port_move_emigrating_flows(dsw, port);
+	/* For simplicity (in the migration logic), avoid all
+	 * background processing in case event processing is in
+	 * progress.
+	 */
+	if (port->pending_releases > 0)
+		return;
 
 	/* Polling the control ring is relatively inexpensive, and
 	 * polling it often helps bringing down migration latency, so
@@ -1161,7 +1259,7 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
 	uint16_t i;
 
 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
-			"events to port %d.\n", events_len, source_port->id);
+			"events.\n", events_len);
 
 	dsw_port_bg_process(dsw, source_port);
 
@@ -1344,6 +1442,38 @@ dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
 	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
 }
 
+static void
+dsw_port_stash_migrating_events(struct dsw_port *port,
+				struct rte_event *events, uint16_t *num)
+{
+	uint16_t i;
+
+	/* The assumption here - performance-wise - is that events
+	 * belonging to migrating flows are relatively rare.
+	 */
+	for (i = 0; i < (*num); ) {
+		struct rte_event *event = &events[i];
+		uint16_t flow_hash;
+
+		flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
+							flow_hash))) {
+			uint16_t left;
+
+			dsw_port_stash_migrating_event(port, event);
+
+			(*num)--;
+			left = *num - i;
+
+			if (left > 0)
+				memmove(event, event + 1,
+					left * sizeof(struct rte_event));
+		} else
+			i++;
+	}
+}
+
 uint16_t
 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 			uint64_t wait __rte_unused)
@@ -1361,6 +1491,11 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 
 	dequeued = dsw_port_dequeue_burst(source_port, events, num);
 
+	if (unlikely(source_port->migration_state ==
+		     DSW_MIGRATION_STATE_PAUSING))
+		dsw_port_stash_migrating_events(source_port, events,
+						&dequeued);
+
 	source_port->pending_releases = dequeued;
 
 	dsw_port_load_record(source_port, dequeued);
--
2.34.1

---
  Diff of the applied patch vs upstream commit (please double-check if
  non-empty):
---
--- -	2022-11-03 09:27:26.560230939 +0000
+++ 0018-event-dsw-fix-flow-migration.patch	2022-11-03 09:27:25.325421511 +0000
@@ -1 +1 @@
-From 70cb0278a4c52a857fb56cda2183e2ee3fa2633a Mon Sep 17 00:00:00 2001
+From c69260f49962d781417a0d17256d2386b46b4d3d Mon Sep 17 00:00:00 2001
@@ -8,0 +9,2 @@
+[ upstream commit 70cb0278a4c52a857fb56cda2183e2ee3fa2633a ]
+
@@ -42 +43,0 @@
-Cc: stable@dpdk.org
@@ -51 +52 @@
-index c907c00c78..df7dcc5577 100644
+index 6513d35ee7..ceabf9557d 100644
@@ -54 +55 @@
-@@ -128,7 +128,6 @@ struct dsw_queue_flow {
+@@ -126,7 +126,6 @@ struct dsw_queue_flow {
@@ -62 +63 @@
-@@ -192,6 +191,13 @@ struct dsw_port {
+@@ -190,6 +189,13 @@ struct dsw_port {
@@ -77 +78 @@
-index 340561b4e6..9932caf2ee 100644
+index 8b81dc5c56..76c89056a1 100644
@@ -80 +81 @@
-@@ -238,6 +238,15 @@ dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+@@ -234,6 +234,15 @@ dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
@@ -96 +97 @@
-@@ -272,9 +281,19 @@ dsw_port_remove_paused_flow(struct dsw_port *port,
+@@ -268,9 +277,19 @@ dsw_port_remove_paused_flow(struct dsw_port *port,
@@ -117 +118 @@
-@@ -285,7 +304,6 @@ dsw_port_remove_paused_flows(struct dsw_port *port,
+@@ -281,7 +300,6 @@ dsw_port_remove_paused_flows(struct dsw_port *port,
@@ -125 +126 @@
-@@ -440,14 +458,15 @@ dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
+@@ -434,14 +452,15 @@ dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
@@ -148 +149 @@
-@@ -472,7 +491,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
+@@ -466,7 +485,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
@@ -157 +158 @@
-@@ -494,7 +513,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
+@@ -488,7 +507,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
@@ -166 +167 @@
-@@ -502,7 +521,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
+@@ -496,7 +515,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
@@ -175 +176 @@
-@@ -528,8 +547,8 @@ dsw_select_emigration_targets(struct dsw_evdev *dsw,
+@@ -522,8 +541,8 @@ dsw_select_emigration_targets(struct dsw_evdev *dsw,
@@ -186 +187 @@
-@@ -609,6 +628,7 @@ dsw_port_buffer_paused(struct dsw_port *port,
+@@ -603,6 +622,7 @@ dsw_port_buffer_paused(struct dsw_port *port,
@@ -194 +195 @@
-@@ -680,40 +700,39 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
+@@ -674,40 +694,39 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
@@ -247 +248 @@
-@@ -756,11 +775,6 @@ dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
+@@ -750,11 +769,6 @@ dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
@@ -259 +260 @@
-@@ -827,11 +841,32 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
+@@ -821,11 +835,32 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
@@ -295 +296 @@
-@@ -928,9 +963,8 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
+@@ -921,9 +956,8 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
@@ -307 +308 @@
-@@ -955,62 +989,123 @@ dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+@@ -948,62 +982,123 @@ dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
@@ -466 +467 @@
-@@ -1019,22 +1114,27 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
+@@ -1012,22 +1107,27 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
@@ -501 +502 @@
-@@ -1055,9 +1155,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+@@ -1048,9 +1148,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
@@ -512 +513 @@
-@@ -1097,18 +1195,18 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+@@ -1090,18 +1188,18 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
@@ -537 +538 @@
-@@ -1168,7 +1266,7 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
+@@ -1161,7 +1259,7 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
@@ -546 +547 @@
-@@ -1352,6 +1450,38 @@ dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
+@@ -1344,6 +1442,38 @@ dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
@@ -585 +586 @@
-@@ -1369,6 +1499,11 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
+@@ -1361,6 +1491,11 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
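
For readers who want the gist of the fix without walking the whole diff: the
core of dsw_port_stash_migrating_events() is to pull events belonging to
flows under migration out of the application-visible dequeue burst, stash
them, and compact the remaining events in place. Below is a minimal,
self-contained sketch of that idea. All toy_* names are hypothetical
stand-ins (not DPDK API), and the odd-flow-id predicate merely simulates
dsw_port_is_flow_migrating().

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct toy_event {
	uint32_t flow_id;
};

#define TOY_MAX_STASHED 128

static struct toy_event toy_stash[TOY_MAX_STASHED];
static uint16_t toy_stash_len;

/* Stand-in for dsw_port_is_flow_migrating(): pretend flows with odd
 * ids are currently being migrated.
 */
static bool
toy_flow_is_migrating(uint32_t flow_id)
{
	return flow_id % 2 == 1;
}

static void
toy_stash_migrating_events(struct toy_event *events, uint16_t *num)
{
	uint16_t i;

	for (i = 0; i < *num; ) {
		if (toy_flow_is_migrating(events[i].flow_id)) {
			uint16_t left;

			/* Set the event aside for later forwarding. */
			toy_stash[toy_stash_len++] = events[i];

			(*num)--;
			left = *num - i;

			/* Close the hole, preserving event order. */
			if (left > 0)
				memmove(&events[i], &events[i + 1],
					left * sizeof(struct toy_event));
		} else
			i++;
	}
}

int
main(void)
{
	struct toy_event burst[] = { {0}, {1}, {2}, {3}, {4} };
	uint16_t num = 5;
	uint16_t i;

	toy_stash_migrating_events(burst, &num);

	printf("delivered:");
	for (i = 0; i < num; i++)
		printf(" %u", burst[i].flow_id);
	printf("\nstashed:");
	for (i = 0; i < toy_stash_len; i++)
		printf(" %u", toy_stash[i].flow_id);
	printf("\n");

	/* Prints "delivered: 0 2 4" and "stashed: 1 3". */
	return 0;
}

The in-place memmove() compaction keeps event order intact, and under the
patch's stated assumption that events of migrating flows are relatively
rare, its cost stays negligible compared to re-buffering whole bursts.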