DPDK patches and discussions
 help / color / mirror / Atom feed
* [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
@ 2024-12-19 14:48 Luka Jankovic
  2024-12-23 11:16 ` Mattias Rönnblom
                   ` (2 more replies)
  0 siblings, 3 replies; 6+ messages in thread
From: Luka Jankovic @ 2024-12-19 14:48 UTC (permalink / raw)
  To: Luka Jankovic; +Cc: dev, Mattias Rönnblom

From 2e55ecd0e522f50cbb3635f53b025e165db7cf3e Mon Sep 17 00:00:00 2001
In-Reply-To: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
References: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
From: Luka Jankovic <luka.jankovic@ericsson.com>
Date: Thu, 19 Dec 2024 13:31:26 +0000
Subject: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
To: luka.jankovic@ericsson.com
Cc: dev@dpdk.org,
    mattias.ronnblom@ericsson.com

Add an atomic queue test based on the order queue test but use exclusively atomic queues.
This makes it compatible with event devices such as the distributed software eventdev.

The test detects whether port maintenance is required.

To verify atomicity, a spinlock is set up for each combination of queue and flow.
It is taken whenever an event is dequeued for processing and released when processing is finished.
The test will fail if a port attempts to take a lock which is already taken.

Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
---
v2:
 * Changed to only check queue, flow combination, not port, queue, flow.
 * Lock is only held when a packet is processed.
 * Utilize event u64 instead of mbuf.
 * General cleanup.
---
 app/test-eventdev/evt_common.h        |  10 +
 app/test-eventdev/meson.build         |   1 +
 app/test-eventdev/test_atomic_queue.c | 372 ++++++++++++++++++++++++++
 3 files changed, 383 insertions(+)
 create mode 100644 app/test-eventdev/test_atomic_queue.c

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index 901b8ba55d..adb024c011 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
 			true : false;
 }
 
