DPDK patches and discussions
 help / color / mirror / Atom feed
* [RFC] eventdev: add atomic queue to test-eventdev app
@ 2024-12-05 13:29 Luka Jankovic
  2024-12-10 10:37 ` Mattias Rönnblom
  0 siblings, 1 reply; 4+ messages in thread
From: Luka Jankovic @ 2024-12-05 13:29 UTC (permalink / raw)
  To: dev; +Cc: Mattias Rönnblom

From 753273ab9af49e16d7f7b577d6263e3db51257d7 Mon Sep 17 00:00:00 2001
From: Luka Jankovic <luka.jankovic@ericsson.com>
Date: Thu, 5 Dec 2024 13:05:35 +0000
Subject: [RFC] eventdev: add atomic queue to test-eventdev app

Add an atomic queue test based on the order queue test but use exclusively atomic queues.
This makes it compatible with event devices such as the distributed software eventdev.

The test detects whether port maintenance is required.

To verify atomicity, a spinlock is set up for each combination of port, queue, and flow.
It is taken whenever an event enters a new flow and released when all events from a flow are processed.
The test will fail if a port attempts to take the lock for a given flow which is already taken by another port.
In the end, it is verified that an equal amount of locks and unlocks occured, and that all events have been processed.

Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
---
 app/test-eventdev/evt_common.h        |  10 +
 app/test-eventdev/meson.build         |   1 +
 app/test-eventdev/test_atomic_queue.c | 569 ++++++++++++++++++++++++++
 app/test-eventdev/test_order_common.h |   1 +
 4 files changed, 581 insertions(+)
 create mode 100644 app/test-eventdev/test_atomic_queue.c

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index 901b8ba55d..f0036fb620 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
 			true : false;
 }
 
+static inline bool
+evt_has_maintenance_free(uint8_t dev_id)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(dev_id, &dev_info);
+	return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
+			true : false;
+}
+
 static inline int
 evt_service_setup(uint32_t service_id)
 {
diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
index ab8769c755..db5add39eb 100644
--- a/app/test-eventdev/meson.build
+++ b/app/test-eventdev/meson.build
@@ -15,6 +15,7 @@ sources = files(
         'test_order_atq.c',
         'test_order_common.c',
         'test_order_queue.c',
+        'test_atomic_queue.c',
         'test_perf_atq.c',
         'test_perf_common.c',
         'test_perf_queue.c',
diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
new file mode 100644
index 0000000000..02aec95d59
--- /dev/null
+++ b/app/test-eventdev/test_atomic_queue.c
@@ -0,0 +1,569 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <rte_lcore_var.h>
+
+#include "test_order_common.h"
+
+#define NB_QUEUES 2
+
+rte_spinlock_t *atomic_locks;
+
+struct port_stat_counters {
+	uint32_t num_locked[NB_QUEUES];
+	uint32_t num_unlocked[NB_QUEUES];
+	uint64_t *num_pkts;
+};
+
+static RTE_LCORE_VAR_HANDLE(struct port_stat_counters, port_counters);
+
+static inline int
+get_num_pkts_index(int queue, uint32_t flow, uint32_t nb_flows)
+{
+	return (queue * nb_flows) + flow;
+}
+
+static inline uint32_t
+get_lock_idx(int queue, uint32_t nb_ports, uint32_t nb_flows, uint32_t port, flow_id_t flow)
+{
+	return (queue * nb_ports * nb_flows) + (port * nb_flows) + flow;
+}
+
+static inline int
+atomic_producer(void *arg)
+{
+	struct prod_data *p = arg;
+	struct test_order *t = p->t;
+	struct evt_options *opt = t->opt;
+	const uint8_t dev_id = p->dev_id;
+	const uint8_t port = p->port_id;
+	struct rte_mempool *pool = t->pool;
+	const uint64_t nb_pkts = t->nb_pkts;
+	uint32_t *producer_flow_seq = t->producer_flow_seq;
+	const uint32_t nb_flows = t->nb_flows;
+	uint64_t count = 0;
+	struct rte_mbuf *m;
+	struct rte_event ev;
+
+	if (opt->verbose_level > 1)
+		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
+				dev_id, port, p->queue_id);
+
+	ev.event = 0;
+	ev.op = RTE_EVENT_OP_NEW;
+	ev.queue_id = p->queue_id;
+	ev.sched_type = RTE_SCHED_TYPE_ORDERED;
+	ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
+	ev.event_type = RTE_EVENT_TYPE_CPU;
+	ev.sub_event_type = 0; /* stage 0 */
+
+	while (count < nb_pkts && t->err == false) {
+		m = rte_pktmbuf_alloc(pool);
+		if (m == NULL)
+			continue;
+
+		const flow_id_t flow = (uintptr_t)m % nb_flows;
+		/* Maintain seq number per flow */
+		*order_mbuf_seqn(t, m) = producer_flow_seq[flow]++;
+		order_flow_id_save(t, flow, m, &ev);
+
+		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+			if (t->err)
+				break;
+			rte_pause();
+		}
+
+		count++;
+	}
+
+	if (p->maintain) {
+		while (!t->err && t->result != EVT_TEST_SUCCESS) {
+			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
+			rte_pause();
+		}
+	}
+
+	return 0;
+}
+
+static __rte_always_inline void
+atomic_process_stage_0(struct test_order *const t, struct rte_event *const ev,
+		const uint32_t nb_flows, const uint32_t nb_ports, uint32_t port)
+{
+	ev->queue_id = 1;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+
+	struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
+	const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;
+
+	/* Checks if any other port has taken the lock before for the given flow */
+	for (uint32_t i = 0; i < nb_ports; i++) {
+		if (i != port) {
+			if (rte_spinlock_is_locked(&atomic_locks[get_lock_idx(
+					    0, nb_ports, nb_flows, i, flow)])) {
+				evt_err("processing locked flow %u, port %u", flow, i);
+				t->err = true;
+			}
+		} else {
+			if (rte_spinlock_trylock(&atomic_locks[get_lock_idx(
+					    0, nb_ports, nb_flows, port, flow)])) {
+				counters->num_locked[0]++;
+			}
+			counters->num_pkts[get_num_pkts_index(0, flow, nb_flows)]++;
+		}
+	}
+}
+
+static __rte_always_inline void
+atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev,
+		const uint32_t nb_flows, const uint32_t nb_ports, uint32_t *const expected_flow_seq,
+		RTE_ATOMIC(uint64_t) *const outstand_pkts, uint32_t port)
+{
+	const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;
+	/* compare the seqn against expected value */
+	if (*order_mbuf_seqn(t, ev->mbuf) != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%x expected=%x", flow,
+				*order_mbuf_seqn(t, ev->mbuf), expected_flow_seq[flow]);
+		t->err = true;
+	}
+
+	struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
+
+	/* Checks if any other port has taken the lock before for the given flow */
+	for (uint32_t i = 0; i < nb_ports; i++) {
+		if (i != port) {
+			if (rte_spinlock_is_locked(&atomic_locks[get_lock_idx(
+					    1, nb_ports, nb_flows, i, flow)])) {
+				evt_err("processing locked flow %u, port %u", flow, i);
+				t->err = true;
+			}
+		} else {
+			if (rte_spinlock_trylock(&atomic_locks[get_lock_idx(
+					    1, nb_ports, nb_flows, port, flow)])) {
+				counters->num_locked[1]++;
+			}
+			counters->num_pkts[get_num_pkts_index(1, flow, nb_flows)]++;
+		}
+	}
+
+	expected_flow_seq[flow]++;
+	rte_pktmbuf_free(ev->mbuf);
+	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
+}
+
+static int
+atomic_queue_worker(void *arg, const bool flow_id_cap)
+{
+	ORDER_WORKER_INIT;
+	struct rte_event ev;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	const uint8_t nb_ports = nb_workers + 1;
+
+	while (t->err == false) {
+		uint16_t event = rte_event_dequeue_burst(dev_id, port, &ev, 1, 0);
+		if (!event) {
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <= 0)
+				break;
+			rte_pause();
+			continue;
+		}
+
+		if (!flow_id_cap)
+			order_flow_id_copy_from_mbuf(t, &ev);
+
+		struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
+		const uint32_t flow = (uintptr_t)ev.mbuf % nb_flows;
+
+		if (ev.queue_id == 0) { /* from ordered queue */
+			atomic_process_stage_0(t, &ev, nb_flows, nb_ports, port);
+			while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
+				rte_pause();
+			}
+			uint64_t pkts_left = --counters->num_pkts[get_num_pkts_index(0, flow, nb_flows)];
+
+			if (pkts_left == 0) {
+				const uint32_t lock_idx =
+						get_lock_idx(0, nb_ports, nb_flows, port, flow);
+				if (rte_spinlock_is_locked(&atomic_locks[lock_idx])) {
+					rte_spinlock_unlock(&atomic_locks[lock_idx]);
+					counters->num_unlocked[0]++;
+				}
+			}
+		} else if (ev.queue_id == 1) {
+			atomic_process_stage_1(t, &ev, nb_flows, nb_ports, expected_flow_seq,
+					outstand_pkts, port);
+			ev.op = RTE_EVENT_OP_RELEASE;
+
+			uint64_t pkts_left = --counters->num_pkts[get_num_pkts_index(1, flow, nb_flows)];
+
+			if (pkts_left == 0) {
+				const uint32_t lock_idx =
+						get_lock_idx(1, nb_ports, nb_flows, port, flow);
+				if (rte_spinlock_is_locked(&atomic_locks[lock_idx])) {
+					rte_spinlock_unlock(&atomic_locks[lock_idx]);
+					counters->num_unlocked[1]++;
+				}
+			}
+		} else {
+			order_process_stage_invalid(t, &ev);
+		}
+	}
+
+	for (uint32_t i = 0; i < nb_flows; i++) {
+		rte_spinlock_unlock(&atomic_locks[(port * nb_flows) + i]);
+	}
+
+	return 0;
+}
+
+static int
+atomic_queue_worker_burst(void *arg, const bool flow_id_cap)
+{
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	const uint8_t nb_ports = nb_workers + 1;
+
+	while (t->err == false) {
+
+		for (uint32_t i = 0; i < nb_flows; i++) {
+			rte_spinlock_unlock(&atomic_locks[(port * nb_flows) + i]);
+		}
+
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, BURST_SIZE, 0);
+
+		if (nb_rx == 0) {
+			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
+					0) {
+				break;
+			}
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+
+			if (!flow_id_cap)
+				order_flow_id_copy_from_mbuf(t, &ev[i]);
+
+			if (ev[i].queue_id == 0) {
+				atomic_process_stage_0(t, &ev[i], nb_flows, nb_ports, port);
+			} else if (ev[i].queue_id == 1) {
+				atomic_process_stage_1(t, &ev[i], nb_flows, nb_ports,
+						expected_flow_seq, outstand_pkts, port);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+			} else {
+				order_process_stage_invalid(t, &ev[i]);
+			}
+		}
+
+		uint16_t total_enq = 0;
+
+		do {
+			uint16_t enq = rte_event_enqueue_burst(
+					dev_id, port, ev + total_enq, nb_rx - total_enq);
+
+			for (uint16_t i = total_enq; i < total_enq + enq; i++) {
+				struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
+				const uint32_t flow = (uintptr_t)ev[i].mbuf % nb_flows;
+
+				if (ev[i].op == RTE_EVENT_OP_FORWARD) {
+					counters->num_pkts[get_num_pkts_index(0, flow, nb_flows)]--;
+
+					if (counters->num_pkts[get_num_pkts_index(
+							    0, flow, nb_flows)] == 0) {
+						const uint32_t lock_idx = get_lock_idx(
+								0, nb_ports, nb_flows, port, flow);
+						if (rte_spinlock_is_locked(
+								    &atomic_locks[lock_idx])) {
+							rte_spinlock_unlock(
+									&atomic_locks[lock_idx]);
+							counters->num_unlocked[0]++;
+						}
+					}
+				}
+
+				if (ev[i].op == RTE_EVENT_OP_RELEASE) {
+					counters->num_pkts[get_num_pkts_index(1, flow, nb_flows)]--;
+
+					if (counters->num_pkts[get_num_pkts_index(
+							    1, flow, nb_flows)] == 0) {
+						const uint32_t lock_idx = get_lock_idx(
+								1, nb_ports, nb_flows, port, flow);
+						if (rte_spinlock_is_locked(
+								    &atomic_locks[lock_idx])) {
+							rte_spinlock_unlock(
+									&atomic_locks[lock_idx]);
+							counters->num_unlocked[1]++;
+						}
+					}
+				}
+			}
+
+			total_enq += enq;
+		} while (total_enq < nb_rx);
+	}
+
+	for (uint32_t i = 0; i < nb_flows; i++) {
+		rte_spinlock_unlock(&atomic_locks[(port * nb_flows) + i]);
+	}
+
+	return 0;
+}
+
+static int
+worker_wrapper(void *arg)
+{
+	struct worker_data *w = arg;
+	const bool burst = evt_has_burst_mode(w->dev_id);
+	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
+
+
+	if (burst) {
+		if (flow_id_cap)
+			return atomic_queue_worker_burst(arg, true);
+		else
+			return atomic_queue_worker_burst(arg, false);
+	} else {
+		if (flow_id_cap)
+			return atomic_queue_worker(arg, true);
+		else
+			return atomic_queue_worker(arg, false);
+	}
+}
+
+static int
+atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+	int ret, lcore_id;
+	struct test_order *t = evt_test_priv(test);
+
+	RTE_LCORE_VAR_ALLOC(port_counters);
+
+	struct port_stat_counters *counters;
+	RTE_LCORE_VAR_FOREACH(lcore_id, counters, port_counters)
+	{
+		for (int i = 0; i < NB_QUEUES; i++) {
+			counters->num_locked[i] = 0;
+			counters->num_unlocked[i] = 0;
+		}
+
+		counters->num_pkts =
+				rte_calloc(NULL, opt->nb_flows * NB_QUEUES, sizeof(uint64_t), 0);
+	}
+
+	int wkr_idx = 0;
+	/* launch workers */
+	RTE_LCORE_FOREACH_WORKER(lcore_id) {
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+
+		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
+		if (ret) {
+			evt_err("failed to launch worker %d", lcore_id);
+			return ret;
+		}
+		wkr_idx++;
+	}
+
+	/* launch producer */
+	int plcore = evt_get_first_active_lcore(opt->plcores);
+
+	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
+	if (ret) {
+		evt_err("failed to launch order_producer %d", plcore);
+		return ret;
+	}
+
+	uint64_t cycles = rte_get_timer_cycles();
+	int64_t old_remaining = -1;
+
+	while (t->err == false) {
+		uint64_t new_cycles = rte_get_timer_cycles();
+		int64_t remaining = rte_atomic_load_explicit(
+				&t->outstand_pkts, rte_memory_order_relaxed);
+
+		if (remaining <= 0) {
+			t->result = EVT_TEST_SUCCESS;
+			break;
+		}
+
+		if (new_cycles - cycles > rte_get_timer_hz() * 1) {
+			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
+			fflush(stdout);
+			if (old_remaining == remaining) {
+				rte_event_dev_dump(opt->dev_id, stdout);
+				evt_err("No schedules for seconds, deadlock");
+				t->err = true;
+				break;
+			}
+			old_remaining = remaining;
+			cycles = new_cycles;
+		}
+	}
+	printf("\r");
+
+	RTE_LCORE_VAR_FOREACH(lcore_id, counters, port_counters)
+	{
+		if (!(opt->wlcores[lcore_id]))
+			continue;
+		for (int i = 0; i < NB_QUEUES; i++) {
+			if (counters->num_locked[i] != counters->num_unlocked[i]) {
+				evt_err("Number of locks (%u) does not number of unlocks (%u) for "
+					"core %u, queue %i",
+						counters->num_locked[i], counters->num_unlocked[i],
+						lcore_id, i);
+				t->err = true;
+			}
+		}
+
+		for (uint32_t i = 0; i < opt->nb_flows; i++) {
+			for (int j = 0; j < NB_QUEUES; j++) {
+				if (counters->num_pkts[get_num_pkts_index(j, i, opt->nb_flows)] !=
+						0) {
+					evt_err("Packets left %lu in flow %u, queue %i for core %u",
+							counters->num_pkts[get_num_pkts_index(
+									j, i, opt->nb_flows)],
+							i, j, lcore_id);
+					t->err = true;
+				}
+			}
+		}
+	}
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	const uint8_t nb_ports = nb_workers + 1;
+
+	const uint32_t num_locks = NB_QUEUES * nb_ports * opt->nb_flows;
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_unlock(&atomic_locks[i]);
+	}
+
+	rte_free(atomic_locks);
+
+	return 0;
+}
+
+static int
+atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
+{
+	int ret;
+
+	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
+	/* number of active worker cores + 1 producer */
+	const uint8_t nb_ports = nb_workers + 1;
+
+	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
+	if (ret) {
+		evt_err("failed to configure eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	/* q0 configuration */
+	struct rte_event_queue_conf q0_ordered_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_ordered_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* q1 configuration */
+	struct rte_event_queue_conf q1_ordered_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
+			.nb_atomic_flows = opt->nb_flows,
+			.nb_atomic_order_sequences = opt->nb_flows,
+	};
+	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_ordered_conf);
+	if (ret) {
+		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
+		return ret;
+	}
+
+	/* setup one port per worker, linking to all queues */
+	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
+	if (ret)
+		return ret;
+
+	if (!evt_has_distributed_sched(opt->dev_id)) {
+		uint32_t service_id;
+		rte_event_dev_service_id_get(opt->dev_id, &service_id);
+		ret = evt_service_setup(service_id);
+		if (ret) {
+			evt_err("No service lcore found to run event dev.");
+			return ret;
+		}
+	}
+
+	// DSW does not have RTE_EVENT_DEV_CAP_MAINTENANCE_FREE, so the producer needs to
+	// call rte_event_maintain on the port whnever not sending.
+	if (!evt_has_maintenance_free(opt->dev_id)) {
+		struct test_order *t = evt_test_priv(test);
+		t->prod.maintain = true;
+	}
+
+	ret = rte_event_dev_start(opt->dev_id);
+	if (ret) {
+		evt_err("failed to start eventdev %d", opt->dev_id);
+		return ret;
+	}
+
+	const uint32_t num_locks = NB_QUEUES * nb_ports * opt->nb_flows;
+
+	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
+
+	for (uint32_t i = 0; i < num_locks; i++) {
+		rte_spinlock_init(&atomic_locks[i]);
+	}
+
+	return 0;
+}
+
+static void
+atomic_queue_opt_dump(struct evt_options *opt)
+{
+	order_opt_dump(opt);
+	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
+}
+
+static bool
+atomic_queue_capability_check(struct evt_options *opt)
+{
+	struct rte_event_dev_info dev_info;
+
+	rte_event_dev_info_get(opt->dev_id, &dev_info);
+	if (dev_info.max_event_queues < NB_QUEUES ||
+			dev_info.max_event_ports < order_nb_event_ports(opt)) {
+		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
+				dev_info.max_event_queues, order_nb_event_ports(opt),
+				dev_info.max_event_ports);
+		return false;
+	}
+
+	return true;
+}
+
+static const struct evt_test_ops atomic_queue = {
+		.cap_check = atomic_queue_capability_check,
+		.opt_check = order_opt_check,
+		.opt_dump = atomic_queue_opt_dump,
+		.test_setup = order_test_setup,
+		.mempool_setup = order_mempool_setup,
+		.eventdev_setup = atomic_queue_eventdev_setup,
+		.launch_lcores = atomic_queue_launch_lcores,
+		.eventdev_destroy = order_eventdev_destroy,
+		.mempool_destroy = order_mempool_destroy,
+		.test_result = order_test_result,
+		.test_destroy = order_test_destroy,
+};
+
+EVT_TEST_REGISTER(atomic_queue);
diff --git a/app/test-eventdev/test_order_common.h b/app/test-eventdev/test_order_common.h
index 7177fd8e9a..6d48e7630a 100644
--- a/app/test-eventdev/test_order_common.h
+++ b/app/test-eventdev/test_order_common.h
@@ -37,6 +37,7 @@ struct prod_data {
 	uint8_t port_id;
 	uint8_t queue_id;
 	struct test_order *t;
+	bool maintain;
 };
 
 struct __rte_cache_aligned test_order {
-- 
2.34.1



^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [RFC] eventdev: add atomic queue to test-eventdev app
  2024-12-05 13:29 [RFC] eventdev: add atomic queue to test-eventdev app Luka Jankovic
@ 2024-12-10 10:37 ` Mattias Rönnblom
  2024-12-16  9:14   ` Luka Jankovic
  2024-12-19 13:24   ` Luka Jankovic
  0 siblings, 2 replies; 4+ messages in thread
