DPDK patches and discussions
 help / color / mirror / Atom feed
* [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
@ 2024-12-19 14:48 Luka Jankovic
  2024-12-23 11:16 ` Mattias Rönnblom
  0 siblings, 1 reply; 3+ messages in thread
From: Luka Jankovic @ 2024-12-19 14:48 UTC (permalink / raw)
  To: Luka Jankovic; +Cc: dev, Mattias Rönnblom

From 2e55ecd0e522f50cbb3635f53b025e165db7cf3e Mon Sep 17 00:00:00 2001
In-Reply-To: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
References: <228d44a6f2f1f6a4fb5519d9a91c99973f8d7352.camel@ericsson.com>
From: Luka Jankovic <luka.jankovic@ericsson.com>
Date: Thu, 19 Dec 2024 13:31:26 +0000
Subject: [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app
To: luka.jankovic@ericsson.com
Cc: dev@dpdk.org,
    mattias.ronnblom@ericsson.com

Add an atomic queue test based on the order queue test but use exclusively atomic queues.
This makes it compatible with event devices such as the distributed software eventdev.

The test detects whether port maintenance is required.

To verify atomicity, a spinlock is set up for each combination of queue and flow.
It is taken whenever an event is dequeued for processing and released when processing is finished.
The test will fail if a port attempts to take a lock which is already taken.

Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
---
v2:
 * Changed to only check queue, flow combination, not port, queue, flow.
 * Lock is only held when a packet is processed.
 * Utilize event u64 instead of mbuf.
 * General cleanup.
---
 app/test-eventdev/evt_common.h        |  10 +
 app/test-eventdev/meson.build         |   1 +
 app/test-eventdev/test_atomic_queue.c | 372 ++++++++++++++++++++++++++
 3 files changed, 383 insertions(+)
 create mode 100644 app/test-eventdev/test_atomic_queue.c

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index 901b8ba55d..adb024c011 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
 			true : false;
 }
 
+static inline bool
+evt_is_maintenance_free(uint8_t dev_id)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(dev_id, &dev_info);
+	return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
+			true : false;
+}
+
 static inline int
 evt_service_setup(uint32_t service_id)
 {
diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
index ab8769c755..db5add39eb 100644
--- a/app/test-eventdev/meson.build
+++ b/app/test-eventdev/meson.build
@@ -15,6 +15,7 @@ sources = files(
         'test_order_atq.c',
         'test_order_common.c',
         'test_order_queue.c',
+        'test_atomic_queue.c',
         'test_perf_atq.c',
         'test_perf_common.c',
         'test_perf_queue.c',
diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
new file mode 100644
index 0000000000..51e988c527
--- /dev/null
+++ b/app/test-eventdev/test_atomic_queue.c
@@ -0,0 +1,372 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <rte_lcore_var.h>
+
+#include "test_order_common.h"
+
+#define NB_QUEUES 2
+
+static rte_spinlock_t *atomic_locks;
+
+static inline uint64_t
+event_data_create(flow_id_t flow, uint32_t seq)
+{
+	return ((uint64_t)flow << 32) | seq;
+}
+
+static inline uint32_t
+event_data_get_seq(struct rte_event *const ev)
+{
+	return ev->u64 & 0xFFFFFFFF;
+}
+
+static inline uint32_t
+event_data_get_flow(struct rte_event *const ev)
+{
+	return ev->u64 >> 32;
+}
+
+static inline uint32_t
+get_lock_idx(int queue, flow_id_t flow, uint32_t nb_flows)
+{
+	return (queue * nb_flows) + flow;
+}
+
+static inline bool
+test_done(struct test_order *const t)
+{
+	return t->err || t->result == EVT_TEST_SUCCESS;
+}
+
+static inline int
+atomic_producer(void *arg)
+{
+	struct prod_data *p = arg;
+	struct test_order *t = p->t;
+	struct evt_options *opt = t->opt;
+	const uint8_t dev_id = p->dev_id;
+	const uint8_t port = p->port_id;
+	const uint64_t nb_pkts = t->nb_pkts;
+	uint32_t *producer_flow_seq = t->producer_flow_seq;
+	const uint32_t nb_flows = t->nb_flows;
+	uint64_t count = 0;
+	struct rte_event ev;
+
+	if (opt->verbose_level > 1)
+		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
+				dev_id, port, p->queue_id);
+
+	ev = (struct rte_event){
+			.event = 0,
+			.op = RTE_EVENT_OP_NEW,
+			.queue_id = p->queue_id,
+			.sched_type = RTE_SCHED_TYPE_ORDERED,
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.event_type = RTE_EVENT_TYPE_CPU,
+			.sub_event_type = 0 /* stage 0 */
+	};
+
+	while (count < nb_pkts && t->err == false) {
+		const flow_id_t flow = rte_rand_max(nb_flows);
+
+		/* Maintain seq number per flow */
+		ev.u64 = event_data_create(flow, producer_flow_seq[flow]++);
+		ev.flow_id = flow;
+
+		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+			if (t->err)
+				break;
+			rte_pause();
+		}
+
+		count++;
+	}
+
+	if (!evt_is_maintenance_free(dev_id)) {
+		while (!test_done(t)) {
+			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
+			rte_pause();
+		}
+	}
+
+	return 0;
+}
+
+static __rte_always_inline void
+atomic_process_stage_0(struct rte_event *const ev, uint32_t nb_flows, uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(0, flow, nb_flows)])) {
+		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);
+	}
+
+	ev->queue_id = 1;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+
+	rte_spinlock_unlock(&atomic_locks[get_lock_idx(0, flow, nb_flows)]);
+}
+
+static __rte_always_inline void
+atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev, uint32_t nb_flows,
+		uint32_t *const expected_flow_seq, RTE_ATOMIC(uint64_t) *const outstand_pkts,
+		uint32_t port)
+{
+	const uint32_t flow = event_data_get_flow(ev);
+
+	if (!rte_spinlock_trylock(&atomic_locks[get_lock_idx(1, flow, nb_flows)])) {
+		evt_err("Port %u unable to take spinlock for queue %u, flow %x", port, 0, flow);
+	}
+
+	/* compare the seqn against expected value */
+	if (event_data_get_seq(ev) != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%lx expected=%x", flow, ev->u64,
+				expected_flow_seq[flow]);
+		t->err = true;
+	}
+
+	expected_flow_seq[flow]++;
+	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
+
+	ev->op = RTE_EVENT_OP_RELEASE;
+
+	rte_spinlock_unlock(&atomic_locks[get_lock_idx(1, flow, nb_flows)]);
+}
+
+static int
+atomic_queue_worker_burst(void *arg, bool flow_id_cap, uint32_t max_burst)
+{
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	while (t->err == false) {
+
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, max_burst, 0);
+
+		if (nb_rx == 0) {
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
+					0) {
+				break;
+			}
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (!flow_id_cap) {
+				ev[i].flow_id = event_data_get_flow(&ev[i]);
+			}
+
+			switch (ev[i].queue_id) {
+			case 0:
+				atomic_process_stage_0(&ev[i], nb_flows, port);
+				break;
+			case 1:
+				atomic_process_stage_1(t, &ev[i], nb_flows, expected_flow_seq,
+						outstand_pkts, port);
+				break;
+			default:
+				order_process_stage_invalid(t, &ev[i]);
+				break;
+			}
+		}
+
+		uint16_t total_enq = 0;
+
+		do {
+			total_enq += rte_event_enqueue_burst(
+					dev_id, port, ev + total_enq, nb_rx - total_enq);
+		} while (total_enq < nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+worker_wrapper(void *arg)
+{
+	struct worker_data *w = arg;
+	int max_burst = evt_has_burst_mode(w->dev_id) ? BURST_SIZE : 1;
+	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
+
+	return atomic_queue_worker_burst(arg, flow_id_cap, max_burst);
+}
+
+static int
+atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+	int ret, lcore_id;
+	struct test_order *t = evt_test_priv(test);
+
+	/* launch workers */
+
+	int wkr_idx = 0;
+	RTE_LCORE_FOREACH_WORKER(lcore_id) {
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+
+		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
+		if (ret) {
+			evt_err("failed to launch worker %d", lcore_id);
+			return ret;
+		}
+		wkr_idx++;
+	}
+
+	/* launch producer */
+	int plcore = evt_get_first_active_lcore(opt->plcores);
+
+	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
+	if (ret) {
+		evt_err("failed to launch order_producer %d", plcore);
+		return ret;
+	}
+
+	uint64_t cycles = rte_get_timer_cycles();
+	int64_t old_remaining = -1;
+
+	while (t->err == false) {
+		uint64_t new_cycles = rte_get_timer_cycles();
+		int64_t remaining = rte_atomic_load_explicit(
+				&t->outstand_pkts, rte_memory_order_relaxed);
+
+		if (remaining <= 0) {
+			t->result = EVT_TEST_SUCCESS;
+			break;
+		}
+
+		if (new_cycles - cycles > rte_get_timer_hz() * 1) {
+			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
+			fflush(stdout);
+			if (old_remaining == remaining) {
+				rte_event_dev_dump(opt->dev_id, stdout);
+				evt_err("No schedules for seconds, deadlock");
+				t->err = true;
+				break;
+			}
+			old_remaining = remaining;
+			cycles = new_cycles;
+		}
+	}
+	printf("\r");
+
+	rte_free(atomic_locks);
+
+	return 0;
+}
+
+static int
+atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
+{
+	int ret;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	/* number of active worker cores + 1 producer */
+	const uint8_t nb_ports = nb_workers + 1;
+
+	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
+	if (ret) {
+		evt_err("failed to configure eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	/* q0 configuration */
+	struct rte_event_queue_conf q0_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* q1 configuration */
+	struct rte_event_queue_conf q1_atomic_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* setup one port per worker, linking to all queues */
+	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
+	if (ret)
+		return ret;
+
+	if (!evt_has_distributed_sched(opt->dev_id)) {
+		uint32_t service_id;
+		rte_event_dev_service_id_get(opt->dev_id, &service_id);
+		ret = evt_service_setup(service_id);
+		if (ret) {
+			evt_err("No service lcore found to run event dev.");
+			return ret;
+		}
+	}
+
+	ret = rte_event_dev_start(opt->dev_id);
+	if (ret) {
+		evt_err("failed to start eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	const uint32_t num_locks = NB_QUEUES * opt->nb_flows;
+
+	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
+
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_init(&atomic_locks[i]);
+	}
+
+	return 0;
+}
+
+static void
+atomic_queue_opt_dump(struct evt_options *opt)
+{
+	order_opt_dump(opt);
+	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
+}
+
+static bool
+atomic_queue_capability_check(struct evt_options *opt)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(opt->dev_id, &dev_info);
+	if (dev_info.max_event_queues < NB_QUEUES ||
+			dev_info.max_event_ports < order_nb_event_ports(opt)) {
+		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
+				dev_info.max_event_queues, order_nb_event_ports(opt),
+				dev_info.max_event_ports);
+		return false;
+	}
+
+	return true;
+}
+
+static const struct evt_test_ops atomic_queue = {
+		.cap_check = atomic_queue_capability_check,
+		.opt_check = order_opt_check,
+		.opt_dump = atomic_queue_opt_dump,
+		.test_setup = order_test_setup,
+		.mempool_setup = order_mempool_setup,
+		.eventdev_setup = atomic_queue_eventdev_setup,
+		.launch_lcores = atomic_queue_launch_lcores,
+		.eventdev_destroy = order_eventdev_destroy,
+		.mempool_destroy = order_mempool_destroy,
+		.test_result = order_test_result,
+		.test_destroy = order_test_destroy,
+};
+
+EVT_TEST_REGISTER(atomic_queue);
-- 
2.34.1



^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2025-01-09 10:22 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-12-19 14:48 [RFC v2 1/1] eventdev: add atomic queue to test-eventdev app Luka Jankovic
2024-12-23 11:16 ` Mattias Rönnblom
2025-01-09 10:22   ` Luka Jankovic

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).