DPDK patches and discussions
 help / color / mirror / Atom feed
* [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
@ 2024-12-19 14:48 Luka Jankovic
  2024-12-23 11:16 ` Mattias Rönnblom
  0 siblings, 1 reply; 3+ messages in thread
From: Luka Jankovic @ 2024-12-19 14:48 UTC (permalink / raw)
  To: Luka Jankovic; +Cc: dev, Mattias Rönnblom

From 2e55ecd0e522f50cbb3635f53b025e165db7cf3e Mon Sep 17 00:00:00 2001
In-Reply-To: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
References: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
From: Luka Jankovic <luka.jankovic@ericsson.com>
Date: Thu, 19 Dec 2024 13:31:26 +0000
Subject: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
To: luka.jankovic@ericsson.com
Cc: dev@dpdk.org,
    mattias.ronnblom@ericsson.com

Add an atomic queue test, based on the order queue test, that uses atomic queues exclusively.
This makes it compatible with event devices such as the distributed software eventdev.

The test detects whether port maintenance is required.

To verify atomicity, a spinlock is set up for each combination of queue and flow.
It is taken whenever an event is dequeued for processing and released when processing is finished.
The test will fail if a port attempts to take a lock which is already taken.

Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
---
v2:
 * Changed to only check queue, flow combination, not port, queue, flow.
 * Lock is only held when a packet is processed.
 * Utilize event u64 instead of mbuf.
 * General cleanup.
---
 app/test-eventdev/evt_common.h        |  10 +
 app/test-eventdev/meson.build         |   1 +
 app/test-eventdev/test_atomic_queue.c | 372 ++++++++++++++++++++++++++
 3 files changed, 383 insertions(+)
 create mode 100644 app/test-eventdev/test_atomic_queue.c

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index 901b8ba55d..adb024c011 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
 			true : false;
 }
 
+/*
+ * Report whether the event device advertises
+ * RTE_EVENT_DEV_CAP_MAINTENANCE_FREE in its capability flags.
+ */
+static inline bool
+evt_is_maintenance_free(uint8_t dev_id)
+{
+	struct rte_event_dev_info info;
+
+	rte_event_dev_info_get(dev_id, &info);
+
+	return (info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) != 0;
+}
+
 static inline int
 evt_service_setup(uint32_t service_id)
 {
diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
index ab8769c755..db5add39eb 100644
--- a/app/test-eventdev/meson.build
+++ b/app/test-eventdev/meson.build
@@ -15,6 +15,7 @@ sources = files(
         'test_order_atq.c',
         'test_order_common.c',
         'test_order_queue.c',
+        'test_atomic_queue.c',
         'test_perf_atq.c',
         'test_perf_common.c',
         'test_perf_queue.c',
diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
new file mode 100644
index 0000000000..51e988c527
--- /dev/null
+++ b/app/test-eventdev/test_atomic_queue.c
@@ -0,0 +1,372 @@
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <rte_lcore_var.h>
+
+#include "test_order_common.h"
+
+#define NB_QUEUES 2
+
+static rte_spinlock_t *atomic_locks;
+
+/*
+ * Pack a flow id and a sequence number into one 64-bit event payload:
+ * the flow goes into the upper 32 bits, the sequence number into the lower.
+ */
+static inline uint64_t
+event_data_create(flow_id_t flow, uint32_t seq)
+{
+	const uint64_t flow_bits = (uint64_t)flow << 32;
+
+	return flow_bits | seq;
+}
+
+/* Return the lower 32 bits of the event payload (ev->u64). */
+static inline uint32_t
+event_data_get_seq(struct rte_event *const ev)
+{
+	return (uint32_t)ev->u64;
+}
+
+/* Return the upper 32 bits of the event payload (ev->u64). */
+static inline uint32_t
+event_data_get_flow(struct rte_event *const ev)
+{
+	return (uint32_t)(ev->u64 >> 32);
+}
+
+/*
+ * Map a (queue, flow) pair to an index into a flat array that holds
+ * nb_flows consecutive entries per queue.
+ */
+static inline uint32_t
+get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
+{
+	const uint32_t queue_base = queue * nb_flows;
+
+	return queue_base + flow;
+}
+
+/* True once the test has recorded an error or has been marked successful. */
+static inline bool
+test_done(struct test_order *const t)
+{
+	if (t->err)
+		return true;
+
+	return t->result == EVT_TEST_SUCCESS;
+}
+
+/*
+ * Producer lcore entry point.
+ *
+ * Enqueues t->nb_pkts new events on p->queue_id. Every event is assigned a
+ * flow chosen with rte_rand_max(nb_flows) and carries that flow together with
+ * a per-flow sequence number in its u64 payload. If the device is not
+ * maintenance free, the producer then keeps calling rte_event_maintain() on
+ * its port until the test is done.
+ *
+ * arg: pointer to the producer's struct prod_data.
+ * Always returns 0.
+ */
+static inline int
+atomic_producer(void *arg)
+{
+	struct prod_data *p = arg;
+	struct test_order *t = p->t;
+	struct evt_options *opt = t->opt;
+	const uint8_t dev_id = p->dev_id;
+	const uint8_t port = p->port_id;
+	const uint64_t nb_pkts = t->nb_pkts;
+	uint32_t *producer_flow_seq = t->producer_flow_seq;
+	const uint32_t nb_flows = t->nb_flows;
+	uint64_t count = 0;
+	struct rte_event ev;
+
+	if (opt->verbose_level > 1)
+		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
+				dev_id, port, p->queue_id);
+
+	ev = (struct rte_event){
+			.event = 0,
+			.op = RTE_EVENT_OP_NEW,
+			.queue_id = p->queue_id,
+			/* Both queues of this test are set up as atomic queues. */
+			.sched_type = RTE_SCHED_TYPE_ATOMIC,
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.event_type = RTE_EVENT_TYPE_CPU,
+			.sub_event_type = 0 /* stage 0 */
+	};
+
+	while (count < nb_pkts && t->err == false) {
+		const flow_id_t flow = rte_rand_max(nb_flows);
+
+		/* Maintain seq number per flow */
+		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
+		ev.flow_id = flow;
+
+		/* Retry until the event is accepted, unless the test has failed. */
+		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+			if (t->err)
+				break;
+			rte_pause();
+		}
+
+		count++;
+	}
+
+	/* The producer port is otherwise idle, so drive maintenance explicitly. */
+	if (!evt_is_maintenance_free(dev_id)) {
+		while (!test_done(t)) {
+			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
+			rte_pause();
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Stage 0 (queue 0) processing of one event.
+ *
+ * Tries to take the spinlock of (queue 0, flow) while the event is rewritten
+ * to be forwarded to queue 1 as an atomic CPU event. If the lock is already
+ * held, another port is processing the same queue/flow pair concurrently,
+ * which is reported with evt_err(); this function has no access to the test
+ * state, so it can only log the violation.
+ *
+ * ev:       event to process, modified in place.
+ * nb_flows: number of flows per queue, used to locate the lock.
+ * port:     port the event was dequeued from, used for reporting only.
+ */
+static __rte_always_inline void
+atomic_process_stage_0(struct rte_event *const ev, uint32_t nb_flows, uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+	rte_spinlock_t *const lock = &atomic_locks[get_lock_idx(0, flow, nb_flows)];
+	const bool locked = rte_spinlock_trylock(lock);
+
+	if (!locked)
+		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);
+
+	ev->queue_id = 1;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+
+	/* Never release a lock that is owned by another port. */
+	if (locked)
+		rte_spinlock_unlock(lock);
+}
+
+/*
+ * Stage 1 (queue 1) processing of one event.
+ *
+ * Tries to take the spinlock of (queue 1, flow); a lock that is already held
+ * means another port is processing the same queue/flow pair concurrently and
+ * fails the test. The sequence number carried by the event is then compared
+ * with the next expected one for its flow, the expected value is advanced,
+ * the outstanding packet counter is decremented and the event is turned into
+ * a release operation.
+ *
+ * t:                 test state; t->err is set on any detected violation.
+ * ev:                event to process, modified in place.
+ * nb_flows:          number of flows per queue, used to locate the lock.
+ * expected_flow_seq: per-flow next expected sequence number.
+ * outstand_pkts:     counter of events that have not completed stage 1 yet.
+ * port:              port the event was dequeued from, used for reporting.
+ */
+static __rte_always_inline void
+atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
+		uint32_t *const expected_flow_seq, RTE_ATOMIC(uint64_t) *const outstand_pkts,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+	const uint32_t seq = event_data_get_seq(ev);
+	rte_spinlock_t *const lock = &atomic_locks[get_lock_idx(1, flow, nb_flows)];
+	const bool locked = rte_spinlock_trylock(lock);
+
+	if (!locked) {
+		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 1U, flow);
+		t->err = true;
+	}
+
+	/* compare the seqn against expected value */
+	if (seq != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%x expected=%x", flow, seq,
+				expected_flow_seq[flow]);
+		t->err = true;
+	}
+
+	expected_flow_seq[flow]++;
+	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
+
+	ev->op = RTE_EVENT_OP_RELEASE;
+
+	/* Never release a lock that is owned by another port. */
+	if (locked)
+		rte_spinlock_unlock(lock);
+}
+
+/*
+ * Worker loop: dequeue bursts of events, run the stage matching each event's
+ * queue id, and enqueue the whole burst back to the device.
+ *
+ * arg:         worker context consumed by ORDER_WORKER_INIT.
+ * flow_id_cap: when false, the flow id is restored from the event payload
+ *              before the event is processed.
+ * max_burst:   maximum number of events requested per dequeue.
+ *
+ * Always returns 0.
+ */
+static int
+atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
+{
+	/*
+	 * NOTE(review): presumably declares t, dev_id, port, nb_flows,
+	 * expected_flow_seq and outstand_pkts used below - confirm against
+	 * test_order_common.h.
+	 */
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	while (t->err == false) {
+
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
+
+		if (nb_rx == 0) {
+			/* Nothing dequeued: stop once no events are outstanding. */
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
+					0) {
+				break;
+			}
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (!flow_id_cap) {
+				ev[i].flow_id = event_data_get_flow(&ev[i]);
+			}
+
+			/* The queue id identifies the processing stage. */
+			switch (ev[i].queue_id) {
+			case 0:
+				atomic_process_stage_0(&ev[i], nb_flows, port);
+				break;
+			case 1:
+				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
+						outstand_pkts, port);
+				break;
+			default:
+				order_process_stage_invalid(t, &ev[i]);
+				break;
+			}
+		}
+
+		uint16_t total_enq = 0;
+
+		/* Retry until every event of the burst has been accepted. */
+		do {
+			total_enq += rte_event_enqueue_burst(
+					dev_id, port, ev + total_enq, nb_rx - total_enq);
+		} while (total_enq < nb_rx);
+	}
+
+	return 0;
+}
+
+/*
+ * Worker lcore entry point: pick the burst size and flow-id handling from
+ * the device capabilities, then run the worker loop.
+ */
+static int
+worker_wrapper(void *arg)
+{
+	struct worker_data *w = arg;
+	int max_burst = 1;
+
+	if (evt_has_burst_mode(w->dev_id))
+		max_burst = BURST_SIZE;
+
+	return atomic_queue_worker_burst(arg, evt_has_flow_id(w->dev_id), max_burst);
+}
+
+/*
+ * Launch the worker and producer lcores and monitor the test from the
+ * calling lcore.
+ *
+ * One worker is started on every lcore enabled in opt->wlcores and the
+ * producer on the first lcore enabled in opt->plcores. The caller then polls
+ * the outstanding packet counter: the test is marked successful when it
+ * reaches zero, and failed if it does not change between two checks made
+ * more than one second apart. The spinlock array is released only after all
+ * launched lcores have returned, as workers may still be touching it.
+ *
+ * Returns 0, or the rte_eal_remote_launch() error if an lcore could not be
+ * launched.
+ */
+static int
+atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+	int ret, lcore_id;
+	struct test_order *t = evt_test_priv(test);
+
+	/* launch workers */
+	int wkr_idx = 0;
+
+	RTE_LCORE_FOREACH_WORKER(lcore_id) {
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+
+		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
+		if (ret) {
+			evt_err("failed to launch worker %d", lcore_id);
+			return ret;
+		}
+		wkr_idx++;
+	}
+
+	/* launch producer */
+	int plcore = evt_get_first_active_lcore(opt->plcores);
+
+	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
+	if (ret) {
+		evt_err("failed to launch atomic_producer %d", plcore);
+		return ret;
+	}
+
+	const uint64_t idle_timeout = rte_get_timer_hz() * 1;
+	uint64_t last_check = rte_get_timer_cycles();
+	int64_t old_remaining = -1;
+
+	while (t->err == false) {
+		uint64_t now = rte_get_timer_cycles();
+		int64_t remaining = rte_atomic_load_explicit(
+				&t->outstand_pkts, rte_memory_order_relaxed);
+
+		if (remaining <= 0) {
+			t->result = EVT_TEST_SUCCESS;
+			break;
+		}
+
+		if (now - last_check > idle_timeout) {
+			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
+			fflush(stdout);
+			/* No event completed since the previous check. */
+			if (old_remaining == remaining) {
+				rte_event_dev_dump(opt->dev_id, stdout);
+				evt_err("No schedules for seconds, deadlock");
+				t->err = true;
+				break;
+			}
+			old_remaining = remaining;
+			last_check = now;
+		}
+	}
+	printf("\r");
+
+	/*
+	 * A worker decrements the outstanding counter before it releases its
+	 * queue/flow lock, so the locks may still be in use here. Wait for
+	 * every launched lcore to return before freeing them.
+	 */
+	rte_eal_mp_wait_lcore();
+
+	rte_free(atomic_locks);
+	atomic_locks = NULL;
+
+	return 0;
+}
+
+/*
+ * Configure and start the event device for the atomic queue test.
+ *
+ * Sets up NB_QUEUES atomic queues with identical configuration, one port per
+ * active worker lcore plus one for the producer, a service core when the
+ * device has no distributed scheduler, and one spinlock per (queue, flow)
+ * pair used to verify atomicity. The locks are allocated before the device
+ * is started so that an allocation failure does not leave a started device
+ * behind.
+ *
+ * Returns 0 on success, -ENOMEM if the locks cannot be allocated, or the
+ * error reported by the failing setup step.
+ */
+static int
+atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
+{
+	int ret;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	/* number of active worker cores + 1 producer */
+	const uint8_t nb_ports = nb_workers + 1;
+
+	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
+	if (ret) {
+		evt_err("failed to configure eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	/* every queue of this test uses the same atomic configuration */
+	struct rte_event_queue_conf atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+
+	for (uint8_t queue = 0; queue < NB_QUEUES; queue++) {
+		ret = rte_event_queue_setup(opt->dev_id, queue, &atomic_conf);
+		if (ret) {
+			evt_err("failed to setup queue%d eventdev %d err %d", queue,
+					opt->dev_id, ret);
+			return ret;
+		}
+	}
+
+	/* setup one port per worker, linking to all queues */
+	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
+	if (ret)
+		return ret;
+
+	if (!evt_has_distributed_sched(opt->dev_id)) {
+		uint32_t service_id;
+		rte_event_dev_service_id_get(opt->dev_id, &service_id);
+		ret = evt_service_setup(service_id);
+		if (ret) {
+			evt_err("No service lcore found to run event dev.");
+			return ret;
+		}
+	}
+
+	/* one lock per (queue, flow) pair, indexed by get_lock_idx() */
+	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
+
+	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
+	if (atomic_locks == NULL) {
+		evt_err("failed to allocate %" PRIu32 " spinlocks", num_locks);
+		return -ENOMEM;
+	}
+
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_init(&atomic_locks[i]);
+	}
+
+	ret = rte_event_dev_start(opt->dev_id);
+	if (ret) {
+		evt_err("failed to start eventdev %d", opt->dev_id);
+		rte_free(atomic_locks);
+		atomic_locks = NULL;
+		return ret;
+	}
+
+	return 0;
+}
+
+/* Dump the common order-test options followed by the number of queues used. */
+static void
+atomic_queue_opt_dump(struct evt_options *opt)
+{
+	const int nb_queues = NB_QUEUES;
+
+	order_opt_dump(opt);
+	evt_dump("nb_evdev_queues", "%d", nb_queues);
+}
+
+/*
+ * Check that the event device offers at least NB_QUEUES queues and as many
+ * ports as order_nb_event_ports() reports for the given options.
+ *
+ * Returns true when both limits are satisfied; otherwise logs the shortfall
+ * and returns false.
+ */
+static bool
+atomic_queue_capability_check(struct evt_options *opt)
+{
+	struct rte_event_dev_info info;
+
+	rte_event_dev_info_get(opt->dev_id, &info);
+
+	if (info.max_event_queues >= NB_QUEUES &&
+			info.max_event_ports >= order_nb_event_ports(opt))
+		return true;
+
+	evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
+			info.max_event_queues, order_nb_event_ports(opt),
+			info.max_event_ports);
+
+	return false;
+}
+
+/*
+ * Test operations of the atomic queue test. Only the capability check, the
+ * option dump, the event device setup and the lcore launch are specific to
+ * this test; every other hook reuses the order test implementation.
+ */
+static const struct evt_test_ops atomic_queue = {
+		.cap_check = atomic_queue_capability_check,
+		.opt_check = order_opt_check,
+		.opt_dump = atomic_queue_opt_dump,
+		.test_setup = order_test_setup,
+		.mempool_setup = order_mempool_setup,
+		.eventdev_setup = atomic_queue_eventdev_setup,
+		.launch_lcores = atomic_queue_launch_lcores,
+		.eventdev_destroy = order_eventdev_destroy,
+		.mempool_destroy = order_mempool_destroy,
+		.test_result = order_test_result,
+		.test_destroy = order_test_destroy,
+};
+
+/* NOTE(review): presumably registers the ops table under the name "atomic_queue" - confirm. */
+EVT_TEST_REGISTER(atomic_queue);
-- 
2.34.1



^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
  2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
@ 2024-12-23 11:16 ` Mattias Rönnblom
  2025-01-09 10:22   ` Luka Jankovic
  0 siblings, 1 reply; 3+ messages in thread
From: Mattias Rönnblom @ 2024-12-23 11:16 UTC (permalink / raw)
  To: Luka Jankovic; +Cc: dev, Mattias Rönnblom

On 2024-12-19 15:48, Luka Jankovic wrote:
>  From 2e55ecd0e522f50cbb3635f53b025e165db7cf3e Mon Sep 17 00:00:00 2001
> In-Reply-To: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
> References: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
> From: Luka Jankovic <luka.jankovic@ericsson.com>
> Date: Thu, 19 Dec 2024 13:31:26 +0000
> Subject: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
> To: luka.jankovic@ericsson.com
> Cc: dev@dpdk.org,
>      mattias.ronnblom@ericsson.com
> 
> Add an atomic queue test based on the order queue test but use exclusively atomic queues.
> This makes it compatible with event devices such as the distributed software eventdev.
> 
> The test detects whether port maintenance is required.

"whether /../ or not", or replace "whether" with "if".

> 
> To verify atomicity, a spinlock is set up for each combination of queue and flow.
> It is taken whenever an event is dequeued for processing and released when processing is finished.
> The test will fail if a port attempts to take a lock which is already taken.
> 
> Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
> ---
> v2:
>   * Changed to only check queue, flow combination, not port, queue, flow.
>   * Lock is only held when a packet is processed.
>   * Utilize event u64 instead of mbuf.
>   * General cleanup.
> ---
>   app/test-eventdev/evt_common.h        |  10 +
>   app/test-eventdev/meson.build         |   1 +
>   app/test-eventdev/test_atomic_queue.c | 372 ++++++++++++++++++++++++++
>   3 files changed, 383 insertions(+)
>   create mode 100644 app/test-eventdev/test_atomic_queue.c
> 
> diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
> index 901b8ba55d..adb024c011 100644
> --- a/app/test-eventdev/evt_common.h
> +++ b/app/test-eventdev/evt_common.h
> @@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
>   			true : false;
>   }
>   
> +static inline bool
> +evt_is_maintenance_free(uint8_t dev_id)
> +{
> +	struct rte_event_dev_info dev_info;
> +
> +	rte_event_dev_info_get(dev_id, &dev_info);
> +	return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
> +			true : false;

return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;

will do the trick.

> +}
> +
>   static inline int
>   evt_service_setup(uint32_t service_id)
>   {
> diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
> index ab8769c755..db5add39eb 100644
> --- a/app/test-eventdev/meson.build
> +++ b/app/test-eventdev/meson.build
> @@ -15,6 +15,7 @@ sources = files(
>           'test_order_atq.c',
>           'test_order_common.c',
>           'test_order_queue.c',
> +        'test_atomic_queue.c',
>           'test_perf_atq.c',
>           'test_perf_common.c',
>           'test_perf_queue.c',
> diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
> new file mode 100644
> index 0000000000..51e988c527
> --- /dev/null
> +++ b/app/test-eventdev/test_atomic_queue.c
> @@ -0,0 +1,372 @@
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <rte_lcore_var.h>
> +
> +#include "test_order_common.h"
> +
> +#define NB_QUEUES 2
> +
> +static rte_spinlock_t *atomic_locks;
> +
> +static inline uint64_t
> +event_data_create(flow_id_t flow, uint32_t seq)
> +{
> +	return ((uint64_t)flow << 32) | seq;
> +}
> +

An alternative would be to use a struct to pack the two 32-bit words 
into a single 64-bit word.

/* Two 32-bit fields (flow, seq) overlaid on a single 64-bit word (raw). */
struct event_data
{
	union {
		struct {
			uint32_t flow;
			uint32_t seq;
		};
		/* The same 8 bytes viewed as one word; which half holds flow depends on byte order. */
		uint64_t raw;
	};
};

/* Combine a flow id and a sequence number into one 64-bit word via struct event_data. */
static inline uint64_t
event_data_create(flow_id_t flow, uint32_t seq)
{
	struct event_data d;

	d.flow = flow;
	d.seq = seq;

	return d.raw;
}

/* Extract the sequence number from the 64-bit payload of an event (ev->u64). */
static inline uint32_t
event_data_get_seq(struct rte_event *const ev)
{
	/* No ';' inside the initializer list: members are separated by commas only. */
	struct event_data d = {
		.raw = ev->u64
	};
	return d.seq;
}

...or something like that.

Would get rid of the masking and shifting, and some semi-magical numbers.

> +static inline uint32_t
> +event_data_get_seq(struct rte_event *const ev)
> +{
> +	return ev->u64 & 0xFFFFFFFF;

Does the masking have any effect here? Other than having the reviewer 
counting the number of Fs. :)

> +}
> +
> +static inline uint32_t
> +event_data_get_flow(struct rte_event *const ev)
> +{
> +	return ev->u64 >> 32;
> +}
> +
> +static inline uint32_t
> +get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
> +{
> +	return (queue * nb_flows) + flow;
> +}
> +
> +static inline bool
> +test_done(struct test_order *const t)
> +{
> +	return t->err || t->result == EVT_TEST_SUCCESS;
> +}
> +
> +static inline int
> +atomic_producer(void *arg)
> +{
> +	struct prod_data *p = arg;
> +	struct test_order *t = p->t;
> +	struct evt_options *opt = t->opt;
> +	const uint8_t dev_id = p->dev_id;
> +	const uint8_t port = p->port_id;
> +	const uint64_t nb_pkts = t->nb_pkts;
> +	uint32_t *producer_flow_seq = t->producer_flow_seq;
> +	const uint32_t nb_flows = t->nb_flows;
> +	uint64_t count = 0;
> +	struct rte_event ev;
> +
> +	if (opt->verbose_level > 1)
> +		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
> +				dev_id, port, p->queue_id);
> +
> +	ev = (struct rte_event){
> +			.event = 0,
> +			.op = RTE_EVENT_OP_NEW,
> +			.queue_id = p->queue_id,
> +			.sched_type = RTE_SCHED_TYPE_ORDERED,

It seems like an atomic producer should produce events with sched_type 
set to RTE_SCHED_TYPE_ATOMIC.

> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.event_type = RTE_EVENT_TYPE_CPU,
> +			.sub_event_type = 0 /* stage 0 */

Is the sub_event_type used? Can't find any reference to it. Also, isn't 
queue id used to signify processing stage?

> +	};
> +
> +	while (count < nb_pkts && t->err == false) {
> +		const flow_id_t flow = rte_rand_max(nb_flows);
> +
> +		/* Maintain seq number per flow */
> +		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
> +		ev.flow_id = flow;
> +
> +		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
> +			if (t->err)
> +				break;
> +			rte_pause();
> +		}
> +
> +		count++;
> +	}
> +
> +	if (!evt_is_maintenance_free(dev_id)) {
> +		while (!test_done(t)) {
> +			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
> +			rte_pause();
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline void
> +atomic_process_stage_0(struct rte_event *const ev, uint32_t nb_flows, uint32_t port)
> +{
> +	const uint32_t flow = event_data_get_flow(ev);
> +
> +	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(0, flow, nb_flows)])) {
> +		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);

Maybe you should report to the user what that means? Something making 
clear atomicity is violated.

> +	}
> +
> +	ev->queue_id = 1;
> +	ev->op = RTE_EVENT_OP_FORWARD;
> +	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> +	ev->event_type = RTE_EVENT_TYPE_CPU;
> +
> +	rte_spinlock_unlock(&atomic_locks[get_lock_idx(0, flow, nb_flows)]);
> +}
> +
> +static __rte_always_inline void

Why is this __rte_always_inline?

> +atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
> +		uint32_t *const expected_flow_seq, RTE_ATOMIC(uint64_t) *const outstand_pkts,
> +		uint32_t port)
> +{
> +	const uint32_t flow = event_data_get_flow(ev);
> +
> +	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(1, flow, nb_flows)])) {
> +		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);

Break this out to a verify function, used by both stages.

> +	}
> +
> +	/* compare the seqn against expected value */
> +	if (event_data_get_seq(ev) != expected_flow_seq[flow]) {
> +		evt_err("flow=%x seqn mismatch got=%lx expected=%x", flow, ev->u64,
> +				expected_flow_seq[flow]);
> +		t->err = true;
> +	}
> +
> +	expected_flow_seq[flow]++;
> +	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
> +
> +	ev->op = RTE_EVENT_OP_RELEASE;
> +
> +	rte_spinlock_unlock(&atomic_locks[get_lock_idx(1, flow, nb_flows)]);

Factor out "queue-flow" locking/unlocking to two separate functions.

> +}
> +
> +static int
> +atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
> +{
> +	ORDER_WORKER_INIT;
> +	struct rte_event ev[BURST_SIZE];
> +	uint16_t i;
> +
> +	while (t->err == false) {
> +
> +		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
> +
> +		if (nb_rx == 0) {
> +			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
> +					0) {
> +				break;
> +			}
> +			rte_pause();
> +			continue;
> +		}
> +
> +		for (i = 0; i < nb_rx; i++) {
> +			if (!flow_id_cap) {
> +				ev[i].flow_id = event_data_get_flow(&ev[i]);
> +			}
> +
> +			switch (ev[i].queue_id) {
> +			case 0:
> +				atomic_process_stage_0(&ev[i], nb_flows, port);
> +				break;
> +			case 1:
> +				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
> +						outstand_pkts, port);
> +				break;
> +			default:
> +				order_process_stage_invalid(t, &ev[i]);
> +				break;
> +			}
> +		}
> +
> +		uint16_t total_enq = 0;
> +
> +		do {
> +			total_enq += rte_event_enqueue_burst(
> +					dev_id, port, ev + total_enq, nb_rx - total_enq);
> +		} while (total_enq < nb_rx);
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +worker_wrapper(void *arg)
> +{
> +	struct worker_data *w = arg;
> +	int max_burst = evt_has_burst_mode(w->dev_id) ? BURST_SIZE : 1;
> +	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
> +
> +	return atomic_queue_worker_burst(arg, flow_id_cap, max_burst);
> +}
> +
> +static int
> +atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
> +{
> +	int ret, lcore_id;
> +	struct test_order *t = evt_test_priv(test);
> +
> +	/* launch workers */
> +
> +	int wkr_idx = 0;
> +	RTE_LCORE_FOREACH_WORKER(lcore_id) {
> +		if (!(opt->wlcores[lcore_id]))
> +			continue;
> +
> +		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
> +		if (ret) {
> +			evt_err("failed to launch worker %d", lcore_id);
> +			return ret;
> +		}
> +		wkr_idx++;
> +	}
> +
> +	/* launch producer */
> +	int plcore = evt_get_first_active_lcore(opt->plcores);
> +
> +	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
> +	if (ret) {
> +		evt_err("failed to launch order_producer %d", plcore);
> +		return ret;
> +	}
> +
> +	uint64_t cycles = rte_get_timer_cycles();

There must be a better name for this variable. "current_time", is that 
how it's used? Or "now". When I've read below code, it seems like it's 
"last_active", or "last_dequeue".

> +	int64_t old_remaining = -1;
> +
> +	while (t->err == false) {
> +		uint64_t new_cycles = rte_get_timer_cycles();
> +		int64_t remaining = rte_atomic_load_explicit(

Rename to "outstanding_pkts". No point in having two terms used for the 
same thing.

> +				&t->outstand_pkts, rte_memory_order_relaxed);
> +
> +		if (remaining <= 0) {
> +			t->result = EVT_TEST_SUCCESS;
> +			break;
> +		}
> +
> +		if (new_cycles - cycles > rte_get_timer_hz() * 1) {

Introduce either a temp "idle_timeout = rte_get_timer_hz() * 1" or a 
"#define IDLE_TIMEOUT 1"

Maybe the timeout should be longer than a second, to allow for very busy 
CI servers.

> +			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
> +			fflush(stdout);
> +			if (old_remaining == remaining) {
> +				rte_event_dev_dump(opt->dev_id, stdout);
> +				evt_err("No schedules for seconds, deadlock");
> +				t->err = true;
> +				break;
> +			}
> +			old_remaining = remaining;
> +			cycles = new_cycles;
> +		}
> +	}
> +	printf("\r");
> +
> +	rte_free(atomic_locks);
> +
> +	return 0;
> +}
> +
> +static int
> +atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
> +{
> +	int ret;
> +
> +	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
> +	/* number of active worker cores + 1 producer */
> +	const uint8_t nb_ports = nb_workers + 1;
> +
> +	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
> +	if (ret) {
> +		evt_err("failed to configure eventdev %d", opt->dev_id);
> +		return ret;
> +	}
> +
> +	/* q0 configuration */
> +	struct rte_event_queue_conf q0_atomic_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
> +			.nb_atomic_flows = opt->nb_flows,
> +			.nb_atomic_order_sequences = opt->nb_flows,
> +	};
> +	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_atomic_conf);
> +	if (ret) {
> +		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
> +		return ret;
> +	}
> +
> +	/* q1 configuration */
> +	struct rte_event_queue_conf q1_atomic_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
> +			.nb_atomic_flows = opt->nb_flows,
> +			.nb_atomic_order_sequences = opt->nb_flows,
> +	};
> +	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf);
> +	if (ret) {
> +		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
> +		return ret;
> +	}
> +
> +	/* setup one port per worker, linking to all queues */
> +	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
> +	if (ret)
> +		return ret;
> +
> +	if (!evt_has_distributed_sched(opt->dev_id)) {
> +		uint32_t service_id;
> +		rte_event_dev_service_id_get(opt->dev_id, &service_id);
> +		ret = evt_service_setup(service_id);
> +		if (ret) {
> +			evt_err("No service lcore found to run event dev.");
> +			return ret;
> +		}
> +	}
> +
> +	ret = rte_event_dev_start(opt->dev_id);
> +	if (ret) {
> +		evt_err("failed to start eventdev %d", opt->dev_id);
> +		return ret;
> +	}
> +
> +	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
> +
> +	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
> +
> +	for (uint32_t i = 0; i < num_locks; i++) {
> +		rte_spinlock_init(&atomic_locks[i]);
> +	}
> +
> +	return 0;
> +}
> +
> +static void
> +atomic_queue_opt_dump(struct evt_options *opt)
> +{
> +	order_opt_dump(opt);
> +	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
> +}
> +
> +static bool
> +atomic_queue_capability_check(struct evt_options *opt)
> +{
> +	struct rte_event_dev_info dev_info;
> +
> +	rte_event_dev_info_get(opt->dev_id, &dev_info);
> +	if (dev_info.max_event_queues < NB_QUEUES ||
> +			dev_info.max_event_ports < order_nb_event_ports(opt)) {
> +		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
> +				dev_info.max_event_queues, order_nb_event_ports(opt),
> +				dev_info.max_event_ports);
> +		return false;
> +	}
> +
> +	return true;
> +}
> +
> +static const struct evt_test_ops atomic_queue = {
> +		.cap_check = atomic_queue_capability_check,
> +		.opt_check = order_opt_check,
> +		.opt_dump = atomic_queue_opt_dump,
> +		.test_setup = order_test_setup,
> +		.mempool_setup = order_mempool_setup,
> +		.eventdev_setup = atomic_queue_eventdev_setup,
> +		.launch_lcores = atomic_queue_launch_lcores,
> +		.eventdev_destroy = order_eventdev_destroy,
> +		.mempool_destroy = order_mempool_destroy,
> +		.test_result = order_test_result,
> +		.test_destroy = order_test_destroy,
> +};
> +
> +EVT_TEST_REGISTER(atomic_queue);


^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
  2024-12-23 11:16 ` Mattias Rönnblom
@ 2025-01-09 10:22   ` Luka Jankovic
  0 siblings, 0 replies; 3+ messages in thread
From: Luka Jankovic @ 2025-01-09 10:22 UTC (permalink / raw)
  To: hofors; +Cc: dev, Mattias Rönnblom



On Mon, 2024-12-23 at 12:16 +0100, Mattias Rönnblom wrote:
> 
> > +static __rte_always_inline void
> 
> Why is this __rte_always_inline?
> 

The stage functions are based on the ones defined in test_order_queue.c and
test_order_common.h, where they are declared with __rte_always_inline. I tried
using a plain inline instead and it works, so I will change it.

Thank you for the feedback. I will revise and send a new version.
> 


^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2025-01-09 10:22 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
2024-12-23 11:16 ` Mattias Rönnblom
2025-01-09 10:22   ` Luka Jankovic

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).