From: Mattias Rönnblom @ 2024-12-10 10:37 UTC (permalink / raw)
  To: Luka Jankovic, dev; +Cc: Mattias Rönnblom

On 2024-12-05 14:29, Luka Jankovic wrote:
>  From 753273ab9af49e16d7f7b577d6263e3db51257d7 Mon Sep 17 00:00:00 2001
> From: Luka Jankovic <luka.jankovic@ericsson.com>
> Date: Thu, 5 Dec 2024 13:05:35 +0000
> Subject: [RFC] eventdev: add atomic queue to test-eventdev app
> 
> Add an atomic queue test based on the order queue test but use exclusively atomic queues.
> This makes it compatible with event devices such as the distributed software eventdev.
> 

The other tests are incompatible due to the use of "ALL_TYPES" type 
queues, or some other reason?

> The test detects whether port maintenance is required.
> 
> To verify atomicity, a spinlock is set up for each combination of port, queue, and flow.

Hmm. Atomicity doesn't depend on ports?

> It is taken whenever an event enters a new flow and released when all events from a flow are processed.
> The test will fail if a port attempts to take the lock for a given flow which is already taken by another port.
> In the end, it is verified that an equal amount of locks and unlocks occured, and that all events have been processed.
> 
> Signed-off-by: Luka Jankovic <luka.jankovic@ericsson.com>
> ---
>   app/test-eventdev/evt_common.h        |  10 +
>   app/test-eventdev/meson.build         |   1 +
>   app/test-eventdev/test_atomic_queue.c | 569 ++++++++++++++++++++++++++
>   app/test-eventdev/test_order_common.h |   1 +
>   4 files changed, 581 insertions(+)
>   create mode 100644 app/test-eventdev/test_atomic_queue.c
> 
> diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
> index 901b8ba55d..f0036fb620 100644
> --- a/app/test-eventdev/evt_common.h
> +++ b/app/test-eventdev/evt_common.h
> @@ -138,6 +138,16 @@ evt_has_flow_id(uint8_t dev_id)
>   			true : false;
>   }
>   
> +static inline bool
> +evt_has_maintenance_free(uint8_t dev_id)