+static inline bool
+evt_is_maintenance_free(uint8_t dev_id)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(dev_id, &dev_info);
+	return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
+			true : false;
+}
+
 static inline int
 evt_service_setup(uint32_t service_id)
 {
diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
index ab8769c755..db5add39eb 100644
--- a/app/test-eventdev/meson.build
+++ b/app/test-eventdev/meson.build
@@ -15,6 +15,7 @@ sources = files(
         'test_order_atq.c',
         'test_order_common.c',
         'test_order_queue.c',
+        'test_atomic_queue.c',
         'test_perf_atq.c',
         'test_perf_common.c',
         'test_perf_queue.c',
diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
new file mode 100644
index 0000000000..51e988c527
--- /dev/null
+++ b/app/test-eventdev/test_atomic_queue.c
@@ -0,0 +1,372 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <rte_lcore_var.h>
+
+#include "test_order_common.h"
+
+#define NB_QUEUES 2
+
+static rte_spinlock_t *atomic_locks;
+
+static inline uint64_t
+event_data_create(flow_id_t flow, uint32_t seq)
+{
+	return ((uint64_t)flow << 32) | seq;
+}
+
+static inline uint32_t
+event_data_get_seq(struct rte_event *const ev)
+{
+	return ev->u64 & 0xFFFFFFFF;
+}
+
+static inline uint32_t
+event_data_get_flow(struct rte_event *const ev)
+{
+	return ev->u64 >> 32;
+}
+
+static inline uint32_t
+get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
+{
+	return (queue * nb_flows) + flow;
+}
+
+static inline bool
+test_done(struct test_order *const t)
+{
+	return t->err || t->result == EVT_TEST_SUCCESS;
+}
+
+static inline int
+atomic_producer(void *arg)
+{
+	struct prod_data *p = arg;
+	struct test_order *t = p->t;
+	struct evt_options *opt = t->opt;
+	const uint8_t dev_id = p->dev_id;
+	const uint8_t port = p->port_id;
+	const uint64_t nb_pkts = t->nb_pkts;
+	uint32_t *producer_flow_seq = t->producer_flow_seq;
+	const uint32_t nb_flows = t->nb_flows;
+	uint64_t count = 0;
+	struct rte_event ev;
+
+	if (opt->verbose_level > 1)
+		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
+				dev_id, port, p->queue_id);
+
+	ev = (struct rte_event){
+			.event = 0,
+			.op = RTE_EVENT_OP_NEW,
+			.queue_id = p->queue_id,
+			.sched_type = RTE_SCHED_TYPE_ORDERED,
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.event_type = RTE_EVENT_TYPE_CPU,
+			.sub_event_type = 0 /* stage 0 */
+	};
+
+	while (count < nb_pkts && t->err == false) {
+		const flow_id_t flow = rte_rand_max(nb_flows);
+
+		/* Maintain seq number per flow */
+		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
+		ev.flow_id = flow;
+
+		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+			if (t->err)
+				break;
+			rte_pause();
+		}
+
+		count++;
+	}
+
+	if (!evt_is_maintenance_free(dev_id)) {
+		while (!test_done(t)) {
+			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
+			rte_pause();
+		}
+	}
+
+	return 0;
+}
+
+static __rte_always_inline void
+atomic_process_stage_0(struct rte_event *const ev, uint32_t nb_flows, uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(0, flow, nb_flows)])) {
+		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);
+	}
+
+	ev->queue_id = 1;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+
+	rte_spinlock_unlock(&atomic_locks[get_lock_idx(0, flow, nb_flows)]);
+}
+
+static __rte_always_inline void
+atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
+		uint32_t *const expected_flow_seq, RTE_ATOMIC(uint64_t) *const outstand_pkts,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(1, flow, nb_flows)])) {
+		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);
+	}
+
+	/* compare the seqn against expected value */
+	if (event_data_get_seq(ev) != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%lx expected=%x", flow, ev->u64,
+				expected_flow_seq[flow]);
+		t->err = true;
+	}
+
+	expected_flow_seq[flow]++;
+	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
+
+	ev->op = RTE_EVENT_OP_RELEASE;
+
+	rte_spinlock_unlock(&atomic_locks[get_lock_idx(1, flow, nb_flows)]);
+}
+
+static int
+atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
+{
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	while (t->err == false) {
+
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
+
+		if (nb_rx == 0) {
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
+					0) {
+				break;
+			}
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (!flow_id_cap) {
+				ev[i].flow_id = event_data_get_flow(&ev[i]);
+			}
+
+			switch (ev[i].queue_id) {
+			case 0:
+				atomic_process_stage_0(&ev[i], nb_flows, port);
+				break;
+			case 1:
+				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
+						outstand_pkts, port);
+				break;
+			default:
+				order_process_stage_invalid(t, &ev[i]);
+				break;
+			}
+		}
+
+		uint16_t total_enq = 0;
+
+		do {
+			total_enq += rte_event_enqueue_burst(
+					dev_id, port, ev + total_enq, nb_rx - total_enq);
+		} while (total_enq < nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+worker_wrapper(void *arg)
+{
+	struct worker_data *w = arg;
+	int max_burst = evt_has_burst_mode(w->dev_id) ? BURST_SIZE : 1;
+	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
+
+	return atomic_queue_worker_burst(arg, flow_id_cap, max_burst);
+}
+
+static int
+atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+	int ret, lcore_id;
+	struct test_order *t = evt_test_priv(test);
+
+	/* launch workers */
+
+	int wkr_idx = 0;
+	RTE_LCORE_FOREACH_WORKER(lcore_id) {
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+
+		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
+		if (ret) {
+			evt_err("failed to launch worker %d", lcore_id);
+			return ret;
+		}
+		wkr_idx++;
+	}
+
+	/* launch producer */
+	int plcore = evt_get_first_active_lcore(opt->plcores);
+
+	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
+	if (ret) {
+		evt_err("failed to launch order_producer %d", plcore);
+		return ret;
+	}
+
+	uint64_t cycles = rte_get_timer_cycles();
+	int64_t old_remaining = -1;
+
+	while (t->err == false) {
+		uint64_t new_cycles = rte_get_timer_cycles();
+		int64_t remaining = rte_atomic_load_explicit(
+				&t->outstand_pkts, rte_memory_order_relaxed);
+
+		if (remaining <= 0) {
+			t->result = EVT_TEST_SUCCESS;
+			break;
+		}
+
+		if (new_cycles - cycles > rte_get_timer_hz() * 1) {
+			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
+			fflush(stdout);
+			if (old_remaining == remaining) {
+				rte_event_dev_dump(opt->dev_id, stdout);
+				evt_err("No schedules for seconds, deadlock");
+				t->err = true;
+				break;
+			}
+			old_remaining = remaining;
+			cycles = new_cycles;
+		}
+	}
+	printf("\r");
+
+	rte_free(atomic_locks);
+
+	return 0;
+}
+
+static int
+atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
+{
+	int ret;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	/* number of active worker cores + 1 producer */
+	const uint8_t nb_ports = nb_workers + 1;
+
+	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
+	if (ret) {
+		evt_err("failed to configure eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	/* q0 configuration */
+	struct rte_event_queue_conf q0_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* q1 configuration */
+	struct rte_event_queue_conf q1_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* setup one port per worker, linking to all queues */
+	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
+	if (ret)
+		return ret;
+
+	if (!evt_has_distributed_sched(opt->dev_id)) {
+		uint32_t service_id;
+		rte_event_dev_service_id_get(opt->dev_id, &service_id);
+		ret = evt_service_setup(service_id);
+		if (ret) {
+			evt_err("No service lcore found to run event dev.");
+			return ret;
+		}
+	}
+
+	ret = rte_event_dev_start(opt->dev_id);
+	if (ret) {
+		evt_err("failed to start eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
+
+	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
+
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_init(&atomic_locks[i]);
+	}
+
+	return 0;
+}
+
+static void
+atomic_queue_opt_dump(struct evt_options *opt)
+{
+	order_opt_dump(opt);
+	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
+}
+
+static bool
+atomic_queue_capability_check(struct evt_options *opt)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(opt->dev_id, &dev_info);
+	if (dev_info.max_event_queues < NB_QUEUES ||
+			dev_info.max_event_ports < order_nb_event_ports(opt)) {
+		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
+				dev_info.max_event_queues, order_nb_event_ports(opt),
+				dev_info.max_event_ports);
+		return false;
+	}
+
+	return true;
+}
+
+static const struct evt_test_ops atomic_queue = {
+		.cap_check = atomic_queue_capability_check,
+		.opt_check = order_opt_check,
+		.opt_dump = atomic_queue_opt_dump,
+		.test_setup = order_test_setup,
+		.mempool_setup = order_mempool_setup,
+		.eventdev_setup = atomic_queue_eventdev_setup,
+		.launch_lcores = atomic_queue_launch_lcores,
+		.eventdev_destroy = order_eventdev_destroy,
+		.mempool_destroy = order_mempool_destroy,
+		.test_result = order_test_result,
+		.test_destroy = order_test_destroy,
+};
+
+EVT_TEST_REGISTER(atomic_queue);
-- 
2.34.1



^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
  2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
@ 2024-12-23 11:16 ` Mattias Rönnblom
  2025-01-09 10:22   ` Luka Jankovic
  2025-01-13  9:04 ` [RFC v3 " ejnulak
  2025-01-13 12:17 ` [RFC v4 " Luka Jankovic
  2 siblings, 1 reply; 6+ messages in thread
From: Mattias Rönnblom @ 2024-12-23 11:16 UTC (permalink / raw)
  To: Luka Jankovic; +Cc: dev, Mattias Rönnblom

On 2024-12-19 15:48, Luka Jankovic wrote:
>  From 2e55ecd0e522f50cbb3635f53b025e165db7cf3e Mon Sep 17 00:00:00 2001
> In-Reply-To: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
> References: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
> From: Luka Jankovic <luka.jankovic@ericsson.com>
> Date: Thu, 19 Dec 2024 13:31:26 +0000
> Subject: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
> To: luka.jankovic@ericsson.com
> Cc: dev@dpdk.org,
>      mattias.ronnblom@ericsson.com
> 
> Add an atomic queue test based on the order queue test but use exclusively atomic queues.
> This makes it compatible with event devices such as the distributed software eventdev.
> 
> The test detects whether port maintenance is required.

"whether /../ or not", or replace "whether" with "if".

> 
> To verify atomicity, a spinlock is set up for each combination of queue and flow.
> It is taken whenever an event is dequeued for processing and released when processing is finished.
> The test will fail if a port attempts to take a lock which is already taken.
> 
> Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
> ---
> v2:
>   * Changed to only check queue, flow combination, not port, queue, flow.
>   * Lock is only held when a packet is processed.
>   * Utilize event u64 instead of mbuf.
>   * General cleanup.
> ---
>   app/test-eventdev/evt_common.h        |  10 +
>   app/test-eventdev/meson.build         |   1 +
>   app/test-eventdev/test_atomic_queue.c | 372 ++++++++++++++++++++++++++
>   3 files changed, 383 insertions(+)
>   create mode 100644 app/test-eventdev/test_atomic_queue.c
> 
> diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
> index 901b8ba55d..adb024c011 100644
> --- a/app/test-eventdev/evt_common.h
> +++ b/app/test-eventdev/evt_common.h
> @@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
>   			true : false;
>   }
>   
> +static inline bool
> +evt_is_maintenance_free(uint8_t dev_id)
> +{
> +	struct rte_event_dev_info dev_info;
> +
> +	rte_event_dev_info_get(dev_id, &dev_info);
> +	return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
> +			true : false;

return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;

will do the trick.

> +}
> +
>   static inline int
>   evt_service_setup(uint32_t service_id)
>   {
> diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
> index ab8769c755..db5add39eb 100644
> --- a/app/test-eventdev/meson.build
> +++ b/app/test-eventdev/meson.build
> @@ -15,6 +15,7 @@ sources = files(
>           'test_order_atq.c',
>           'test_order_common.c',
>           'test_order_queue.c',
> +        'test_atomic_queue.c',
>           'test_perf_atq.c',
>           'test_perf_common.c',
>           'test_perf_queue.c',
> diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
> new file mode 100644
> index 0000000000..51e988c527
> --- /dev/null
> +++ b/app/test-eventdev/test_atomic_queue.c
> @@ -0,0 +1,372 @@
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <rte_lcore_var.h>
> +
> +#include "test_order_common.h"
> +
> +#define NB_QUEUES 2
> +
> +static rte_spinlock_t *atomic_locks;
> +
> +static inline uint64_t
> +event_data_create(flow_id_t flow, uint32_t seq)
> +{
> +	return ((uint64_t)flow << 32) | seq;
> +}
> +

An alternative would be to use a struct to pack the two 32-bit words 
into a single 64-bit word.

/*
 * 64-bit event payload that can be accessed either as a (flow, seq) pair
 * or as a single raw word, so no masking or shifting is needed.
 */
struct event_data
{
	union {
		struct {
			uint32_t flow; /* flow identifier */
			uint32_t seq;  /* sequence number within that flow */
		};
		/*
		 * Both fields viewed as one 64-bit value. Which half of raw
		 * each field occupies depends on the platform's byte order.
		 */
		uint64_t raw;
	};
};

/* Pack a flow id and a sequence number into one 64-bit event payload. */
static inline uint64_t
event_data_create(flow_id_t flow, uint32_t seq)
{
	struct event_data packed;

	/* Fill both halves of the union; together they cover all of raw. */
	packed.flow = flow;
	packed.seq = seq;

	return packed.raw;
}

/*
 * Extract the sequence number from the 64-bit payload carried by an event.
 *
 * ev: event whose u64 field was produced by event_data_create().
 * Returns the seq half of the payload.
 */
static inline uint32_t
event_data_get_seq(struct rte_event *const ev)
{
	/* View the payload through the union instead of masking/shifting. */
	struct event_data d = {
		.raw = ev->u64
	};
	return d.seq;
}

...or something like that.

That would get rid of the masking and shifting, as well as some semi-magical numbers.

> +static inline uint32_t
> +event_data_get_seq(struct rte_event *const ev)
> +{
> +	return ev->u64 & 0xFFFFFFFF;

Does the masking have any effect here? Other than making the reviewer 
count the number of Fs. :)

> +}
> +
> +static inline uint32_t
> +event_data_get_flow(struct rte_event *const ev)
> +{
> +	return ev->u64 >> 32;
> +}
> +
> +static inline uint32_t
> +get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
> +{
> +	return (queue * nb_flows) + flow;
> +}
> +
> +static inline bool
> +test_done(struct test_order *const t)
> +{
> +	return t->err || t->result == EVT_TEST_SUCCESS;
> +}
> +
> +static inline int
> +atomic_producer(void *arg)
> +{
> +	struct prod_data *p = arg;
> +	struct test_order *t = p->t;
> +	struct evt_options *opt = t->opt;
> +	const uint8_t dev_id = p->dev_id;
> +	const uint8_t port = p->port_id;
> +	const uint64_t nb_pkts = t->nb_pkts;
> +	uint32_t *producer_flow_seq = t->producer_flow_seq;
> +	const uint32_t nb_flows = t->nb_flows;
> +	uint64_t count = 0;
> +	struct rte_event ev;
> +
> +	if (opt->verbose_level > 1)
> +		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
> +				dev_id, port, p->queue_id);
> +
> +	ev = (struct rte_event){
> +			.event = 0,
> +			.op = RTE_EVENT_OP_NEW,
> +			.queue_id = p->queue_id,
> +			.sched_type = RTE_SCHED_TYPE_ORDERED,

It seems like an atomic producer should produce events with sched_type 
set to RTE_SCHED_TYPE_ATOMIC.

> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.event_type = RTE_EVENT_TYPE_CPU,
> +			.sub_event_type = 0 /* stage 0 */

Is the sub_event_type used? Can't find any reference to it. Also, isn't 
queue id used to signify processing stage?

> +	};
> +
> +	while (count < nb_pkts && t->err == false) {
> +		const flow_id_t flow = rte_rand_max(nb_flows);
> +
> +		/* Maintain seq number per flow */
> +		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
> +		ev.flow_id = flow;
> +
> +		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
> +			if (t->err)
> +				break;
> +			rte_pause();
> +		}
> +
> +		count++;
> +	}
> +
> +	if (!evt_is_maintenance_free(dev_id)) {
> +		while (!test_done(t)) {
> +			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
> +			rte_pause();
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline void
> +atomic_process_stage_0(struct rte_event *const ev, uint32_t nb_flows, uint32_t port)
> +{
> +	const uint32_t flow = event_data_get_flow(ev);
> +
> +	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(0, flow, nb_flows)])) {
> +		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);

Maybe you should report to the user what that means? Something that makes 
it clear that atomicity has been violated.

> +	}
> +
> +	ev->queue_id = 1;
> +	ev->op = RTE_EVENT_OP_FORWARD;
> +	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> +	ev->event_type = RTE_EVENT_TYPE_CPU;
> +
> +	rte_spinlock_unlock(&atomic_locks[get_lock_idx(0, flow, nb_flows)]);
> +}
> +
> +static __rte_always_inline void

