* [PATCH] app/testpmd: fix flow queue job leaks
@ 2025-11-18 10:45 Dariusz Sosnowski
2025-12-03 0:04 ` Stephen Hemminger
2026-01-09 15:26 ` [PATCH v2] " Dariusz Sosnowski
0 siblings, 2 replies; 4+ messages in thread
From: Dariusz Sosnowski @ 2025-11-18 10:45 UTC (permalink / raw)
To: Aman Singh, Ori Kam; +Cc: dev, Bing Zhao, stable
Each enqueued async flow operation in testpmd has an associated
queue_job struct. It is passed in user data and used to determine
the type of operation when operation results are pulled on a given
queue. This information informs the necessary additional handling
(e.g., freeing flow struct or dumping the queried action state).
If async flow operations were enqueued and results were not pulled
before quitting testpmd, these queue_job structs were leaked as reported
by ASAN:
Direct leak of 88 byte(s) in 1 object(s) allocated from:
#0 0x7f7539084a37 in __interceptor_calloc
../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154
#1 0x55a872c8e512 in port_queue_flow_create
(/download/dpdk/install/bin/dpdk-testpmd+0x4cd512)
#2 0x55a872c28414 in cmd_flow_cb
(/download/dpdk/install/bin/dpdk-testpmd+0x467414)
#3 0x55a8734fa6a3 in __cmdline_parse
(/download/dpdk/install/bin/dpdk-testpmd+0xd396a3)
#4 0x55a8734f6130 in cmdline_valid_buffer
(/download/dpdk/install/bin/dpdk-testpmd+0xd35130)
#5 0x55a873503b4f in rdline_char_in
(/download/dpdk/install/bin/dpdk-testpmd+0xd42b4f)
#6 0x55a8734f62ba in cmdline_in
(/download/dpdk/install/bin/dpdk-testpmd+0xd352ba)
#7 0x55a8734f65eb in cmdline_interact
(/download/dpdk/install/bin/dpdk-testpmd+0xd355eb)
#8 0x55a872c19b8e in prompt
(/download/dpdk/install/bin/dpdk-testpmd+0x458b8e)
#9 0x55a872be425a in main
(/download/dpdk/install/bin/dpdk-testpmd+0x42325a)
#10 0x7f7538756d8f in __libc_start_call_main
../sysdeps/nptl/libc_start_call_main.h:58
This patch addresses that by registering all queue_job structs, for a
given queue, on a linked list. Whenever operation results are pulled
and result is handled, queue_job struct will be removed from that list
and freed.
Before port is closed, during flow flush, testpmd will pull
all of the expected results
(based on the number of queue_job on the list).
Fixes: c9dc03840873 ("ethdev: add indirect action async query")
Fixes: 99231e480b69 ("ethdev: add template table resize")
Fixes: 77e7939acf1f ("app/testpmd: add flow rule update command")
Fixes: 3e3edab530a1 ("ethdev: add flow quota")
Fixes: 966eb55e9a00 ("ethdev: add queue-based API to report aged flow rules")
Cc: stable@dpdk.org
Signed-off-by: Dariusz Sosnowski <dsosnowski@nvidia.com>
---
app/test-pmd/config.c | 160 +++++++++++++++++++++++++++++++++++++++--
app/test-pmd/testpmd.c | 8 +++
app/test-pmd/testpmd.h | 4 ++
3 files changed, 165 insertions(+), 7 deletions(-)
diff --git a/app/test-pmd/config.c b/app/test-pmd/config.c
index 8557371488..4049bf03c7 100644
--- a/app/test-pmd/config.c
+++ b/app/test-pmd/config.c
@@ -69,6 +69,8 @@
#define NS_PER_SEC 1E9
+#define FLOW_QUEUE_FLUSH_SLEEP_US (100)
+
static const struct {
enum tx_pkt_split split;
const char *name;
@@ -1834,6 +1836,14 @@ port_flow_configure(portid_t port_id,
port->queue_sz = queue_attr->size;
for (std_queue = 0; std_queue < nb_queue; std_queue++)
attr_list[std_queue] = queue_attr;
+ port->job_list = calloc(nb_queue, sizeof(*port->job_list));
+ if (port->job_list == NULL) {
+ TESTPMD_LOG(ERR, "Failed to allocate memory for operations tracking on port %u\n",
+ port_id);
+ return -ENOMEM;
+ }
+ for (unsigned int i = 0; i < nb_queue; i++)
+ LIST_INIT(&port->job_list[i]);
/* Poisoning to make sure PMDs update it in case of error. */
memset(&error, 0x66, sizeof(error));
if (rte_flow_configure(port_id, port_attr, nb_queue, attr_list, &error))
@@ -2938,6 +2948,7 @@ port_queue_flow_create(portid_t port_id, queueid_t queue_id,
pf->flow = flow;
job->pf = pf;
port->flow_list = pf;
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Flow rule #%"PRIu64" creation enqueued\n", pf->id);
return 0;
}
@@ -2975,6 +2986,7 @@ port_queue_flow_update_resized(portid_t port_id, queueid_t queue_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
return 0;
}
@@ -3028,6 +3040,7 @@ port_queue_flow_destroy(portid_t port_id, queueid_t queue_id,
ret = port_flow_complain(&error);
continue;
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Flow rule #%"PRIu64" destruction enqueued\n",
pf->id);
*tmp = pf->next;
@@ -3161,6 +3174,7 @@ port_queue_flow_update(portid_t port_id, queueid_t queue_id,
uf->flow = pf->flow;
*tmp = uf;
job->pf = pf;
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Flow rule #%"PRIu64" update enqueued\n", pf->id);
return 0;
@@ -3215,6 +3229,7 @@ port_queue_action_handle_create(portid_t port_id, uint32_t queue_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Indirect action #%u creation queued\n", pia->id);
return 0;
}
@@ -3276,6 +3291,7 @@ port_queue_action_handle_destroy(portid_t port_id,
ret = port_flow_complain(&error);
continue;
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
*tmp = pia->next;
printf("Indirect action #%u destruction queued\n",
pia->id);
@@ -3350,6 +3366,7 @@ port_queue_action_handle_update(portid_t port_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Indirect action #%u update queued\n", id);
return 0;
}
@@ -3365,8 +3382,11 @@ port_queue_action_handle_query_update(portid_t port_id,
struct rte_flow_error error;
struct port_indirect_action *pia = action_get_by_id(port_id, id);
const struct rte_flow_op_attr attr = { .postpone = postpone};
+ struct rte_port *port;
struct queue_job *job;
+ port = &ports[port_id];
+
if (!pia || !pia->handle)
return;
job = calloc(1, sizeof(*job));
@@ -3385,6 +3405,7 @@ port_queue_action_handle_query_update(portid_t port_id,
port_flow_complain(&error);
free(job);
} else {
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("port-%u: indirect action #%u update-and-query queued\n",
port_id, id);
}
@@ -3426,6 +3447,7 @@ port_queue_action_handle_query(portid_t port_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Indirect action #%u update queued\n", id);
return 0;
}
@@ -3541,6 +3563,19 @@ port_flow_hash_calc_encap(portid_t port_id,
return 0;
}
+static void
+port_free_queue_job(struct queue_job *job)
+{
+ if (job->type == QUEUE_JOB_TYPE_FLOW_DESTROY ||
+ job->type == QUEUE_JOB_TYPE_FLOW_UPDATE)
+ free(job->pf);
+ else if (job->type == QUEUE_JOB_TYPE_ACTION_DESTROY)
+ free(job->pia);
+
+ LIST_REMOVE(job, chain);
+ free(job);
+}
+
/** Pull queue operation results from the queue. */
static int
port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
@@ -3578,6 +3613,8 @@ port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
return ret;
}
while (success < nb_flows) {
+ struct queue_job *job;
+
ret = rte_flow_pull(port_id, queue_id, res,
port->queue_sz, &error);
if (ret < 0) {
@@ -3590,6 +3627,13 @@ port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
for (i = 0; i < ret; i++) {
if (res[i].status == RTE_FLOW_OP_SUCCESS)
success++;
+ job = res[i].user_data;
+ /*
+ * It is assumed that each enqueued async flow operation
+ * has a queue_job entry.
+ */
+ RTE_ASSERT(job != NULL);
+ port_free_queue_job(job);
}
}
rule += n;
@@ -3738,15 +3782,10 @@ port_queue_flow_pull(portid_t port_id, queueid_t queue_id)
if (res[i].status == RTE_FLOW_OP_SUCCESS)
success++;
job = (struct queue_job *)res[i].user_data;
- if (job->type == QUEUE_JOB_TYPE_FLOW_DESTROY ||
- job->type == QUEUE_JOB_TYPE_FLOW_UPDATE)
- free(job->pf);
- else if (job->type == QUEUE_JOB_TYPE_ACTION_DESTROY)
- free(job->pia);
- else if (job->type == QUEUE_JOB_TYPE_ACTION_QUERY)
+ if (job->type == QUEUE_JOB_TYPE_ACTION_QUERY)
port_action_handle_query_dump(port_id, job->pia,
&job->query);
- free(job);
+ port_free_queue_job(job);
}
printf("Queue #%u pulled %u operations (%u failed, %u succeeded)\n",
queue_id, ret, ret - success, success);
@@ -3960,6 +3999,108 @@ port_flow_update(portid_t port_id, uint32_t rule_id,
return -EINVAL;
}
+static int
+port_flow_queue_job_flush(portid_t port_id, queueid_t queue_id)
+{
+ struct rte_flow_op_result *res;
+ struct rte_flow_error error;
+ unsigned int expected_ops;
+ struct rte_port *port;
+ struct queue_job *job;
+ unsigned int success;
+ unsigned int polled;
+ int ret;
+
+ port = &ports[port_id];
+
+ printf("Flushing flow queue %u on port %u\n", queue_id, port_id);
+
+ /* Poisoning to make sure PMDs update it in case of error. */
+ memset(&error, 0x44, sizeof(error));
+ if (rte_flow_push(port_id, queue_id, &error))
+ port_flow_complain(&error);
+
+ /* Count expected operations. */
+ expected_ops = 0;
+ LIST_FOREACH(job, &port->job_list[queue_id], chain)
+ expected_ops++;
+
+ res = calloc(expected_ops, sizeof(*res));
+ if (res == NULL)
+ return -ENOMEM;
+
+ polled = 0;
+ success = 0;
+ while (expected_ops > 0) {
+ /* Poisoning to make sure PMDs update it in case of error. */
+ memset(&error, 0x55, sizeof(error));
+ ret = rte_flow_pull(port_id, queue_id, res, expected_ops, &error);
+ if (ret < 0) {
+ port_flow_complain(&error);
+ free(res);
+ return ret;
+ }
+ if (ret == 0) {
+ rte_delay_us_sleep(FLOW_QUEUE_FLUSH_SLEEP_US);
+ continue;
+ }
+
+ expected_ops -= ret;
+ polled += ret;
+ for (int i = 0; i < ret; i++) {
+ if (res[i].status == RTE_FLOW_OP_SUCCESS)
+ success++;
+
+ job = (struct queue_job *)res[i].user_data;
+ /*
+ * It is assumed that each enqueued async flow operation
+ * has a queue_job entry.
+ */
+ RTE_ASSERT(job != NULL);
+ port_free_queue_job(job);
+ }
+ }
+ free(res);
+
+ /*
+ * It is assumed that each enqueued async flow operation
+ * has a queue_job entry, so if expected_ops reached zero,
+ * then the queue_job list should be empty.
+ */
+ RTE_ASSERT(LIST_EMPTY(&port->job_list[queue_id]));
+
+ printf("Flushed flow queue %u on port %u (%u failed, %u succeeded).\n",
+ queue_id, port_id, polled - success, success);
+
+ return 0;
+}
+
+static int
+port_flow_queues_job_flush(portid_t port_id)
+{
+ struct rte_port *port;
+ int ret;
+
+ port = &ports[port_id];
+
+ if (port->queue_nb == 0)
+ return 0;
+
+ for (queueid_t queue_id = 0; queue_id < port->queue_nb; ++queue_id) {
+ if (LIST_EMPTY(&port->job_list[queue_id]))
+ continue;
+
+ ret = port_flow_queue_job_flush(port_id, queue_id);
+ if (ret < 0) {
+ TESTPMD_LOG(ERR, "Flushing flows queue %u failed on port %u (ret %d)\n",
+ queue_id, port_id, ret);
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
/** Remove all flow rules. */
int
port_flow_flush(portid_t port_id)
@@ -3974,6 +4115,11 @@ port_flow_flush(portid_t port_id)
port = &ports[port_id];
+ ret = port_flow_queues_job_flush(port_id);
+ if (ret < 0)
+ TESTPMD_LOG(ERR, "Flushing flows queues failed on port %u (ret %d)\n",
+ port_id, ret);
+
if (port->flow_list == NULL)
return ret;
diff --git a/app/test-pmd/testpmd.c b/app/test-pmd/testpmd.c
index b10f6baee2..789f123c6a 100644
--- a/app/test-pmd/testpmd.c
+++ b/app/test-pmd/testpmd.c
@@ -3260,6 +3260,13 @@ remove_invalid_ports(void)
nb_cfg_ports = nb_fwd_ports;
}
+static void
+port_free_job_list(portid_t pi)
+{
+ struct rte_port *port = &ports[pi];
+ free(port->job_list);
+}
+
static void
flush_port_owned_resources(portid_t pi)
{
@@ -3270,6 +3277,7 @@ flush_port_owned_resources(portid_t pi)
port_flow_actions_template_flush(pi);
port_flex_item_flush(pi);
port_action_handle_flush(pi);
+ port_free_job_list(pi);
}
static void
diff --git a/app/test-pmd/testpmd.h b/app/test-pmd/testpmd.h
index fa46865c67..2fed7b9e7d 100644
--- a/app/test-pmd/testpmd.h
+++ b/app/test-pmd/testpmd.h
@@ -280,6 +280,7 @@ union port_action_query {
/* Descriptor for queue job. */
struct queue_job {
+ LIST_ENTRY(queue_job) chain;
uint32_t type; /**< Job type. */
union {
struct port_flow *pf;
@@ -288,6 +289,8 @@ struct queue_job {
union port_action_query query;
};
+LIST_HEAD(queue_job_list, queue_job);
+
struct port_flow_tunnel {
LIST_ENTRY(port_flow_tunnel) chain;
struct rte_flow_action *pmd_actions;
@@ -369,6 +372,7 @@ struct rte_port {
struct port_flow *flow_list; /**< Associated flows. */
struct port_indirect_action *actions_list;
/**< Associated indirect actions. */
+ struct queue_job_list *job_list; /**< Pending async flow API operations, per queue. */
LIST_HEAD(, port_flow_tunnel) flow_tunnel_list;
const struct rte_eth_rxtx_callback *rx_dump_cb[RTE_MAX_QUEUES_PER_PORT+1];
const struct rte_eth_rxtx_callback *tx_dump_cb[RTE_MAX_QUEUES_PER_PORT+1];
--
2.39.5
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [PATCH] app/testpmd: fix flow queue job leaks
2025-11-18 10:45 [PATCH] app/testpmd: fix flow queue job leaks Dariusz Sosnowski
@ 2025-12-03 0:04 ` Stephen Hemminger
2026-01-08 15:54 ` Dariusz Sosnowski
2026-01-09 15:26 ` [PATCH v2] " Dariusz Sosnowski
1 sibling, 1 reply; 4+ messages in thread
From: Stephen Hemminger @ 2025-12-03 0:04 UTC (permalink / raw)
To: Dariusz Sosnowski; +Cc: Aman Singh, Ori Kam, dev, Bing Zhao, stable
On Tue, 18 Nov 2025 11:45:18 +0100
Dariusz Sosnowski <dsosnowski@nvidia.com> wrote:
> + polled = 0;
> + success = 0;
> + while (expected_ops > 0) {
> + /* Poisoning to make sure PMDs update it in case of error. */
> + memset(&error, 0x55, sizeof(error));
> + ret = rte_flow_pull(port_id, queue_id, res, expected_ops, &error);
> + if (ret < 0) {
> + port_flow_complain(&error);
> + free(res);
> + return ret;
> + }
> + if (ret == 0) {
> + rte_delay_us_sleep(FLOW_QUEUE_FLUSH_SLEEP_US);
> + continue;
> + }
Infinite loops with sleep are bad. The poisoning seems unnecessary and not done
elsewhere. Sleeping for 100 us is just unlikely to help much.
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [PATCH] app/testpmd: fix flow queue job leaks
2025-12-03 0:04 ` Stephen Hemminger
@ 2026-01-08 15:54 ` Dariusz Sosnowski
0 siblings, 0 replies; 4+ messages in thread
From: Dariusz Sosnowski @ 2026-01-08 15:54 UTC (permalink / raw)
To: Stephen Hemminger; +Cc: Aman Singh, Ori Kam, dev, Bing Zhao, stable
On Tue, Dec 02, 2025 at 04:04:06PM -0800, Stephen Hemminger wrote:
> On Tue, 18 Nov 2025 11:45:18 +0100
> Dariusz Sosnowski <dsosnowski@nvidia.com> wrote:
>
> > + polled = 0;
> > + success = 0;
> > + while (expected_ops > 0) {
> > + /* Poisoning to make sure PMDs update it in case of error. */
> > + memset(&error, 0x55, sizeof(error));
> > + ret = rte_flow_pull(port_id, queue_id, res, expected_ops, &error);
> > + if (ret < 0) {
> > + port_flow_complain(&error);
> > + free(res);
> > + return ret;
> > + }
> > + if (ret == 0) {
> > + rte_delay_us_sleep(FLOW_QUEUE_FLUSH_SLEEP_US);
> > + continue;
> > + }
>
> Infinite loops with sleep are bad. The poisoning seems unnecessary and not done
> elsewhere. Sleeping for 10 us is just unlikely to help much.
Regarding poisoning, it's not exactly true.
At least in most other cases in testpmd where flow API is used
(e.g. in port_flow_create() or port_flow_destroy()),
the error struct is poisoned.
It's not a perfect mechanism, but it allows catching bugs where
driver does not populate error struct on failure.
Due to lack of other mechanism and for consistency with rest of the code,
I'd keep the poisoning in this patch.
What do you think?
Regarding the loop logic, I agree.
I'll fix it in v2.
^ permalink raw reply [flat|nested] 4+ messages in thread
* [PATCH v2] app/testpmd: fix flow queue job leaks
2025-11-18 10:45 [PATCH] app/testpmd: fix flow queue job leaks Dariusz Sosnowski
2025-12-03 0:04 ` Stephen Hemminger
@ 2026-01-09 15:26 ` Dariusz Sosnowski
1 sibling, 0 replies; 4+ messages in thread
From: Dariusz Sosnowski @ 2026-01-09 15:26 UTC (permalink / raw)
To: Aman Singh, Ori Kam; +Cc: dev, Bing Zhao, Stephen Hemminger, stable
Each enqueued async flow operation in testpmd has an associated
queue_job struct. It is passed in user data and used to determine
the type of operation when operation results are pulled on a given
queue. This information informs the necessary additional handling
(e.g., freeing flow struct or dumping the queried action state).
If async flow operations were enqueued and results were not pulled
before quitting testpmd, these queue_job structs were leaked as reported
by ASAN:
Direct leak of 88 byte(s) in 1 object(s) allocated from:
#0 0x7f7539084a37 in __interceptor_calloc
../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154
#1 0x55a872c8e512 in port_queue_flow_create
(/download/dpdk/install/bin/dpdk-testpmd+0x4cd512)
#2 0x55a872c28414 in cmd_flow_cb
(/download/dpdk/install/bin/dpdk-testpmd+0x467414)
#3 0x55a8734fa6a3 in __cmdline_parse
(/download/dpdk/install/bin/dpdk-testpmd+0xd396a3)
#4 0x55a8734f6130 in cmdline_valid_buffer
(/download/dpdk/install/bin/dpdk-testpmd+0xd35130)
#5 0x55a873503b4f in rdline_char_in
(/download/dpdk/install/bin/dpdk-testpmd+0xd42b4f)
#6 0x55a8734f62ba in cmdline_in
(/download/dpdk/install/bin/dpdk-testpmd+0xd352ba)
#7 0x55a8734f65eb in cmdline_interact
(/download/dpdk/install/bin/dpdk-testpmd+0xd355eb)
#8 0x55a872c19b8e in prompt
(/download/dpdk/install/bin/dpdk-testpmd+0x458b8e)
#9 0x55a872be425a in main
(/download/dpdk/install/bin/dpdk-testpmd+0x42325a)
#10 0x7f7538756d8f in __libc_start_call_main
../sysdeps/nptl/libc_start_call_main.h:58
This patch addresses that by registering all queue_job structs, for a
given queue, on a linked list. Whenever operation results are pulled
and result is handled, queue_job struct will be removed from that list
and freed.
Before port is closed, during flow flush, testpmd will pull
all of the expected results
(based on the number of queue_job on the list).
Fixes: c9dc03840873 ("ethdev: add indirect action async query")
Fixes: 99231e480b69 ("ethdev: add template table resize")
Fixes: 77e7939acf1f ("app/testpmd: add flow rule update command")
Fixes: 3e3edab530a1 ("ethdev: add flow quota")
Fixes: 966eb55e9a00 ("ethdev: add queue-based API to report aged flow rules")
Cc: stable@dpdk.org
Signed-off-by: Dariusz Sosnowski <dsosnowski@nvidia.com>
---
v2:
- Bound the cleanup loop's iterations count and
remove sleeps on empty iterations.
- Add missing return on error handling for rte_flow_push().
app/test-pmd/config.c | 180 +++++++++++++++++++++++++++++++++++++++--
app/test-pmd/testpmd.c | 8 ++
app/test-pmd/testpmd.h | 4 +
3 files changed, 185 insertions(+), 7 deletions(-)
diff --git a/app/test-pmd/config.c b/app/test-pmd/config.c
index 6ea506254b..ac716dd1e9 100644
--- a/app/test-pmd/config.c
+++ b/app/test-pmd/config.c
@@ -69,6 +69,8 @@
#define NS_PER_SEC 1E9
+#define FLOW_QUEUE_FLUSH_MAX_ITERS (10)
+
static const struct {
enum tx_pkt_split split;
const char *name;
@@ -1834,6 +1836,14 @@ port_flow_configure(portid_t port_id,
port->queue_sz = queue_attr->size;
for (std_queue = 0; std_queue < nb_queue; std_queue++)
attr_list[std_queue] = queue_attr;
+ port->job_list = calloc(nb_queue, sizeof(*port->job_list));
+ if (port->job_list == NULL) {
+ TESTPMD_LOG(ERR, "Failed to allocate memory for operations tracking on port %u\n",
+ port_id);
+ return -ENOMEM;
+ }
+ for (unsigned int i = 0; i < nb_queue; i++)
+ LIST_INIT(&port->job_list[i]);
/* Poisoning to make sure PMDs update it in case of error. */
memset(&error, 0x66, sizeof(error));
if (rte_flow_configure(port_id, port_attr, nb_queue, attr_list, &error))
@@ -2938,6 +2948,7 @@ port_queue_flow_create(portid_t port_id, queueid_t queue_id,
pf->flow = flow;
job->pf = pf;
port->flow_list = pf;
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Flow rule #%"PRIu64" creation enqueued\n", pf->id);
return 0;
}
@@ -2975,6 +2986,7 @@ port_queue_flow_update_resized(portid_t port_id, queueid_t queue_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
return 0;
}
@@ -3028,6 +3040,7 @@ port_queue_flow_destroy(portid_t port_id, queueid_t queue_id,
ret = port_flow_complain(&error);
continue;
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Flow rule #%"PRIu64" destruction enqueued\n",
pf->id);
*tmp = pf->next;
@@ -3161,6 +3174,7 @@ port_queue_flow_update(portid_t port_id, queueid_t queue_id,
uf->flow = pf->flow;
*tmp = uf;
job->pf = pf;
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Flow rule #%"PRIu64" update enqueued\n", pf->id);
return 0;
@@ -3215,6 +3229,7 @@ port_queue_action_handle_create(portid_t port_id, uint32_t queue_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Indirect action #%u creation queued\n", pia->id);
return 0;
}
@@ -3276,6 +3291,7 @@ port_queue_action_handle_destroy(portid_t port_id,
ret = port_flow_complain(&error);
continue;
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
*tmp = pia->next;
printf("Indirect action #%u destruction queued\n",
pia->id);
@@ -3350,6 +3366,7 @@ port_queue_action_handle_update(portid_t port_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Indirect action #%u update queued\n", id);
return 0;
}
@@ -3365,8 +3382,11 @@ port_queue_action_handle_query_update(portid_t port_id,
struct rte_flow_error error;
struct port_indirect_action *pia = action_get_by_id(port_id, id);
const struct rte_flow_op_attr attr = { .postpone = postpone};
+ struct rte_port *port;
struct queue_job *job;
+ port = &ports[port_id];
+
if (!pia || !pia->handle)
return;
job = calloc(1, sizeof(*job));
@@ -3385,6 +3405,7 @@ port_queue_action_handle_query_update(portid_t port_id,
port_flow_complain(&error);
free(job);
} else {
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("port-%u: indirect action #%u update-and-query queued\n",
port_id, id);
}
@@ -3426,6 +3447,7 @@ port_queue_action_handle_query(portid_t port_id,
free(job);
return port_flow_complain(&error);
}
+ LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
printf("Indirect action #%u update queued\n", id);
return 0;
}
@@ -3541,6 +3563,19 @@ port_flow_hash_calc_encap(portid_t port_id,
return 0;
}
+static void
+port_free_queue_job(struct queue_job *job)
+{
+ if (job->type == QUEUE_JOB_TYPE_FLOW_DESTROY ||
+ job->type == QUEUE_JOB_TYPE_FLOW_UPDATE)
+ free(job->pf);
+ else if (job->type == QUEUE_JOB_TYPE_ACTION_DESTROY)
+ free(job->pia);
+
+ LIST_REMOVE(job, chain);
+ free(job);
+}
+
/** Pull queue operation results from the queue. */
static int
port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
@@ -3578,6 +3613,8 @@ port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
return ret;
}
while (success < nb_flows) {
+ struct queue_job *job;
+
ret = rte_flow_pull(port_id, queue_id, res,
port->queue_sz, &error);
if (ret < 0) {
@@ -3590,6 +3627,13 @@ port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
for (i = 0; i < ret; i++) {
if (res[i].status == RTE_FLOW_OP_SUCCESS)
success++;
+ job = res[i].user_data;
+ /*
+ * It is assumed that each enqueued async flow operation
+ * has a queue_job entry.
+ */
+ RTE_ASSERT(job != NULL);
+ port_free_queue_job(job);
}
}
rule += n;
@@ -3738,15 +3782,10 @@ port_queue_flow_pull(portid_t port_id, queueid_t queue_id)
if (res[i].status == RTE_FLOW_OP_SUCCESS)
success++;
job = (struct queue_job *)res[i].user_data;
- if (job->type == QUEUE_JOB_TYPE_FLOW_DESTROY ||
- job->type == QUEUE_JOB_TYPE_FLOW_UPDATE)
- free(job->pf);
- else if (job->type == QUEUE_JOB_TYPE_ACTION_DESTROY)
- free(job->pia);
- else if (job->type == QUEUE_JOB_TYPE_ACTION_QUERY)
+ if (job->type == QUEUE_JOB_TYPE_ACTION_QUERY)
port_action_handle_query_dump(port_id, job->pia,
&job->query);
- free(job);
+ port_free_queue_job(job);
}
printf("Queue #%u pulled %u operations (%u failed, %u succeeded)\n",
queue_id, ret, ret - success, success);
@@ -3960,6 +3999,128 @@ port_flow_update(portid_t port_id, uint32_t rule_id,
return -EINVAL;
}
+static int
+port_flow_queue_job_flush(portid_t port_id, queueid_t queue_id)
+{
+ struct rte_flow_op_result *res;
+ struct rte_flow_error error;
+ unsigned int expected_ops;
+ struct rte_port *port;
+ struct queue_job *job;
+ unsigned int success;
+ unsigned int polled;
+ int iterations;
+ int ret;
+
+ port = &ports[port_id];
+
+ printf("Flushing flow queue %u on port %u\n", queue_id, port_id);
+
+ /* Poisoning to make sure PMDs update it in case of error. */
+ memset(&error, 0x44, sizeof(error));
+ if (rte_flow_push(port_id, queue_id, &error))
+ return port_flow_complain(&error);
+
+ /* Count expected operations. */
+ expected_ops = 0;
+ LIST_FOREACH(job, &port->job_list[queue_id], chain)
+ expected_ops++;
+
+ res = calloc(expected_ops, sizeof(*res));
+ if (res == NULL)
+ return -ENOMEM;
+
+ polled = 0;
+ success = 0;
+ iterations = FLOW_QUEUE_FLUSH_MAX_ITERS;
+ while (iterations > 0 && expected_ops > 0) {
+ /* Poisoning to make sure PMDs update it in case of error. */
+ memset(&error, 0x55, sizeof(error));
+ ret = rte_flow_pull(port_id, queue_id, res, expected_ops, &error);
+ if (ret < 0) {
+ port_flow_complain(&error);
+ free(res);
+ return ret;
+ }
+ if (ret == 0) {
+ /* Prevent infinite loop when driver does not return any completion. */
+ iterations--;
+ continue;
+ }
+
+ expected_ops -= ret;
+ polled += ret;
+ for (int i = 0; i < ret; i++) {
+ if (res[i].status == RTE_FLOW_OP_SUCCESS)
+ success++;
+
+ job = (struct queue_job *)res[i].user_data;
+ /*
+ * It is assumed that each enqueued async flow operation
+ * has a queue_job entry.
+ */
+ RTE_ASSERT(job != NULL);
+ port_free_queue_job(job);
+ }
+ }
+ free(res);
+
+ printf("Flushed flow queue %u on port %u (%u failed, %u succeeded).\n",
+ queue_id, port_id, polled - success, success);
+
+ if (iterations == 0 && expected_ops > 0) {
+ /*
+ * Driver was not able to return all completions for flow operations in time.
+ * Log the error and free the queue_job entries to prevent leak.
+ */
+
+ TESTPMD_LOG(ERR, "Unable to fully flush flow queue %u on port %u (left ops %u)\n",
+ queue_id, port_id, expected_ops);
+
+ while (!LIST_EMPTY(&port->job_list[queue_id])) {
+ job = LIST_FIRST(&port->job_list[queue_id]);
+ port_free_queue_job(job);
+ }
+
+ return 0;
+ }
+
+ /*
+ * It is assumed that each enqueued async flow operation
+ * has a queue_job entry, so if expected_ops reached zero,
+ * then the queue_job list should be empty.
+ */
+ RTE_ASSERT(LIST_EMPTY(&port->job_list[queue_id]));
+
+ return 0;
+}
+
+static int
+port_flow_queues_job_flush(portid_t port_id)
+{
+ struct rte_port *port;
+ int ret;
+
+ port = &ports[port_id];
+
+ if (port->queue_nb == 0)
+ return 0;
+
+ for (queueid_t queue_id = 0; queue_id < port->queue_nb; ++queue_id) {
+ if (LIST_EMPTY(&port->job_list[queue_id]))
+ continue;
+
+ ret = port_flow_queue_job_flush(port_id, queue_id);
+ if (ret < 0) {
+ TESTPMD_LOG(ERR, "Flushing flows queue %u failed on port %u (ret %d)\n",
+ queue_id, port_id, ret);
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
/** Remove all flow rules. */
int
port_flow_flush(portid_t port_id)
@@ -3974,6 +4135,11 @@ port_flow_flush(portid_t port_id)
port = &ports[port_id];
+ ret = port_flow_queues_job_flush(port_id);
+ if (ret < 0)
+ TESTPMD_LOG(ERR, "Flushing flows queues failed on port %u (ret %d)\n",
+ port_id, ret);
+
if (port->flow_list == NULL)
return ret;
diff --git a/app/test-pmd/testpmd.c b/app/test-pmd/testpmd.c
index 1fe41d852a..51ae2fd418 100644
--- a/app/test-pmd/testpmd.c
+++ b/app/test-pmd/testpmd.c
@@ -3275,6 +3275,13 @@ remove_invalid_ports(void)
nb_cfg_ports = nb_fwd_ports;
}
+static void
+port_free_job_list(portid_t pi)
+{
+ struct rte_port *port = &ports[pi];
+ free(port->job_list);
+}
+
static void
flush_port_owned_resources(portid_t pi)
{
@@ -3285,6 +3292,7 @@ flush_port_owned_resources(portid_t pi)
port_flow_actions_template_flush(pi);
port_flex_item_flush(pi);
port_action_handle_flush(pi);
+ port_free_job_list(pi);
}
static void
diff --git a/app/test-pmd/testpmd.h b/app/test-pmd/testpmd.h
index 492b5757f1..f319471c73 100644
--- a/app/test-pmd/testpmd.h
+++ b/app/test-pmd/testpmd.h
@@ -280,6 +280,7 @@ union port_action_query {
/* Descriptor for queue job. */
struct queue_job {
+ LIST_ENTRY(queue_job) chain;
uint32_t type; /**< Job type. */
union {
struct port_flow *pf;
@@ -288,6 +289,8 @@ struct queue_job {
union port_action_query query;
};
+LIST_HEAD(queue_job_list, queue_job);
+
struct port_flow_tunnel {
LIST_ENTRY(port_flow_tunnel) chain;
struct rte_flow_action *pmd_actions;
@@ -369,6 +372,7 @@ struct rte_port {
struct port_flow *flow_list; /**< Associated flows. */
struct port_indirect_action *actions_list;
/**< Associated indirect actions. */
+ struct queue_job_list *job_list; /**< Pending async flow API operations, per queue. */
LIST_HEAD(, port_flow_tunnel) flow_tunnel_list;
const struct rte_eth_rxtx_callback *rx_dump_cb[RTE_MAX_QUEUES_PER_PORT+1];
const struct rte_eth_rxtx_callback *tx_dump_cb[RTE_MAX_QUEUES_PER_PORT+1];
--
2.47.3
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2026-01-09 15:27 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-11-18 10:45 [PATCH] app/testpmd: fix flow queue job leaks Dariusz Sosnowski
2025-12-03 0:04 ` Stephen Hemminger
2026-01-08 15:54 ` Dariusz Sosnowski
2026-01-09 15:26 ` [PATCH v2] " Dariusz Sosnowski
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).