I would use "is" instead of "has".

> +{
> +	struct rte_event_dev_info dev_info;
> +
> +	rte_event_dev_info_get(dev_id, &dev_info);
> +	return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
> +			true : false;

return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;

will work fine.

> +}
> +
>   static inline int
>   evt_service_setup(uint32_t service_id)
>   {
> diff --git a/app/test-eventdev/meson.build b/app/test-eventdev/meson.build
> index ab8769c755..db5add39eb 100644
> --- a/app/test-eventdev/meson.build
> +++ b/app/test-eventdev/meson.build
> @@ -15,6 +15,7 @@ sources = files(
>           'test_order_atq.c',
>           'test_order_common.c',
>           'test_order_queue.c',
> +        'test_atomic_queue.c',
>           'test_perf_atq.c',
>           'test_perf_common.c',
>           'test_perf_queue.c',
> diff --git a/app/test-eventdev/test_atomic_queue.c b/app/test-eventdev/test_atomic_queue.c
> new file mode 100644
> index 0000000000..02aec95d59
> --- /dev/null
> +++ b/app/test-eventdev/test_atomic_queue.c
> @@ -0,0 +1,569 @@
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <rte_lcore_var.h>
> +
> +#include "test_order_common.h"
> +
> +#define NB_QUEUES 2
> +
> +rte_spinlock_t *atomic_locks;

Should be static.

> +
> +struct port_stat_counters {
> +	uint32_t num_locked[NB_QUEUES];
> +	uint32_t num_unlocked[NB_QUEUES];
> +	uint64_t *num_pkts;
> +};
> +
> +static RTE_LCORE_VAR_HANDLE(struct port_stat_counters, port_counters);
> +
> +static inline int
> +get_num_pkts_index(int queue, uint32_t flow, uint32_t nb_flows)
> +{
> +	return (queue * nb_flows) + flow;
> +}
> +
> +static inline uint32_t
> +get_lock_idx(int queue, uint32_t nb_ports, uint32_t nb_flows, uint32_t port, flow_id_t flow)
> +{
> +	return (queue * nb_ports * nb_flows) + (port * nb_flows) + flow;
> +}
> +
> +static inline int
> +atomic_producer(void *arg)
> +{
> +	struct prod_data *p = arg;
> +	struct test_order *t = p->t;
> +	struct evt_options *opt = t->opt;
> +	const uint8_t dev_id = p->dev_id;
> +	const uint8_t port = p->port_id;
> +	struct rte_mempool *pool = t->pool;
> +	const uint64_t nb_pkts = t->nb_pkts;
> +	uint32_t *producer_flow_seq = t->producer_flow_seq;
> +	const uint32_t nb_flows = t->nb_flows;
> +	uint64_t count = 0;
> +	struct rte_mbuf *m;
> +	struct rte_event ev;
> +
> +	if (opt->verbose_level > 1)
> +		printf("%s(): lcore %d dev_id %d port=%d queue=%d\n", __func__, rte_lcore_id(),
> +				dev_id, port, p->queue_id);
> +
> +	ev.event = 0;
> +	ev.op = RTE_EVENT_OP_NEW;
> +	ev.queue_id = p->queue_id;
> +	ev.sched_type = RTE_SCHED_TYPE_ORDERED;
> +	ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
> +	ev.event_type = RTE_EVENT_TYPE_CPU;
> +	ev.sub_event_type = 0; /* stage 0 */

A detail, but

ev = (struct rte_event) {
	.event = 0,
	/../
};

looks better imo.

> +
> +	while (count < nb_pkts && t->err == false) {
> +		m = rte_pktmbuf_alloc(pool);
> +		if (m == NULL)
> +			continue;

Isn't the pool sized to be able to satisfy all allocations (for all 
in-flight events [+mbuf caches] in the test)? So an allocation failure 
is an error.

> +
> +		const flow_id_t flow = (uintptr_t)m % nb_flows;

What is this? If mbuf is cache line aligned, a bunch of the lowest bits 
will be zero, so flow will often (always?) be zero.

If a random number if needed, use rte_rand_max(nb_flows).

> +		/* Maintain seq number per flow */
> +		*order_mbuf_seqn(t, m) = producer_flow_seq[flow]++;
> +		order_flow_id_save(t, flow, m, &ev);
> +
> +		while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
> +			if (t->err)
> +				break;
> +			rte_pause();
> +		}

You could have a "max_burst" parameter that could be > 1 for 
RTE_EVENT_DEV_CAP_BURST_MODE and 1 otherwise. To test atomicity, it may 
be a good ide to ship of the events as fast as possible.

> +
> +		count++;
> +	}
> +
> +	if (p->maintain) {
> +		while (!t->err && t->result != EVT_TEST_SUCCESS) {

Maybe break this check out to a separate function, so it's clear to the 
reader what's going on here. This is a check if the test has terminated, 
right?

> +			rte_event_maintain(dev_id, port, RTE_EVENT_DEV_MAINT_OP_FLUSH);
> +			rte_pause();
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline void
> +atomic_process_stage_0(struct test_order *const t, struct rte_event *const ev,
> +		const uint32_t nb_flows, const uint32_t nb_ports, uint32_t port)
> +{
> +	ev->queue_id = 1;
> +	ev->op = RTE_EVENT_OP_FORWARD;
> +	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> +	ev->event_type = RTE_EVENT_TYPE_CPU;
> +
> +	struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
> +	const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;

Put the flow id in the mbuf udata or somethere else in the mbuf.

Another way is to drop the use of mbufs altogheter, and just use the 
64-bit u64 field in rte_event to store the flow_id. Do the use of mbufs 
add anything to this test?

> +
> +	/* Checks if any other port has taken the lock before for the given flow */
> +	for (uint32_t i = 0; i < nb_ports; i++) {
> +		if (i != port) {
> +			if (rte_spinlock_is_locked(&atomic_locks[get_lock_idx(
> +					    0, nb_ports, nb_flows, i, flow)])) {
> +				evt_err("processing locked flow %u, port %u", flow, i);
> +				t->err = true;
> +			}
> +		} else {
> +			if (rte_spinlock_trylock(&atomic_locks[get_lock_idx(
> +					    0, nb_ports, nb_flows, port, flow)])) {
> +				counters->num_locked[0]++;
> +			}
> +			counters->num_pkts[get_num_pkts_index(0, flow, nb_flows)]++;
> +		}
> +	}
> +}
> +
> +static __rte_always_inline void
> +atomic_process_stage_1(struct test_order *const t, struct rte_event *const ev,
> +		const uint32_t nb_flows, const uint32_t nb_ports, uint32_t *const expected_flow_seq,
> +		RTE_ATOMIC(uint64_t) *const outstand_pkts, uint32_t port)
> +{
> +	const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;
> +	/* compare the seqn against expected value */
> +	if (*order_mbuf_seqn(t, ev->mbuf) != expected_flow_seq[flow]) {
> +		evt_err("flow=%x seqn mismatch got=%x expected=%x", flow,
> +				*order_mbuf_seqn(t, ev->mbuf), expected_flow_seq[flow]);
> +		t->err = true;
> +	}
> +
> +	struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
> +
> +	/* Checks if any other port has taken the lock before for the given flow */

Wouldn't it suffice to just have a lock per flow, and then see if that 
is locked? In other words, why is a lock per port and flow needed, as 
oppose of just one lock per flow?

> +	for (uint32_t i = 0; i < nb_ports; i++) {
> +		if (i != port) {
> +			if (rte_spinlock_is_locked(&atomic_locks[get_lock_idx(
> +					    1, nb_ports, nb_flows, i, flow)])) {
> +				evt_err("processing locked flow %u, port %u", flow, i);
> +				t->err = true;
> +			}
> +		} else {
> +			if (rte_spinlock_trylock(&atomic_locks[get_lock_idx(
> +					    1, nb_ports, nb_flows, port, flow)])) {
> +				counters->num_locked[1]++;
> +			}
> +			counters->num_pkts[get_num_pkts_index(1, flow, nb_flows)]++;
> +		}
> +	}
> +
> +	expected_flow_seq[flow]++;
> +	rte_pktmbuf_free(ev->mbuf);
> +	rte_atomic_fetch_sub_explicit(outstand_pkts, 1, rte_memory_order_relaxed);
> +}
> +
> +static int
> +atomic_queue_worker(void *arg, const bool flow_id_cap)

Add a "max_burst" parameter to the atomic_queue_worker_burst() function, 
and eliminate this function.

> +{
> +	ORDER_WORKER_INIT;
> +	struct rte_event ev;
> +
> +	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
> +	const uint8_t nb_ports = nb_workers + 1;
> +
> +	while (t->err == false) {
> +		uint16_t event = rte_event_dequeue_burst(dev_id, port, &ev, 1, 0);
> +		if (!event) {
> +			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <= 0)
> +				break;
> +			rte_pause();
> +			continue;
> +		}
> +
> +		if (!flow_id_cap)
> +			order_flow_id_copy_from_mbuf(t, &ev);
> +
> +		struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
> +		const uint32_t flow = (uintptr_t)ev.mbuf % nb_flows;
> +
> +		if (ev.queue_id == 0) { /* from ordered queue */
> +			atomic_process_stage_0(t, &ev, nb_flows, nb_ports, port);
> +			while (rte_event_enqueue_burst(dev_id, port, &ev, 1) != 1) {
> +				rte_pause();
> +			}
> +			uint64_t pkts_left = --counters->num_pkts[get_num_pkts_index(0, flow, nb_flows)];
> +
> +			if (pkts_left == 0) {
> +				const uint32_t lock_idx =
> +						get_lock_idx(0, nb_ports, nb_flows, port, flow);
> +				if (rte_spinlock_is_locked(&atomic_locks[lock_idx])) {
> +					rte_spinlock_unlock(&atomic_locks[lock_idx]);
> +					counters->num_unlocked[0]++;
> +				}
> +			}
> +		} else if (ev.queue_id == 1) {
> +			atomic_process_stage_1(t, &ev, nb_flows, nb_ports, expected_flow_seq,
> +					outstand_pkts, port);
> +			ev.op = RTE_EVENT_OP_RELEASE;
> +
> +			uint64_t pkts_left = --counters->num_pkts[get_num_pkts_index(1, flow, nb_flows)];
> +
> +			if (pkts_left == 0) {
> +				const uint32_t lock_idx =
> +						get_lock_idx(1, nb_ports, nb_flows, port, flow);
> +				if (rte_spinlock_is_locked(&atomic_locks[lock_idx])) {
> +					rte_spinlock_unlock(&atomic_locks[lock_idx]);
> +					counters->num_unlocked[1]++;
> +				}
> +			}
> +		} else {
> +			order_process_stage_invalid(t, &ev);
> +		}
> +	}
> +
> +	for (uint32_t i = 0; i < nb_flows; i++) {
> +		rte_spinlock_unlock(&atomic_locks[(port * nb_flows) + i]);
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +atomic_queue_worker_burst(void *arg, const bool flow_id_cap)
> +{
> +	ORDER_WORKER_INIT;
> +	struct rte_event ev[BURST_SIZE];
> +	uint16_t i;
> +
> +	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
> +	const uint8_t nb_ports = nb_workers + 1;
> +
> +	while (t->err == false) {
> +
> +		for (uint32_t i = 0; i < nb_flows; i++) {
> +			rte_spinlock_unlock(&atomic_locks[(port * nb_flows) + i]);
> +		}

This shouldn't be spinlock_init()?

> +
> +		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, BURST_SIZE, 0);
> +
> +		if (nb_rx == 0) {
> +			if (rte_atomic_load_explicit(outstand_pkts, rte_memory_order_relaxed) <=
> +					0) {
> +				break;
> +			}
> +			rte_pause();
> +			continue;
> +		}
> +
> +		for (i = 0; i < nb_rx; i++) {
> +
> +			if (!flow_id_cap)
> +				order_flow_id_copy_from_mbuf(t, &ev[i]);
> +
> +			if (ev[i].queue_id == 0) {

Use a switch here.

> +				atomic_process_stage_0(t, &ev[i], nb_flows, nb_ports, port);

Should the ev[i].op be set to FORWARD here? What op type comes out of 
the event device (on dequeue) is unfortunately undefined.

> +			} else if (ev[i].queue_id == 1) {
> +				atomic_process_stage_1(t, &ev[i], nb_flows, nb_ports,
> +						expected_flow_seq, outstand_pkts, port);
> +				ev[i].op = RTE_EVENT_OP_RELEASE;
> +			} else {
> +				order_process_stage_invalid(t, &ev[i]);
> +			}
> +		}
> +
> +		uint16_t total_enq = 0;
> +
> +		do {
> +			uint16_t enq = rte_event_enqueue_burst(
> +					dev_id, port, ev + total_enq, nb_rx - total_enq);
> +
> +			for (uint16_t i = total_enq; i < total_enq + enq; i++) {
> +				struct port_stat_counters *counters = RTE_LCORE_VAR(port_counters);
> +				const uint32_t flow = (uintptr_t)ev[i].mbuf % nb_flows;

In addition to my above comments: extracting flow id from an mbuf (or an 
rte_event) should be abstracted in a function, to avoid repeating yourself.

> +
> +				if (ev[i].op == RTE_EVENT_OP_FORWARD) {

Use a switch on the op field.

> +					counters->num_pkts[get_num_pkts_index(0, flow, nb_flows)]--;

Simplify by introducing a variable.

> +
> +					if (counters->num_pkts[get_num_pkts_index(
> +							    0, flow, nb_flows)] == 0) {

...and this.

> +						const uint32_t lock_idx = get_lock_idx(
> +								0, nb_ports, nb_flows, port, flow);
> +						if (rte_spinlock_is_locked(
> +								    &atomic_locks[lock_idx])) {
> +							rte_spinlock_unlock(
> +									&atomic_locks[lock_idx]);
> +							counters->num_unlocked[0]++;

Could this counter have a better name? It's not supposed to be locked, 
right? So an atomicity violation.

Also, in case it is locked, should one really attempt to unlock it? 
Wouldn't it be unlocked twice in tht case?

> +						}
> +					}
> +				}
> +
> +				if (ev[i].op == RTE_EVENT_OP_RELEASE) {

Considering breaking out the snippets to deal with FORWARD and RELEASE 
into separate functions.

> +					counters->num_pkts[get_num_pkts_index(1, flow, nb_flows)]--;
> +
> +					if (counters->num_pkts[get_num_pkts_index(
> +							    1, flow, nb_flows)] == 0) {
> +						const uint32_t lock_idx = get_lock_idx(
> +								1, nb_ports, nb_flows, port, flow);
> +						if (rte_spinlock_is_locked(
> +								    &atomic_locks[lock_idx])) {
> +							rte_spinlock_unlock(
> +									&atomic_locks[lock_idx]);
> +							counters->num_unlocked[1]++;
> +						}
> +					}
> +				}
> +			}
> +
> +			total_enq += enq;
> +		} while (total_enq < nb_rx);
> +	}
> +
> +	for (uint32_t i = 0; i < nb_flows; i++) {
> +		rte_spinlock_unlock(&atomic_locks[(port * nb_flows) + i]);
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +worker_wrapper(void *arg)

Delete "wrapper".

> +{
> +	struct worker_data *w = arg;
> +	const bool burst = evt_has_burst_mode(w->dev_id);
> +	const bool flow_id_cap = evt_has_flow_id(w->dev_id);
> +
> +
> +	if (burst) {
> +		if (flow_id_cap)
> +			return atomic_queue_worker_burst(arg, true);
> +		else
> +			return atomic_queue_worker_burst(arg, false);

Just pass the flow_id_cap as an argument.

> +	} else {
> +		if (flow_id_cap)
> +			return atomic_queue_worker(arg, true);
> +		else
> +			return atomic_queue_worker(arg, false);
> +	}
> +}
> +
> +static int
> +atomic_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
> +{
> +	int ret, lcore_id;
> +	struct test_order *t = evt_test_priv(test);
> +
> +	RTE_LCORE_VAR_ALLOC(port_counters);
> +
> +	struct port_stat_counters *counters;
> +	RTE_LCORE_VAR_FOREACH(lcore_id, counters, port_counters)
> +	{
> +		for (int i = 0; i < NB_QUEUES; i++) {
> +			counters->num_locked[i] = 0;
> +			counters->num_unlocked[i] = 0;
> +		}
> +
> +		counters->num_pkts =
> +				rte_calloc(NULL, opt->nb_flows * NB_QUEUES, sizeof(uint64_t), 0);
> +	}
> +
> +	int wkr_idx = 0;
> +	/* launch workers */
> +	RTE_LCORE_FOREACH_WORKER(lcore_id) {
> +		if (!(opt->wlcores[lcore_id]))
> +			continue;
> +
> +		ret = rte_eal_remote_launch(worker_wrapper, &t->worker[wkr_idx], lcore_id);
> +		if (ret) {
> +			evt_err("failed to launch worker %d", lcore_id);
> +			return ret;
> +		}
> +		wkr_idx++;
> +	}
> +
> +	/* launch producer */
> +	int plcore = evt_get_first_active_lcore(opt->plcores);
> +
> +	ret = rte_eal_remote_launch(atomic_producer, &t->prod, plcore);
> +	if (ret) {
> +		evt_err("failed to launch order_producer %d", plcore);
> +		return ret;
> +	}
> +
> +	uint64_t cycles = rte_get_timer_cycles();
> +	int64_t old_remaining = -1;
> +
> +	while (t->err == false) {
> +		uint64_t new_cycles = rte_get_timer_cycles();
> +		int64_t remaining = rte_atomic_load_explicit(
> +				&t->outstand_pkts, rte_memory_order_relaxed);
> +
> +		if (remaining <= 0) {
> +			t->result = EVT_TEST_SUCCESS;
> +			break;
> +		}
> +
> +		if (new_cycles - cycles > rte_get_timer_hz() * 1) {
> +			printf(CLGRN "\r%" PRId64 "" CLNRM, remaining);
> +			fflush(stdout);
> +			if (old_remaining == remaining) {
> +				rte_event_dev_dump(opt->dev_id, stdout);
> +				evt_err("No schedules for seconds, deadlock");
> +				t->err = true;
> +				break;
> +			}
> +			old_remaining = remaining;
> +			cycles = new_cycles;
> +		}
> +	}
> +	printf("\r");
> +
> +	RTE_LCORE_VAR_FOREACH(lcore_id, counters, port_counters)
> +	{
> +		if (!(opt->wlcores[lcore_id]))
> +			continue;
> +		for (int i = 0; i < NB_QUEUES; i++) {
> +			if (counters->num_locked[i] != counters->num_unlocked[i]) {
> +				evt_err("Number of locks (%u) does not number of unlocks (%u) for "
> +					"core %u, queue %i",
> +						counters->num_locked[i], counters->num_unlocked[i],
> +						lcore_id, i);
> +				t->err = true;
> +			}
> +		}
> +
> +		for (uint32_t i = 0; i < opt->nb_flows; i++) {
> +			for (int j = 0; j < NB_QUEUES; j++) {
> +				if (counters->num_pkts[get_num_pkts_index(j, i, opt->nb_flows)] !=
> +						0) {
> +					evt_err("Packets left %lu in flow %u, queue %i for core %u",
> +							counters->num_pkts[get_num_pkts_index(
> +									j, i, opt->nb_flows)],
> +							i, j, lcore_id);
> +					t->err = true;
> +				}
> +			}
> +		}
> +	}
> +
> +	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
> +	const uint8_t nb_ports = nb_workers + 1;
> +
> +	const uint32_t num_locks = NB_QUEUES * nb_ports * opt->nb_flows;
> +	for (uint32_t i = 0; i < num_locks; i++) {
> +		rte_spinlock_unlock(&atomic_locks[i]);
> +	}
> +
> +	rte_free(atomic_locks);
> +
> +	return 0;
> +}
> +
> +static int
> +atomic_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
> +{
> +	int ret;
> +
> +	const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores);
> +	/* number of active worker cores + 1 producer */
> +	const uint8_t nb_ports = nb_workers + 1;
> +
> +	ret = evt_configure_eventdev(opt, NB_QUEUES, nb_ports);
> +	if (ret) {
> +		evt_err("failed to configure eventdev %d", opt->dev_id);
> +		return ret;
> +	}
> +
> +	/* q0 configuration */

Would it help readability if the queues (stages) had names? Like 
"process" and "verify" or whatever is going on on the different stages.

> +	struct rte_event_queue_conf q0_ordered_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
> +			.nb_atomic_flows = opt->nb_flows,
> +			.nb_atomic_order_sequences = opt->nb_flows,
> +	};
> +	ret = rte_event_queue_setup(opt->dev_id, 0, &q0_ordered_conf);
> +	if (ret) {
> +		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
> +		return ret;
> +	}
> +
> +	/* q1 configuration */
> +	struct rte_event_queue_conf q1_ordered_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.schedule_type = RTE_SCHED_TYPE_ATOMIC,
> +			.nb_atomic_flows = opt->nb_flows,
> +			.nb_atomic_order_sequences = opt->nb_flows,
> +	};
> +	ret = rte_event_queue_setup(opt->dev_id, 1, &q1_ordered_conf);

Queue 1 is not an ORDERED queue.

> +	if (ret) {
> +		evt_err("failed to setup queue0 eventdev %d err %d", opt->dev_id, ret);
> +		return ret;
> +	}
> +
> +	/* setup one port per worker, linking to all queues */
> +	ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);

"order"?

> +	if (ret)
> +		return ret;
> +
> +	if (!evt_has_distributed_sched(opt->dev_id)) {
> +		uint32_t service_id;
> +		rte_event_dev_service_id_get(opt->dev_id, &service_id);
> +		ret = evt_service_setup(service_id);
> +		if (ret) {
> +			evt_err("No service lcore found to run event dev.");
> +			return ret;
> +		}
> +	}
> +
> +	// DSW does not have RTE_EVENT_DEV_CAP_MAINTENANCE_FREE, so the producer needs to
> +	// call rte_event_maintain on the port whnever not sending.

Use /* */ for comments.

I'm not sure this comment is needed. This is a part of the Eventdev API 
contract.

> +	if (!evt_has_maintenance_free(opt->dev_id)) {
> +		struct test_order *t = evt_test_priv(test);
> +		t->prod.maintain = true;

The producer could figure this out for himself, using the same function. 
It has the dev id already.

> +	}
> +
> +	ret = rte_event_dev_start(opt->dev_id);
> +	if (ret) {
> +		evt_err("failed to start eventdev %d", opt->dev_id);
> +		return ret;
> +	}
> +
> +	const uint32_t num_locks = NB_QUEUES * nb_ports * opt->nb_flows;
> +
> +	atomic_locks = rte_calloc(NULL, num_locks, sizeof(rte_spinlock_t), 0);
> +
> +	for (uint32_t i = 0; i < num_locks; i++) {
> +		rte_spinlock_init(&atomic_locks[i]);
> +	}
> +
> +	return 0;
> +}
> +
> +static void
> +atomic_queue_opt_dump(struct evt_options *opt)
> +{
> +	order_opt_dump(opt);

"order"?

> +	evt_dump("nb_evdev_queues", "%d", NB_QUEUES);
> +}
> +
> +static bool
> +atomic_queue_capability_check(struct evt_options *opt)
> +{
> +	struct rte_event_dev_info dev_info;
> +
> +	rte_event_dev_info_get(opt->dev_id, &dev_info);
> +	if (dev_info.max_event_queues < NB_QUEUES ||
> +			dev_info.max_event_ports < order_nb_event_ports(opt)) {
> +		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", NB_QUEUES,
> +				dev_info.max_event_queues, order_nb_event_ports(opt),
> +				dev_info.max_event_ports);
> +		return false;
> +	}
> +
> +	return true;
> +}
> +
> +static const struct evt_test_ops atomic_queue = {
> +		.cap_check = atomic_queue_capability_check,
> +		.opt_check = order_opt_check,
> +		.opt_dump = atomic_queue_opt_dump,
> +		.test_setup = order_test_setup,
> +		.mempool_setup = order_mempool_setup,
> +		.eventdev_setup = atomic_queue_eventdev_setup,
> +		.launch_lcores = atomic_queue_launch_lcores,
> +		.eventdev_destroy = order_eventdev_destroy,
> +		.mempool_destroy = order_mempool_destroy,
> +		.test_result = order_test_result,
> +		.test_destroy = order_test_destroy,
> +};
> +
> +EVT_TEST_REGISTER(atomic_queue);
> diff --git a/app/test-eventdev/test_order_common.h b/app/test-eventdev/test_order_common.h
> index 7177fd8e9a..6d48e7630a 100644
> --- a/app/test-eventdev/test_order_common.h
> +++ b/app/test-eventdev/test_order_common.h
> @@ -37,6 +37,7 @@ struct prod_data {
>   	uint8_t port_id;
>   	uint8_t queue_id;
>   	struct test_order *t;
> +	bool maintain;
>   };
>   
>   struct __rte_cache_aligned test_order {


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [RFC] eventdev: add atomic queue to test-eventdev app
  2024-12-10 10:37 ` Mattias Rönnblom
@ 2024-12-16  9:14   ` Luka Jankovic
  2024-12-19 13:24   ` Luka Jankovic
  1 sibling, 0 replies; 4+ messages in thread
From: Luka Jankovic @ 2024-12-16  9:14 UTC (permalink / raw)
  To: hofors, dev; +Cc: Mattias Rönnblom

Thank you for the feedback. I will re-implement the test by not checking port-flow-queue combination and generally clean-up the code based on your comments.

On Tue, 2024-12-10 at 11:37 +0100, Mattias Rönnblom wrote:
> 
> > +{
> > +       struct rte_event_dev_info dev_info;
> > +
> > +       rte_event_dev_info_get(dev_id, &dev_info);
> > +       return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
> > +                       true : false;
> 
> return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;
> 
> will work fine.
> 
> 
I decided against it in order to maintain consistent styling with similar functions in the file.

> 
> > +static int
> > +worker_wrapper(void *arg)
> 
> Delete "wrapper".

All other eventdev-tests name their equivalent functions "worker_wrapper", so I picked it to be consistent with the other tests.

> 
> > +
> > +       /* setup one port per worker, linking to all queues */
> > +       ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
> 
> "order"?

This function is declared in test_order_common.h and is used in all tests. It is not specific for "ordered" ports, so I thought it was OK to use.


> > +
> > +static void
> > +atomic_queue_opt_dump(struct evt_options *opt)
> > +{
> > +       order_opt_dump(opt);
> 
> "order"?

Same thing here.


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [RFC] eventdev: add atomic queue to test-eventdev app
  2024-12-10 10:37 ` Mattias Rönnblom
  2024-12-16  9:14   ` Luka Jankovic
@ 2024-12-19 13:24   ` Luka Jankovic
  1 sibling, 0 replies; 4+ messages in thread
From: Luka Jankovic @ 2024-12-19 13:24 UTC (permalink / raw)
  To: hofors, dev; +Cc: Mattias Rönnblom

On Tue, 2024-12-10 at 11:37 +0100, Mattias Rönnblom wrote:
>
> > +{
> > +       struct rte_event_dev_info dev_info;
> > +
> > +       rte_event_dev_info_get(dev_id, &dev_info);
> > +       return (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE) ?
> > +                       true : false;
>
> return dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_MAINTENANCE_FREE;
>
> will work fine.
>
>
I decided against it in order to maintain consistent styling with similar functions in the file.

>
> > +static int
> > +worker_wrapper(void *arg)
>
> Delete "wrapper".

All other eventdev-tests name their equivalent functions "worker_wrapper", so I picked it to be consistent with the other tests.

>
> > +
> > +       /* setup one port per worker, linking to all queues */
> > +       ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES);
>
> "order"?

This function is declared in test_order_common.h and is used in all tests. It is not specific for "ordered" ports, so I thought it was OK to use.


> > +
> > +static void
> > +atomic_queue_opt_dump(struct evt_options *opt)
> > +{
> > +       order_opt_dump(opt);
>
> "order"?

Same thing here.

Thank you for the feedback. I will re-implement the test by not checking port-flow-queue combination and generally clean-up the code based on your comments.


^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2024-12-19 13:24 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-12-05 13:29 [RFC] eventdev: add atomic queue to test-eventdev app Luka Jankovic
2024-12-10 10:37 ` Mattias Rönnblom
2024-12-16  9:14   ` Luka Jankovic
2024-12-19 13:24   ` Luka Jankovic

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).