Why is this __rte_always_inline?

> +atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
> +		uint32_t *const expected_flow_seq, RTE_ATOMIC(uint64_t) *const outstand_pkts,
> +		uint32_t port)
> +{
> +	const uint32_t flow = event_data_get_flow(ev);
> +
> +	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(1, flow, nb_flows)])) {
> +		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);

Break this out to a verify function, used by both stages.

> +	}
> +
> +	/* compare the seqn against expected value */
> +	if (event_data_get_seq(ev) != expected_flow_seq[flow]) {
> +		evt_err("flow=%x seqn mismatch got=%lx expected=%x", flow, ev->u64,
> +				expected_flow_seq[flow]);
> +		t->err = true;
> +	}
> +
> +	expected_flow_seq[flow]++;
> +	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
> +
> +	ev->op = RTE_EVENT_OP_RELEASE;
> +
> +	rte_spinlock_unlock(&atomic_locks[get_lock_idx(1, flow, nb_flows)]);

Factor out "queue-flow" locking/unlocking to two separate functions.

> +}
> +
> +static int
> +atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
> +{
> +	ORDER_WORKER_INIT;
> +	struct rte_event ev[BURST_SIZE];
> +	uint16_t i;
> +
> +	while (t->err == false) {
> +
> +		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
> +
> +		if (nb_rx == 0) {
> +			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
> +					0) {
> +				break;
> +			}
> +			rte_pause();
> +			continue;
> +		}
> +
> +		for (i = 0; i < nb_rx; i++) {
> +			if (!flow_id_cap) {
> +				ev[i].flow_id = event_data_get_flow(&ev[i]);
> +			}
> +
> +			switch (ev[i].queue_id) {
> +			case 0:
> +				atomic_process_stage_0(&ev[i], nb_flows, port);
> +				break;
> +			case 1:
> +				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
> +						outstand_pkts, port);
> +				break;
> +			default:
> +				order_process_stage_invalid(t, &ev[i]);
> +				break;
> +			}
> +		}
> +
> +		uint16_t total_enq = 0;
> +
> +		do {
> +			total_enq += rte_event_enqueue_burst(
> +					dev_id, port, ev + total_enq, nb_rx - total_enq);
> +		} while (total_enq < nb_rx);
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +worker_wrapper(void *arg)
> +{
> +	struct worker_data *w = arg;
> +	int max_burst = evt_has_burst_mode(w->dev_id) ? BURST_SIZE : 1;
> +	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
> +
> +	return atomic_queue_worker_burst(arg, flow_id_cap, max_burst);
> +}
> +
> +static int
> +atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
> +{
> +	int ret, lcore_id;
> +	struct test_order *t = evt_test_priv(test);
> +
> +	/* launch workers */
> +
> +	int wkr_idx = 0;
> +	RTE_LCORE_FOREACH_WORKER(lcore_id) {
> +		if (!(opt->wlcores[lcore_id]))
> +			continue;
> +
> +		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
> +		if (ret) {
> +			evt_err("failed to launch worker %d", lcore_id);
> +			return ret;
> +		}
> +		wkr_idx++;
> +	}
> +
> +	/* launch producer */
> +	int plcore = evt_get_first_active_lcore(opt->plcores);
> +
> +	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
> +	if (ret) {
> +		evt_err("failed to launch order_producer %d", plcore);
> +		return ret;
> +	}
> +
> +	uint64_t cycles = rte_get_timer_cycles();

There must be a better name for this variable. "current_time", is that 
how it's used? Or "now". Having read the code below, it seems like it's 
"last_active", or "last_dequeue".

> +	int64_t old_remaining = -1;
> +
> +	while (t->err == false) {
> +		uint64_t new_cycles = rte_get_timer_cycles();
> +		int64_t remaining = rte_atomic_load_explicit(

Rename to "outstanding_pkts". No point in having two terms used for the 
same thing.

> +				&t->outstand_pkts, rte_memory_order_relaxed);
> +
> +		if (remaining <= 0) {
> +			t->result = EVT_TEST_SUCCESS;
> +			break;
> +		}
> +
> +		if (new_cycles - cycles > rte_get_timer_hz() * 1) {

Introduce either a temp "idle_timeout = rte_get_timer_hz() * 1" or a 
"#define IDLE_TIMEOUT 1"

Maybe the timeout should be longer than a second, to allow for very busy 
CI servers.

> +			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
> +			fflush(stdout);
> +			if (old_remaining == remaining) {
> +				rte_event_dev_dump(opt->dev_id, stdout);
> +				evt_err("No schedules for seconds, deadlock");
> +				t->err = true;
> +				break;
> +			}
> +			old_remaining = remaining;
> +			cycles = new_cycles;
> +		}
> +	}
> +	printf("\r");
> +
> +	rte_free(atomic_locks);
> +
> +	return 0;
> +}
> +
> +static int
> +atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
> +{
> +	int ret;
> +
> +	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
> +	/* number of active worker cores + 1 producer */
> +	const uint8_t nb_ports = nb_workers + 1;
> +
> +	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
> +	if (ret) {
> +		evt_err("failed to configure eventdev %d", opt->dev_id);
> +		return ret;
> +	}
> +
> +	/* q0 configuration */
> +	struct rte_event_queue_conf q0_atomic_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
> +			.nb_atomic_flows = opt->nb_flows,
> +			.nb_atomic_order_sequences = opt->nb_flows,
> +	};
> +	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_atomic_conf);
> +	if (ret) {
> +		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
> +		return ret;
> +	}
> +
> +	/* q1 configuration */
> +	struct rte_event_queue_conf q1_atomic_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
> +			.nb_atomic_flows = opt->nb_flows,
> +			.nb_atomic_order_sequences = opt->nb_flows,
> +	};
> +	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf);
> +	if (ret) {
> +		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
> +		return ret;
> +	}
> +
> +	/* setup one port per worker, linking to all queues */
> +	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
> +	if (ret)
> +		return ret;
> +
> +	if (!evt_has_distributed_sched(opt->dev_id)) {
> +		uint32_t service_id;
> +		rte_event_dev_service_id_get(opt->dev_id, &service_id);
> +		ret = evt_service_setup(service_id);
> +		if (ret) {
> +			evt_err("No service lcore found to run event dev.");
> +			return ret;
> +		}
> +	}
> +
> +	ret = rte_event_dev_start(opt->dev_id);
> +	if (ret) {
> +		evt_err("failed to start eventdev %d", opt->dev_id);
> +		return ret;
> +	}
> +
> +	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
> +
> +	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
> +
> +	for (uint32_t i = 0; i < num_locks; i++) {
> +		rte_spinlock_init(&atomic_locks[i]);
> +	}
> +
> +	return 0;
> +}
> +
> +static void
> +atomic_queue_opt_dump(struct evt_options *opt)
> +{
> +	order_opt_dump(opt);
> +	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
> +}
> +
> +static bool
> +atomic_queue_capability_check(struct evt_options *opt)
> +{
> +	struct rte_event_dev_info dev_info;
> +
> +	rte_event_dev_info_get(opt->dev_id, &dev_info);
> +	if (dev_info.max_event_queues < NB_QUEUES ||
> +			dev_info.max_event_ports < order_nb_event_ports(opt)) {
> +		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
> +				dev_info.max_event_queues, order_nb_event_ports(opt),
> +				dev_info.max_event_ports);
> +		return false;
> +	}
> +
> +	return true;
> +}
> +
> +static const struct evt_test_ops atomic_queue = {
> +		.cap_check = atomic_queue_capability_check,
> +		.opt_check = order_opt_check,
> +		.opt_dump = atomic_queue_opt_dump,
> +		.test_setup = order_test_setup,
> +		.mempool_setup = order_mempool_setup,
> +		.eventdev_setup = atomic_queue_eventdev_setup,
> +		.launch_lcores = atomic_queue_launch_lcores,
> +		.eventdev_destroy = order_eventdev_destroy,
> +		.mempool_destroy = order_mempool_destroy,
> +		.test_result = order_test_result,
> +		.test_destroy = order_test_destroy,
> +};
> +
> +EVT_TEST_REGISTER(atomic_queue);


