* [PATCH] app/testpmd: fix flow queue job leaks
@ 2025-11-18 10:45 Dariusz Sosnowski
From: Dariusz Sosnowski @ 2025-11-18 10:45 UTC
  To: Aman Singh, Ori Kam; +Cc: dev, Bing Zhao, stable

Each async flow operation enqueued in testpmd has an associated
queue_job struct. It is passed as user data and, when operation
results are pulled on a given queue, identifies the type of the
completed operation so that any additional handling can be done
(e.g., freeing the flow struct or dumping the queried action state).
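
For reference, the round trip of that user data through the async flow
API looks roughly like this (a minimal sketch, not part of the patch;
the simplified struct job and the enqueue_tracked_flow() /
handle_pulled_results() helpers are hypothetical):

    #include <stdlib.h>
    #include <rte_flow.h>

    /* Simplified stand-in for testpmd's queue_job. */
    struct job {
            int type;       /* what kind of operation was enqueued */
    };

    /* Enqueue an async flow creation, attaching a job as user data. */
    static struct rte_flow *
    enqueue_tracked_flow(uint16_t port_id, uint32_t queue_id,
                         struct rte_flow_template_table *table,
                         const struct rte_flow_item pattern[],
                         const struct rte_flow_action actions[])
    {
            const struct rte_flow_op_attr op_attr = { .postpone = 0 };
            struct rte_flow_error error;
            struct rte_flow *flow;
            struct job *job = calloc(1, sizeof(*job));

            if (job == NULL)
                    return NULL;
            /* The job pointer is carried through the PMD untouched... */
            flow = rte_flow_async_create(port_id, queue_id, &op_attr, table,
                                         pattern, 0, actions, 0, job, &error);
            if (flow == NULL)
                    free(job); /* nothing will ever pull this job back */
            return flow;
    }

    /* ...and comes back in rte_flow_op_result when results are pulled. */
    static void
    handle_pulled_results(uint16_t port_id, uint32_t queue_id)
    {
            struct rte_flow_op_result res[64];
            struct rte_flow_error error;
            int ret = rte_flow_pull(port_id, queue_id, res, RTE_DIM(res), &error);

            for (int i = 0; i < ret; i++) {
                    struct job *job = res[i].user_data;
                    /* job->type drives extra handling, then the job is freed. */
                    free(job);
            }
    }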

If async flow operations were enqueued and results were not pulled
before quitting testpmd, these queue_job structs were leaked as reported
by ASAN:

Direct leak of 88 byte(s) in 1 object(s) allocated from:
    #0 0x7f7539084a37 in __interceptor_calloc
        ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154
    #1 0x55a872c8e512 in port_queue_flow_create
        (/download/dpdk/install/bin/dpdk-testpmd+0x4cd512)
    #2 0x55a872c28414 in cmd_flow_cb
        (/download/dpdk/install/bin/dpdk-testpmd+0x467414)
    #3 0x55a8734fa6a3 in __cmdline_parse
        (/download/dpdk/install/bin/dpdk-testpmd+0xd396a3)
    #4 0x55a8734f6130 in cmdline_valid_buffer
        (/download/dpdk/install/bin/dpdk-testpmd+0xd35130)
    #5 0x55a873503b4f in rdline_char_in
        (/download/dpdk/install/bin/dpdk-testpmd+0xd42b4f)
    #6 0x55a8734f62ba in cmdline_in
        (/download/dpdk/install/bin/dpdk-testpmd+0xd352ba)
    #7 0x55a8734f65eb in cmdline_interact
        (/download/dpdk/install/bin/dpdk-testpmd+0xd355eb)
    #8 0x55a872c19b8e in prompt
        (/download/dpdk/install/bin/dpdk-testpmd+0x458b8e)
    #9 0x55a872be425a in main
        (/download/dpdk/install/bin/dpdk-testpmd+0x42325a)
    #10 0x7f7538756d8f in __libc_start_call_main
        ../sysdeps/nptl/libc_start_call_main.h:58

This patch addresses that by registering all queue_job structs for a
given queue on a linked list. Whenever operation results are pulled
and a result is handled, the corresponding queue_job struct is removed
from that list and freed.
Before a port is closed, during flow flush, testpmd pulls all of the
expected results (based on the number of queue_job entries on the
list).
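
Condensed, the per-queue tracking and the drain performed at flush time
look roughly like this (a sketch distilled from the diff below, not the
applied change itself; error handling and the retry sleep are trimmed,
and drain_queue() is only an illustrative name):

    #include <stdlib.h>
    #include <sys/queue.h>
    #include <rte_flow.h>

    struct job {
            LIST_ENTRY(job) chain;  /* linkage on the per-queue job list */
            int type;
    };

    /* Each flow queue keeps a list of outstanding jobs. */
    LIST_HEAD(job_list, job);

    /*
     * On every enqueue, the job is registered on its queue's list:
     *     LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
     * On pull (and on flush before closing the port) it is taken off:
     */
    static void
    drain_queue(uint16_t port_id, uint32_t queue_id, struct job_list *jobs)
    {
            struct rte_flow_op_result res[64];
            struct rte_flow_error error;
            struct job *job;
            unsigned int pending = 0;

            LIST_FOREACH(job, jobs, chain)
                    pending++;              /* expected results */
            rte_flow_push(port_id, queue_id, &error);
            while (pending > 0) {
                    int ret = rte_flow_pull(port_id, queue_id, res,
                                            RTE_DIM(res), &error);
                    if (ret < 0)
                            break;
                    for (int i = 0; i < ret; i++) {
                            job = res[i].user_data;
                            LIST_REMOVE(job, chain); /* un-track... */
                            free(job);               /* ...and free */
                    }
                    pending -= ret;
            }
    }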

Fixes: c9dc03840873 ("ethdev: add indirect action async query")
Fixes: 99231e480b69 ("ethdev: add template table resize")
Fixes: 77e7939acf1f ("app/testpmd: add flow rule update command")
Fixes: 3e3edab530a1 ("ethdev: add flow quota")
Fixes: 966eb55e9a00 ("ethdev: add queue-based API to report aged flow rules")
Cc: stable@dpdk.org

Signed-off-by: Dariusz Sosnowski <dsosnowski@nvidia.com>
---
 app/test-pmd/config.c  | 160 +++++++++++++++++++++++++++++++++++++++--
 app/test-pmd/testpmd.c |   8 +++
 app/test-pmd/testpmd.h |   4 ++
 3 files changed, 165 insertions(+), 7 deletions(-)

diff --git a/app/test-pmd/config.c b/app/test-pmd/config.c
index 8557371488..4049bf03c7 100644
--- a/app/test-pmd/config.c
+++ b/app/test-pmd/config.c
@@ -69,6 +69,8 @@
 
 #define NS_PER_SEC 1E9
 
+#define FLOW_QUEUE_FLUSH_SLEEP_US (100)
+
 static const struct {
 	enum tx_pkt_split split;
 	const char *name;
@@ -1834,6 +1836,14 @@ port_flow_configure(portid_t port_id,
 	port->queue_sz = queue_attr->size;
 	for (std_queue = 0; std_queue < nb_queue; std_queue++)
 		attr_list[std_queue] = queue_attr;
+	port->job_list = calloc(nb_queue, sizeof(*port->job_list));
+	if (port->job_list == NULL) {
+		TESTPMD_LOG(ERR, "Failed to allocate memory for operations tracking on port %u\n",
+			    port_id);
+		return -ENOMEM;
+	}
+	for (unsigned int i = 0; i < nb_queue; i++)
+		LIST_INIT(&port->job_list[i]);
 	/* Poisoning to make sure PMDs update it in case of error. */
 	memset(&error, 0x66, sizeof(error));
 	if (rte_flow_configure(port_id, port_attr, nb_queue, attr_list, &error))
@@ -2938,6 +2948,7 @@ port_queue_flow_create(portid_t port_id, queueid_t queue_id,
 	pf->flow = flow;
 	job->pf = pf;
 	port->flow_list = pf;
+	LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 	printf("Flow rule #%"PRIu64" creation enqueued\n", pf->id);
 	return 0;
 }
@@ -2975,6 +2986,7 @@ port_queue_flow_update_resized(portid_t port_id, queueid_t queue_id,
 		free(job);
 		return port_flow_complain(&error);
 	}
+	LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 	return 0;
 }
 
@@ -3028,6 +3040,7 @@ port_queue_flow_destroy(portid_t port_id, queueid_t queue_id,
 				ret = port_flow_complain(&error);
 				continue;
 			}
+			LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 			printf("Flow rule #%"PRIu64" destruction enqueued\n",
 			       pf->id);
 			*tmp = pf->next;
@@ -3161,6 +3174,7 @@ port_queue_flow_update(portid_t port_id, queueid_t queue_id,
 	uf->flow = pf->flow;
 	*tmp = uf;
 	job->pf = pf;
+	LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 
 	printf("Flow rule #%"PRIu64" update enqueued\n", pf->id);
 	return 0;
@@ -3215,6 +3229,7 @@ port_queue_action_handle_create(portid_t port_id, uint32_t queue_id,
 		free(job);
 		return port_flow_complain(&error);
 	}
+	LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 	printf("Indirect action #%u creation queued\n", pia->id);
 	return 0;
 }
@@ -3276,6 +3291,7 @@ port_queue_action_handle_destroy(portid_t port_id,
 				ret = port_flow_complain(&error);
 				continue;
 			}
+			LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 			*tmp = pia->next;
 			printf("Indirect action #%u destruction queued\n",
 			       pia->id);
@@ -3350,6 +3366,7 @@ port_queue_action_handle_update(portid_t port_id,
 		free(job);
 		return port_flow_complain(&error);
 	}
+	LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 	printf("Indirect action #%u update queued\n", id);
 	return 0;
 }
@@ -3365,8 +3382,11 @@ port_queue_action_handle_query_update(portid_t port_id,
 	struct rte_flow_error error;
 	struct port_indirect_action *pia = action_get_by_id(port_id, id);
 	const struct rte_flow_op_attr attr = { .postpone = postpone};
+	struct rte_port *port;
 	struct queue_job *job;
 
+	port = &ports[port_id];
+
 	if (!pia || !pia->handle)
 		return;
 	job = calloc(1, sizeof(*job));
@@ -3385,6 +3405,7 @@ port_queue_action_handle_query_update(portid_t port_id,
 		port_flow_complain(&error);
 		free(job);
 	} else {
+		LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 		printf("port-%u: indirect action #%u update-and-query queued\n",
 		       port_id, id);
 	}
@@ -3426,6 +3447,7 @@ port_queue_action_handle_query(portid_t port_id,
 		free(job);
 		return port_flow_complain(&error);
 	}
+	LIST_INSERT_HEAD(&port->job_list[queue_id], job, chain);
 	printf("Indirect action #%u update queued\n", id);
 	return 0;
 }
@@ -3541,6 +3563,19 @@ port_flow_hash_calc_encap(portid_t port_id,
 	return 0;
 }
 
+static void
+port_free_queue_job(struct queue_job *job)
+{
+	if (job->type == QUEUE_JOB_TYPE_FLOW_DESTROY ||
+	    job->type == QUEUE_JOB_TYPE_FLOW_UPDATE)
+		free(job->pf);
+	else if (job->type == QUEUE_JOB_TYPE_ACTION_DESTROY)
+		free(job->pia);
+
+	LIST_REMOVE(job, chain);
+	free(job);
+}
+
 /** Pull queue operation results from the queue. */
 static int
 port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
@@ -3578,6 +3613,8 @@ port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
 			return ret;
 		}
 		while (success < nb_flows) {
+			struct queue_job *job;
+
 			ret = rte_flow_pull(port_id, queue_id, res,
 					    port->queue_sz, &error);
 			if (ret < 0) {
@@ -3590,6 +3627,13 @@ port_queue_aged_flow_destroy(portid_t port_id, queueid_t queue_id,
 			for (i = 0; i < ret; i++) {
 				if (res[i].status == RTE_FLOW_OP_SUCCESS)
 					success++;
+				job = res[i].user_data;
+				/*
+				 * It is assumed that each enqueued async flow operation
+				 * has a queue_job entry.
+				 */
+				RTE_ASSERT(job != NULL);
+				port_free_queue_job(job);
 			}
 		}
 		rule += n;
@@ -3738,15 +3782,10 @@ port_queue_flow_pull(portid_t port_id, queueid_t queue_id)
 		if (res[i].status == RTE_FLOW_OP_SUCCESS)
 			success++;
 		job = (struct queue_job *)res[i].user_data;
-		if (job->type == QUEUE_JOB_TYPE_FLOW_DESTROY ||
-		    job->type == QUEUE_JOB_TYPE_FLOW_UPDATE)
-			free(job->pf);
-		else if (job->type == QUEUE_JOB_TYPE_ACTION_DESTROY)
-			free(job->pia);
-		else if (job->type == QUEUE_JOB_TYPE_ACTION_QUERY)
+		if (job->type == QUEUE_JOB_TYPE_ACTION_QUERY)
 			port_action_handle_query_dump(port_id, job->pia,
 						      &job->query);
-		free(job);
+		port_free_queue_job(job);
 	}
 	printf("Queue #%u pulled %u operations (%u failed, %u succeeded)\n",
 	       queue_id, ret, ret - success, success);
@@ -3960,6 +3999,108 @@ port_flow_update(portid_t port_id, uint32_t rule_id,
 	return -EINVAL;
 }
 
+static int
+port_flow_queue_job_flush(portid_t port_id, queueid_t queue_id)
+{
+	struct rte_flow_op_result *res;
+	struct rte_flow_error error;
+	unsigned int expected_ops;
+	struct rte_port *port;
+	struct queue_job *job;
+	unsigned int success;
+	unsigned int polled;
+	int ret;
+
+	port = &ports[port_id];
+
+	printf("Flushing flow queue %u on port %u\n", port_id, queue_id);
+
+	/* Poisoning to make sure PMDs update it in case of error. */
+	memset(&error, 0x44, sizeof(error));
+	if (rte_flow_push(port_id, queue_id, &error))
+		port_flow_complain(&error);
+
+	/* Count expected operations. */
+	expected_ops = 0;
+	LIST_FOREACH(job, &port->job_list[queue_id], chain)
+		expected_ops++;
+
+	res = calloc(expected_ops, sizeof(*res));
+	if (res == NULL)
+		return -ENOMEM;
+
+	polled = 0;
+	success = 0;
+	while (expected_ops > 0) {
+		/* Poisoning to make sure PMDs update it in case of error. */
+		memset(&error, 0x55, sizeof(error));
+		ret = rte_flow_pull(port_id, queue_id, res, expected_ops, &error);
+		if (ret < 0) {
+			port_flow_complain(&error);
+			free(res);
+			return ret;
+		}
+		if (ret == 0) {
+			rte_delay_us_sleep(FLOW_QUEUE_FLUSH_SLEEP_US);
+			continue;
+		}
+
+		expected_ops -= ret;
+		polled += ret;
+		for (int i = 0; i < ret; i++) {
+			if (res[i].status == RTE_FLOW_OP_SUCCESS)
+				success++;
+
+			job = (struct queue_job *)res[i].user_data;
+			/*
+			 * It is assumed that each enqueued async flow operation
+			 * has a queue_job entry.
+			 */
+			RTE_ASSERT(job != NULL);
+			port_free_queue_job(job);
+		}
+	}
+	free(res);
+
+	/*
+	 * It is assumed that each enqueued async flow operation
+	 * has a queue_job entry, so if expected_ops reached zero,
+	 * then the queue_job list should be empty.
+	 */
+	RTE_ASSERT(LIST_EMPTY(&port->job_list[queue_id]));
+
+	printf("Flushed flow queue %u on port %u (%u failed, %u succeeded).\n",
+	       queue_id, port_id, polled - success, success);
+
+	return 0;
+}
+
+static int
+port_flow_queues_job_flush(portid_t port_id)
+{
+	struct rte_port *port;
+	int ret;
+
+	port = &ports[port_id];
+
+	if (port->queue_nb == 0)
+		return 0;
+
+	for (queueid_t queue_id = 0; queue_id < port->queue_nb; ++queue_id) {
+		if (LIST_EMPTY(&port->job_list[queue_id]))
+			continue;
+
+		ret = port_flow_queue_job_flush(port_id, queue_id);
+		if (ret < 0) {
+			TESTPMD_LOG(ERR, "Flushing flow queue %u failed on port %u (ret %d)\n",
+				    queue_id, port_id, ret);
+			return ret;
+		}
+	}
+
+	return 0;
+}
+
 /** Remove all flow rules. */
 int
 port_flow_flush(portid_t port_id)
@@ -3974,6 +4115,11 @@ port_flow_flush(portid_t port_id)
 
 	port = &ports[port_id];
 
+	ret = port_flow_queues_job_flush(port_id);
+	if (ret < 0)
+		TESTPMD_LOG(ERR, "Flushing flow queues failed on port %u (ret %d)\n",
+			    port_id, ret);
+
 	if (port->flow_list == NULL)
 		return ret;
 
diff --git a/app/test-pmd/testpmd.c b/app/test-pmd/testpmd.c
index b10f6baee2..789f123c6a 100644
--- a/app/test-pmd/testpmd.c
+++ b/app/test-pmd/testpmd.c
@@ -3260,6 +3260,13 @@ remove_invalid_ports(void)
 	nb_cfg_ports = nb_fwd_ports;
 }
 
+static void
+port_free_job_list(portid_t pi)
+{
+	struct rte_port *port = &ports[pi];
+	free(port->job_list);
+}
+
 static void
 flush_port_owned_resources(portid_t pi)
 {
@@ -3270,6 +3277,7 @@ flush_port_owned_resources(portid_t pi)
 	port_flow_actions_template_flush(pi);
 	port_flex_item_flush(pi);
 	port_action_handle_flush(pi);
+	port_free_job_list(pi);
 }
 
 static void
diff --git a/app/test-pmd/testpmd.h b/app/test-pmd/testpmd.h
index fa46865c67..2fed7b9e7d 100644
--- a/app/test-pmd/testpmd.h
+++ b/app/test-pmd/testpmd.h
@@ -280,6 +280,7 @@ union port_action_query {
 
 /* Descriptor for queue job. */
 struct queue_job {
+	LIST_ENTRY(queue_job) chain;
 	uint32_t type; /**< Job type. */
 	union {
 		struct port_flow *pf;
@@ -288,6 +289,8 @@ struct queue_job {
 	union port_action_query query;
 };
 
+LIST_HEAD(queue_job_list, queue_job);
+
 struct port_flow_tunnel {
 	LIST_ENTRY(port_flow_tunnel) chain;
 	struct rte_flow_action *pmd_actions;
@@ -369,6 +372,7 @@ struct rte_port {
 	struct port_flow        *flow_list; /**< Associated flows. */
 	struct port_indirect_action *actions_list;
 	/**< Associated indirect actions. */
+	struct queue_job_list *job_list; /**< Pending async flow API operations, per queue. */
 	LIST_HEAD(, port_flow_tunnel) flow_tunnel_list;
 	const struct rte_eth_rxtx_callback *rx_dump_cb[RTE_MAX_QUEUES_PER_PORT+1];
 	const struct rte_eth_rxtx_callback *tx_dump_cb[RTE_MAX_QUEUES_PER_PORT+1];
-- 
2.39.5