^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
  2024-12-23 11:16 ` Mattias Rönnblom
@ 2025-01-09 10:22   ` Luka Jankovic
  0 siblings, 0 replies; 6+ messages in thread
From: Luka Jankovic @ 2025-01-09 10:22 UTC (permalink / raw)
  To: hofors; +Cc: dev, Mattias Rönnblom



On Mon, 2024-12-23 at 12:16 +0100, Mattias Rönnblom wrote:
> 
> > +static __rte_always_inline void
> 
> Why is this __rte_always_inline?
> 

The stage functions are based on the ones defined in test_order_queue.c and
test_order_common.h, where they are defined with __rte_always_inline. I tried
using plain inline and it works, so I will change it.

Thank you for the feedback. I will revise and send a new version.
> 


^ permalink raw reply	[flat|nested] 6+ messages in thread

* [RFC v3 1/1] eventdev: add atomic queue to test-eventdev app
  2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
  2024-12-23 11:16 ` Mattias Rönnblom
@ 2025-01-13  9:04 ` ejnulak
  2025-01-13 12:17 ` [RFC v4 " Luka Jankovic
  2 siblings, 0 replies; 6+ messages in thread
From: ejnulak @ 2025-01-13  9:04 UTC (permalink / raw)
  To: luka.jankovic; +Cc: dev, mattias.ronnblom

From: Luka Jankovic <luka.jankovic@ericsson.com>

Add an atomic queue test based on the order queue test but use exclusively atomic queues.
This makes it compatible with event devices such as the distributed software eventdev.

The test detects if port maintenance is required.

To verify atomicity, a spinlock is set up for each combination of queue and flow.
It is taken whenever an event is dequeued for processing and released when processing is finished.
The test will fail if a port attempts to take a lock which is already taken.

Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
---
v3:
 * Use struct to avoid bit operations when accessing event u64.
 * Changed __rte_always_inline to inline for processing stages.
 * Introduce idle timeout constant.
 * Formatting and cleanup.

v2:
 * Changed to only check queue, flow combination, not port, queue, flow.
 * Lock is only held when a packet is processed.
 * Utilize event u64 instead of mbuf.
 * General cleanup.
---
 app/test-eventdev/evt_common.h        |   9 +
 app/test-eventdev/meson.build         |   1 +
 app/test-eventdev/test_atomic_queue.c | 407 ++++++++++++++++++++++++++
 3 files changed, 417 insertions(+)
 create mode 100644 app/test-eventdev/test_atomic_queue.c

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index 63b782f11a..74f9d187f3 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -138,6 +138,15 @@ evt_has_flow_id(uint8_t dev_id)
 			true : false;
 }

+static inline bool
+evt_is_maintenance_free(uint8_t dev_id)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(dev_id, &dev_info);
+	return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;
+}
+
 static inline int
 evt_service_setup(uint32_t service_id)
 {
diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
index ab8769c755..db5add39eb 100644
--- a/app/test-eventdev/meson.build
+++ b/app/test-eventdev/meson.build
@@ -15,6 +15,7 @@ sources = files(
         'test_order_atq.c',
         'test_order_common.c',
         'test_order_queue.c',
+        'test_atomic_queue.c',
         'test_perf_atq.c',
         'test_perf_common.c',
         'test_perf_queue.c',
diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
new file mode 100644
index 0000000000..847bef6a12
--- /dev/null
+++ b/app/test-eventdev/test_atomic_queue.c
@@ -0,0 +1,407 @@
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <rte_lcore_var.h>
+
+#include "rte_common.h"
+#include "test_order_common.h"
+
+#define IDLE_TIMEOUT 1
+#define NB_QUEUES 2
+
+static rte_spinlock_t *atomic_locks;
+
+struct event_data {
+	union {
+		struct {
+			uint32_t flow;
+			uint32_t seq;
+		};
+		uint64_t raw;
+	};
+};
+
+static inline uint64_t
+event_data_create(flow_id_t flow, uint32_t seq)
+{
+	struct event_data data = {.flow = flow, .seq = seq};
+	return data.raw;
+}
+
+static inline uint32_t
+event_data_get_seq(struct rte_event *const ev)
+{
+	struct event_data data = {.raw = ev->u64};
+	return data.seq;
+}
+
+static inline uint32_t
+event_data_get_flow(struct rte_event *const ev)
+{
+	struct event_data data = {.raw = ev->u64};
+	return data.flow;
+}
+
+static inline uint32_t
+get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
+{
+	return (queue * nb_flows) + flow;
+}
+
+static inline bool
+atomic_spinlock_trylock(uint32_t queue, uint32_t flow, uint32_t nb_flows)
+{
+	return rte_spinlock_trylock(&atomic_locks[get_lock_idx(queue, flow, nb_flows)]);
+}
+
+static inline void
+atomic_spinlock_unlock(uint32_t queue, uint32_t flow, uint32_t nb_flows)
+{
+	rte_spinlock_unlock(&atomic_locks[get_lock_idx(queue, flow, nb_flows)]);
+}
+
+static inline bool
+test_done(struct test_order *const t)
+{
+	return t->err || t->result == EVT_TEST_SUCCESS;
+}
+
+static inline int
+atomic_producer(void *arg)
+{
+	struct prod_data *p = arg;
+	struct test_order *t = p->t;
+	struct evt_options *opt = t->opt;
+	const uint8_t dev_id = p->dev_id;
+	const uint8_t port = p->port_id;
+	const uint64_t nb_pkts = t->nb_pkts;
+	uint32_t *producer_flow_seq = t->producer_flow_seq;
+	const uint32_t nb_flows = t->nb_flows;
+	uint64_t count = 0;
+	struct rte_event ev;
+
+	if (opt->verbose_level > 1)
+		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n",
+			__func__, rte_lcore_id(), dev_id, port, p->queue_id);
+
+	ev = (struct rte_event) {
+		.event = 0,
+		.op = RTE_EVENT_OP_NEW,
+		.queue_id = p->queue_id,
+		.sched_type = RTE_SCHED_TYPE_ATOMIC,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+		.event_type = RTE_EVENT_TYPE_CPU
+	};
+
+	while (count < nb_pkts && t->err == false) {
+		const flow_id_t flow = rte_rand_max(nb_flows);
+
+		/* Maintain seq number per flow */
+		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
+		ev.flow_id = flow;
+
+		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+			if (t->err)
+				break;
+			rte_pause();
+		}
+
+		count++;
+	}
+
+	if (!evt_is_maintenance_free(dev_id)) {
+		while (!test_done(t)) {
+			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
+			rte_pause();
+		}
+	}
+
+	return 0;
+}
+
+static inline void
+atomic_lock_verify(struct test_order *const t, uint32_t flow, uint32_t nb_flows, uint32_t port,
+		uint32_t queue_id)
+{
+	if (!atomic_spinlock_trylock(queue_id, flow, nb_flows)) {
+		evt_err("q=%u, flow=%x atomicity error: port %u tried to take locked spinlock",
+				queue_id, flow, port);
+		t->err = true;
+	}
+}
+
+static inline void
+atomic_process_stage_0(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	atomic_lock_verify(t, flow, nb_flows, port, 0);
+
+	ev->queue_id = 1;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+
+	atomic_spinlock_unlock(0, flow, nb_flows);
+}
+
+static inline void
+atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
+		uint32_t *const expected_flow_seq, RTE_ATOMIC(uint64_t) *const outstand_pkts,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	atomic_lock_verify(t, flow, nb_flows, port, 1);
+
+	/* compare the seqn against expected value */
+	if (event_data_get_seq(ev) != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%lx expected=%x",
+				flow, ev->u64, expected_flow_seq[flow]);
+		t->err = true;
+	}
+
+	expected_flow_seq[flow]++;
+	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
+
+	ev->op = RTE_EVENT_OP_RELEASE;
+
+	atomic_spinlock_unlock(1, flow, nb_flows);
+}
+
+static int
+atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
+{
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	while (t->err == false) {
+
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
+
+		if (nb_rx == 0) {
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
+					0) {
+				break;
+			}
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (!flow_id_cap) {
+				ev[i].flow_id = event_data_get_flow(&ev[i]);
+			}
+
+			switch (ev[i].queue_id) {
+			case 0:
+				atomic_process_stage_0(t, &ev[i], nb_flows, port);
+				break;
+			case 1:
+				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
+						outstand_pkts, port);
+				break;
+			default:
+				order_process_stage_invalid(t, &ev[i]);
+				break;
+			}
+		}
+
+		uint16_t total_enq = 0;
+
+		do {
+			total_enq += rte_event_enqueue_burst(
+					dev_id, port, ev + total_enq, nb_rx - total_enq);
+		} while (total_enq < nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+worker_wrapper(void *arg)
+{
+	struct worker_data *w = arg;
+	int max_burst = evt_has_burst_mode(w->dev_id) ? BURST_SIZE : 1;
+	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
+
+	return atomic_queue_worker_burst(arg, flow_id_cap, max_burst);
+}
+
+static int
+atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+	int ret, lcore_id;
+	struct test_order *t = evt_test_priv(test);
+
+	/* launch workers */
+
+	int wkr_idx = 0;
+	RTE_LCORE_FOREACH_WORKER(lcore_id) {
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+
+		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
+		if (ret) {
+			evt_err("failed to launch worker %d", lcore_id);
+			return ret;
+		}
+		wkr_idx++;
+	}
+
+	/* launch producer */
+	int plcore = evt_get_first_active_lcore(opt->plcores);
+
+	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
+	if (ret) {
+		evt_err("failed to launch order_producer %d", plcore);
+		return ret;
+	}
+
+	uint64_t prev_time = rte_get_timer_cycles();
+	int64_t prev_outstanding_pkts = -1;
+
+	while (t->err == false) {
+		uint64_t current_time = rte_get_timer_cycles();
+		int64_t outstanding_pkts = rte_atomic_load_explicit(
+				&t->outstand_pkts, rte_memory_order_relaxed);
+
+		if (outstanding_pkts <= 0) {
+			t->result = EVT_TEST_SUCCESS;
+			break;
+		}
+
+		if (current_time - prev_time > rte_get_timer_hz() * IDLE_TIMEOUT) {
+			printf(CLGRN "\r%" PRId64 "" CLNRM, outstanding_pkts);
+			fflush(stdout);
+			if (prev_outstanding_pkts == outstanding_pkts) {
+				rte_event_dev_dump(opt->dev_id, stdout);
+				evt_err("No schedules for seconds, deadlock");
+				t->err = true;
+				break;
+			}
+			prev_outstanding_pkts = outstanding_pkts;
+			prev_time = current_time;
+		}
+	}
+	printf("\r");
+
+	rte_free(atomic_locks);
+
+	return 0;
+}
+
+static int
+atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
+{
+	int ret;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	/* number of active worker cores + 1 producer */
+	const uint8_t nb_ports = nb_workers + 1;
+
+	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
+	if (ret) {
+		evt_err("failed to configure eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	/* q0 configuration */
+	struct rte_event_queue_conf q0_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* q1 configuration */
+	struct rte_event_queue_conf q1_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* setup one port per worker, linking to all queues */
+	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
+	if (ret)
+		return ret;
+
+	if (!evt_has_distributed_sched(opt->dev_id)) {
+		uint32_t service_id;
+		rte_event_dev_service_id_get(opt->dev_id, &service_id);
+		ret = evt_service_setup(service_id);
+		if (ret) {
+			evt_err("No service lcore found to run event dev.");
+			return ret;
+		}
+	}
+
+	ret = rte_event_dev_start(opt->dev_id);
+	if (ret) {
+		evt_err("failed to start eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
+
+	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
+
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_init(&atomic_locks[i]);
+	}
+
+	return 0;
+}
+
+static void
+atomic_queue_opt_dump(struct evt_options *opt)
+{
+	order_opt_dump(opt);
+	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
+}
+
+static bool
+atomic_queue_capability_check(struct evt_options *opt)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(opt->dev_id, &dev_info);
+	if (dev_info.max_event_queues < NB_QUEUES ||
+			dev_info.max_event_ports < order_nb_event_ports(opt)) {
+		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
+				dev_info.max_event_queues, order_nb_event_ports(opt),
+				dev_info.max_event_ports);
+		return false;
+	}
+
+	return true;
+}
+
+static const struct evt_test_ops atomic_queue = {
+		.cap_check = atomic_queue_capability_check,
+		.opt_check = order_opt_check,
+		.opt_dump = atomic_queue_opt_dump,
+		.test_setup = order_test_setup,
+		.mempool_setup = order_mempool_setup,
+		.eventdev_setup = atomic_queue_eventdev_setup,
+		.launch_lcores = atomic_queue_launch_lcores,
+		.eventdev_destroy = order_eventdev_destroy,
+		.mempool_destroy = order_mempool_destroy,
+		.test_result = order_test_result,
+		.test_destroy = order_test_destroy,
+};
+
+EVT_TEST_REGISTER(atomic_queue);
--
2.34.1


^ permalink raw reply	[flat|nested] 6+ messages in thread

* [RFC v4 1/1] eventdev: add atomic queue to test-eventdev app
  2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
  2024-12-23 11:16 ` Mattias Rönnblom
  2025-01-13  9:04 ` [RFC v3 " ejnulak
@ 2025-01-13 12:17 ` Luka Jankovic
  2025-01-13 12:27   ` [EXTERNAL] " Jerin Jacob
  2 siblings, 1 reply; 6+ messages in thread
From: Luka Jankovic @ 2025-01-13 12:17 UTC (permalink / raw)
  To: luka.jankovic; +Cc: dev, mattias.ronnblom

Add an atomic queue test based on the order queue test that exclusively uses atomic queues.
This makes it compatible with event devices such as the distributed software eventdev.

The test detects if port maintenance is required.

To verify atomicity, a spinlock is set up for each combination of queue and flow.
It is taken whenever an event is dequeued for processing and released when processing is finished.
The test will fail if a port attempts to take a lock which is already taken.

Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
---
v4:
 * Fix code style issues.
 * Remove unused includes.

v3:
 * Use struct to avoid bit operations when accessing event u64.
 * Changed __rte_always_inline to inline for processing stages.
 * Introduce idle timeout constant.
 * Formatting and cleanup.

v2:
 * Changed to only check queue, flow combination, not port, queue, flow.
 * Lock is only held when a packet is processed.
 * Utilize event u64 instead of mbuf.
 * General cleanup.
---
 app/test-eventdev/evt_common.h        |   9 +
 app/test-eventdev/meson.build         |   1 +
 app/test-eventdev/test_atomic_queue.c | 412 ++++++++++++++++++++++++++
 3 files changed, 422 insertions(+)
 create mode 100644 app/test-eventdev/test_atomic_queue.c

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index 63b782f11a..74f9d187f3 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -138,6 +138,15 @@ evt_has_flow_id(uint8_t dev_id)
 			true : false;
 }

+static inline bool
+evt_is_maintenance_free(uint8_t dev_id)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(dev_id, &dev_info);
+	return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;
+}
+
 static inline int
 evt_service_setup(uint32_t service_id)
 {
diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
index ab8769c755..db5add39eb 100644
--- a/app/test-eventdev/meson.build
+++ b/app/test-eventdev/meson.build
@@ -15,6 +15,7 @@ sources = files(
         'test_order_atq.c',
         'test_order_common.c',
         'test_order_queue.c',
+        'test_atomic_queue.c',
         'test_perf_atq.c',
         'test_perf_common.c',
         'test_perf_queue.c',
diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
new file mode 100644
index 0000000000..4059a28a43
--- /dev/null
+++ b/app/test-eventdev/test_atomic_queue.c
@@ -0,0 +1,412 @@
+#include <stdio.h>
+#include <unistd.h>
+
+#include "test_order_common.h"
+
+#define IDLE_TIMEOUT 1
+#define NB_QUEUES 2
+
+static rte_spinlock_t *atomic_locks;
+
+struct event_data {
+	union {
+		struct {
+			uint32_t flow;
+			uint32_t seq;
+		};
+		uint64_t raw;
+	};
+};
+
+static inline uint64_t
+event_data_create(flow_id_t flow, uint32_t seq)
+{
+	struct event_data data = {.flow = flow, .seq = seq};
+	return data.raw;
+}
+
+static inline uint32_t
+event_data_get_seq(struct rte_event *const ev)
+{
+	struct event_data data = {.raw = ev->u64};
+	return data.seq;
+}
+
+static inline uint32_t
+event_data_get_flow(struct rte_event *const ev)
+{
+	struct event_data data = {.raw = ev->u64};
+	return data.flow;
+}
+
+static inline uint32_t
+get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
+{
+	return (queue * nb_flows) + flow;
+}
+
+static inline bool
+atomic_spinlock_trylock(uint32_t queue, uint32_t flow, uint32_t nb_flows)
+{
+	return rte_spinlock_trylock(&atomic_locks[get_lock_idx(queue, flow, nb_flows)]);
+}
+
+static inline void
+atomic_spinlock_unlock(uint32_t queue, uint32_t flow, uint32_t nb_flows)
+{
+	rte_spinlock_unlock(&atomic_locks[get_lock_idx(queue, flow, nb_flows)]);
+}
+
+static inline bool
+test_done(struct test_order *const t)
+{
+	return t->err || t->result == EVT_TEST_SUCCESS;
+}
+
+static inline int
+atomic_producer(void *arg)
+{
+	struct prod_data *p = arg;
+	struct test_order *t = p->t;
+	struct evt_options *opt = t->opt;
+	const uint8_t dev_id = p->dev_id;
+	const uint8_t port = p->port_id;
+	const uint64_t nb_pkts = t->nb_pkts;
+	uint32_t *producer_flow_seq = t->producer_flow_seq;
+	const uint32_t nb_flows = t->nb_flows;
+	uint64_t count = 0;
+	struct rte_event ev;
+
+	if (opt->verbose_level > 1)
+		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n",
+			__func__, rte_lcore_id(), dev_id, port, p->queue_id);
+
+	ev = (struct rte_event) {
+		.op = RTE_EVENT_OP_NEW,
+		.queue_id = p->queue_id,
+		.sched_type = RTE_SCHED_TYPE_ATOMIC,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+		.event_type = RTE_EVENT_TYPE_CPU
+	};
+
+	while (count < nb_pkts && t->err == false) {
+		const flow_id_t flow = rte_rand_max(nb_flows);
+
+		/* Maintain seq number per flow */
+		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
+		ev.flow_id = flow;
+
+		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+			if (t->err)
+				break;
+			rte_pause();
+		}
+
+		count++;
+	}
+
+	if (!evt_is_maintenance_free(dev_id)) {
+		while (!test_done(t)) {
+			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
+			rte_pause();
+		}
+	}
+
+	return 0;
+}
+
+static inline void
+atomic_lock_verify(struct test_order *const t,
+		uint32_t flow,
+		uint32_t nb_flows,
+		uint32_t port,
+		uint32_t queue_id)
+{
+	if (!atomic_spinlock_trylock(queue_id, flow, nb_flows)) {
+		evt_err("q=%u, flow=%x atomicity error: port %u tried to take locked spinlock",
+			queue_id, flow, port);
+		t->err = true;
+	}
+}
+
+static inline void
+atomic_process_stage_0(struct test_order *const t,
+		struct rte_event *const ev,
+		uint32_t nb_flows,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	atomic_lock_verify(t, flow, nb_flows, port, 0);
+
+	ev->queue_id = 1;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+
+	atomic_spinlock_unlock(0, flow, nb_flows);
+}
+
+static inline void
+atomic_process_stage_1(struct test_order *const t,
+		struct rte_event *const ev,
+		uint32_t nb_flows,
+		uint32_t *const expected_flow_seq,
+		RTE_ATOMIC(uint64_t) *const outstand_pkts,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	atomic_lock_verify(t, flow, nb_flows, port, 1);
+
+	/* compare the seqn against expected value */
+	uint32_t seq = event_data_get_seq(ev);
+	if (seq != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%x expected=%x",
+			flow, seq, expected_flow_seq[flow]);
+		t->err = true;
+	}
+
+	expected_flow_seq[flow]++;
+	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
+
+	ev->op = RTE_EVENT_OP_RELEASE;
+
+	atomic_spinlock_unlock(1, flow, nb_flows);
+}
+
+static int
+atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
+{
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	while (t->err == false) {
+
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
+
+		if (nb_rx == 0) {
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
+					0) {
+				break;
+			}
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (!flow_id_cap) {
+				ev[i].flow_id = event_data_get_flow(&ev[i]);
+			}
+
+			switch (ev[i].queue_id) {
+			case 0:
+				atomic_process_stage_0(t, &ev[i], nb_flows, port);
+				break;
+			case 1:
+				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
+						outstand_pkts, port);
+				break;
+			default:
+				order_process_stage_invalid(t, &ev[i]);
+				break;
+			}
+		}
+
+		uint16_t total_enq = 0;
+
+		do {
+			total_enq += rte_event_enqueue_burst(
+					dev_id, port, ev + total_enq, nb_rx - total_enq);
+		} while (total_enq < nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+worker_wrapper(void *arg)
+{
+	struct worker_data *w = arg;
+	int max_burst = evt_has_burst_mode(w->dev_id) ? BURST_SIZE : 1;
+	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
+
+	return atomic_queue_worker_burst(arg, flow_id_cap, max_burst);
+}
+
+static int
+atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+	int ret, lcore_id;
+	struct test_order *t = evt_test_priv(test);
+
+	/* launch workers */
+
+	int wkr_idx = 0;
+	RTE_LCORE_FOREACH_WORKER(lcore_id) {
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+
+		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
+		if (ret) {
+			evt_err("failed to launch worker %d", lcore_id);
+			return ret;
+		}
+		wkr_idx++;
+	}
+
+	/* launch producer */
+	int plcore = evt_get_first_active_lcore(opt->plcores);
+
+	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
+	if (ret) {
+		evt_err("failed to launch order_producer %d", plcore);
+		return ret;
+	}
+
+	uint64_t prev_time = rte_get_timer_cycles();
+	int64_t prev_outstanding_pkts = -1;
+
+	while (t->err == false) {
+		uint64_t current_time = rte_get_timer_cycles();
+		int64_t outstanding_pkts = rte_atomic_load_explicit(
+				&t->outstand_pkts, rte_memory_order_relaxed);
+
+		if (outstanding_pkts <= 0) {
+			t->result = EVT_TEST_SUCCESS;
+			break;
+		}
+
+		if (current_time - prev_time > rte_get_timer_hz() * IDLE_TIMEOUT) {
+			printf(CLGRN "\r%" PRId64 "" CLNRM, outstanding_pkts);
+			fflush(stdout);
+			if (prev_outstanding_pkts == outstanding_pkts) {
+				rte_event_dev_dump(opt->dev_id, stdout);
+				evt_err("No schedules for seconds, deadlock");
+				t->err = true;
+				break;
+			}
+			prev_outstanding_pkts = outstanding_pkts;
+			prev_time = current_time;
+		}
+	}
+	printf("\r");
+
+	rte_free(atomic_locks);
+
+	return 0;
+}
+
+static int
+atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
+{
+	int ret;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	/* number of active worker cores + 1 producer */
+	const uint8_t nb_ports = nb_workers + 1;
+
+	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
+	if (ret) {
+		evt_err("failed to configure eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	/* q0 configuration */
+	struct rte_event_queue_conf q0_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* q1 configuration */
+	struct rte_event_queue_conf q1_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* setup one port per worker, linking to all queues */
+	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
+	if (ret)
+		return ret;
+
+	if (!evt_has_distributed_sched(opt->dev_id)) {
+		uint32_t service_id;
+		rte_event_dev_service_id_get(opt->dev_id, &service_id);
+		ret = evt_service_setup(service_id);
+		if (ret) {
+			evt_err("No service lcore found to run event dev.");
+			return ret;
+		}
+	}
+
+	ret = rte_event_dev_start(opt->dev_id);
+	if (ret) {
+		evt_err("failed to start eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
+
+	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
+
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_init(&atomic_locks[i]);
+	}
+
+	return 0;
+}
+
+static void
+atomic_queue_opt_dump(struct evt_options *opt)
+{
+	order_opt_dump(opt);
+	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
+}
+
+static bool
+atomic_queue_capability_check(struct evt_options *opt)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(opt->dev_id, &dev_info);
+	if (dev_info.max_event_queues < NB_QUEUES ||
+			dev_info.max_event_ports < order_nb_event_ports(opt)) {
+		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
+				dev_info.max_event_queues, order_nb_event_ports(opt),
+				dev_info.max_event_ports);
+		return false;
+	}
+
+	return true;
+}
+
+static const struct evt_test_ops atomic_queue = {
+	.cap_check = atomic_queue_capability_check,
+	.opt_check = order_opt_check,
+	.opt_dump = atomic_queue_opt_dump,
+	.test_setup = order_test_setup,
+	.mempool_setup = order_mempool_setup,
+	.eventdev_setup = atomic_queue_eventdev_setup,
+	.launch_lcores = atomic_queue_launch_lcores,
+	.eventdev_destroy = order_eventdev_destroy,
+	.mempool_destroy = order_mempool_destroy,
+	.test_result = order_test_result,
+	.test_destroy = order_test_destroy,
+};
+
+EVT_TEST_REGISTER(atomic_queue);
--
2.34.1


^ permalink raw reply	[flat|nested] 6+ messages in thread

* RE: [EXTERNAL] [RFC v4 1/1] eventdev: add atomic queue to test-eventdev app
  2025-01-13 12:17 ` [RFC v4 " Luka Jankovic
@ 2025-01-13 12:27   ` Jerin Jacob
  0 siblings, 0 replies; 6+ messages in thread
From: Jerin Jacob @ 2025-01-13 12:27 UTC (permalink / raw)
  To: Luka Jankovic; +Cc: dev, mattias.ronnblom



> -----Original Message-----
> From: Luka Jankovic <luka.jankovic@ericsson.com>
> Sent: Monday, January 13, 2025 5:48 PM
> To: luka.jankovic@ericsson.com
> Cc: dev@dpdk.org; mattias.ronnblom@ericsson.com
> Subject: [EXTERNAL] [RFC v4 1/1] eventdev: add atomic queue to test-eventdev
> app
> 
> Add an atomic queue test based on the order queue test that exclusively uses
> atomic queues. This makes it compatible with event devices such as the
> distributed software eventdev. The test detects if port maintenance is required.
> To verify atomicity, [...]
> 
> Add an atomic queue test based on the order queue test that exclusively uses
> atomic queues.
> This makes it compatible with event devices such as the distributed software
> eventdev.
> 
> The test detects if port maintenance is required.
> 
> To verify atomicity, a spinlock is set up for each combination of queue and flow.
> It is taken whenever an event is dequeued for processing and released when
> processing is finished.
> The test will fail if a port attempts to take a lock which is already taken.
> 
> Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
> ---
> v4:
>  * Fix code style issues.
>  * Remove unused includes.
> 
> v3:
>  * Use struct to avoid bit operations when accessing event u64.
>  * Changed __rte_always_inline to inline for processing stages.
>  * Introduce idle timeout constant.
>  * Formatting and cleanup.
> 
> v2:
>  * Changed to only check queue, flow combination, not port, queue, flow.
>  * Lock is only held when a packet is processed.
>  * Utilize event u64 instead of mbuf.
>  * General cleanup.
> ---
>  app/test-eventdev/evt_common.h        |   9 +
>  app/test-eventdev/meson.build         |   1 +
>  app/test-eventdev/test_atomic_queue.c | 412

Please update doc/guides/tools/testeventdev.rst

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2025-01-13 12:27 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
2024-12-23 11:16 ` Mattias Rönnblom
2025-01-09 10:22   ` Luka Jankovic
2025-01-13  9:04 ` [RFC v3 " ejnulak
2025-01-13 12:17 ` [RFC v4 " Luka Jankovic
2025-01-13 12:27   ` [EXTERNAL] " Jerin Jacob

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).