DPDK patches and discussions
* [RFC PATCH] ring: adding TPAUSE instruction to ring dequeue
@ 2023-05-03 11:38 David Coyle
  2023-05-03 13:32 ` Morten Brørup
  0 siblings, 1 reply; 8+ messages in thread
From: David Coyle @ 2023-05-03 11:38 UTC (permalink / raw)
  To: dev; +Cc: honnappa.nagarahalli, konstantin.v.ananyev, rory.sexton, David Coyle

This is NOT for upstreaming. It is being submitted to allow early
comparison testing against the preferred solution, which will add TPAUSE
power management support to the ring library through the addition of
callbacks. Initial stages of the preferred solution are available at
http://dpdk.org/patch/125454.

This patch adds functionality directly to the rte_ring_dequeue functions
to monitor empty reads of the ring. When a configurable number of
consecutive empty reads is reached, a TPAUSE instruction is triggered by
calling rte_power_pause() on supported architectures; rte_pause() is
used on other architectures. The functionality can be included or
excluded at compilation time using the RTE_RING_PMGMT flag. If included,
the new API can be used to enable/disable the feature on a per-ring
basis, and other related settings can also be configured through the API.
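
For reference, a minimal usage sketch (not part of the patch; the burst
size, thresholds and polling loop below are illustrative only):

    #include <rte_lcore.h>
    #include <rte_ring.h>

    static int
    poll_ring_with_pmgmt(struct rte_ring *r)
    {
        void *objs[32];
        int ret;

        /* sleep after 512 consecutive empty dequeues, 1us at a time */
        rte_ring_pmgmt_set_emptypoll_max(512);
        ret = rte_ring_pmgmt_set_pause_duration(1);
        if (ret < 0)
            return ret;

        /* register the ring with the lcore that will poll it */
        ret = rte_ring_pmgmt_ring_enable(rte_lcore_id(), r->name);
        if (ret < 0)
            return ret;

        /* rte_ring_dequeue_burst() now runs the pause logic internally */
        for (;;) {
            unsigned int n = rte_ring_dequeue_burst(r, objs,
                    RTE_DIM(objs), NULL);
            /* ... process the n dequeued objects ... */
        }
    }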

Signed-off-by: David Coyle <david.coyle@intel.com>
---
 lib/ring/meson.build      |   4 +-
 lib/ring/rte_ring.h       |  13 +-
 lib/ring/rte_ring_pmgmt.c | 367 ++++++++++++++++++++++++++++++++++++++
 lib/ring/rte_ring_pmgmt.h | 149 ++++++++++++++++
 lib/ring/version.map      |  12 ++
 5 files changed, 539 insertions(+), 6 deletions(-)
 create mode 100644 lib/ring/rte_ring_pmgmt.c
 create mode 100644 lib/ring/rte_ring_pmgmt.h

diff --git a/lib/ring/meson.build b/lib/ring/meson.build
index c20685c689..087028542c 100644
--- a/lib/ring/meson.build
+++ b/lib/ring/meson.build
@@ -1,8 +1,8 @@
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2017 Intel Corporation
 
-sources = files('rte_ring.c')
-headers = files('rte_ring.h')
+sources = files('rte_ring.c', 'rte_ring_pmgmt.c')
+headers = files('rte_ring.h', 'rte_ring_pmgmt.h')
 # most sub-headers are not for direct inclusion
 indirect_headers += files (
         'rte_ring_core.h',
diff --git a/lib/ring/rte_ring.h b/lib/ring/rte_ring.h
index 7e4cd60650..64dbb5bca2 100644
--- a/lib/ring/rte_ring.h
+++ b/lib/ring/rte_ring.h
@@ -41,6 +41,7 @@ extern "C" {
 
 #include <rte_ring_core.h>
 #include <rte_ring_elem.h>
+#include <rte_ring_pmgmt.h>
 
 /**
  * Calculate the memory size needed for a ring
@@ -412,8 +413,10 @@ static __rte_always_inline unsigned int
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		unsigned int *available)
 {
-	return rte_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
-			n, available);
+	uint32_t nb_rx = rte_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
+						    n, available);
+	RTE_RING_PMGMT_IMPL(nb_rx, r->name);
+	return nb_rx;
 }
 
 /**
@@ -812,8 +815,10 @@ static __rte_always_inline unsigned int
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return rte_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
-			n, available);
+	uint32_t nb_rx = rte_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
+						     n, available);
+	RTE_RING_PMGMT_IMPL(nb_rx, r->name);
+	return nb_rx;
 }
 
 #ifdef __cplusplus
diff --git a/lib/ring/rte_ring_pmgmt.c b/lib/ring/rte_ring_pmgmt.c
new file mode 100644
index 0000000000..25d64736d0
--- /dev/null
+++ b/lib/ring/rte_ring_pmgmt.c
@@ -0,0 +1,367 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <rte_lcore.h>
+#include <rte_cycles.h>
+#include <rte_cpuflags.h>
+#include <rte_malloc.h>
+#include <rte_power_intrinsics.h>
+#include <rte_string_fns.h>
+
+#include <rte_ring.h>
+#include "rte_ring_pmgmt.h"
+
+static unsigned int emptypoll_max;
+static unsigned int pause_duration;
+
+/* store some internal state */
+static struct ring_conf_data {
+	/** what do we support? */
+	struct rte_cpu_intrinsics intrinsics_support;
+	/** pre-calculated tsc diff for 1us */
+	uint64_t tsc_per_us;
+	/** how many rte_pause can we fit in a microsecond? */
+	uint64_t pause_per_us;
+} global_data;
+
+/**
+ * Possible power management states of a rte ring.
+ */
+enum ring_mgmt_state {
+	/** Device power management is disabled. */
+	RING_MGMT_DISABLED = 0,
+	/** Device power management is enabled. */
+	RING_MGMT_ENABLED
+};
+
+struct ring {
+	char name[RTE_RING_NAMESIZE] __rte_cache_aligned;
+};
+
+struct ring_list_entry {
+	TAILQ_ENTRY(ring_list_entry) next;
+	struct ring ring;
+	uint64_t n_empty_polls;
+	uint64_t n_sleeps;
+};
+
+struct ring_core_cfg {
+	TAILQ_HEAD(ring_list_head, ring_list_entry) head;
+	/**< List of rings associated with this lcore */
+	size_t n_rings;
+	/**< How many rings are in the list? */
+	volatile enum ring_mgmt_state pwr_mgmt_state;
+	/**< State of power management for this lcore */
+	uint64_t n_rings_ready_to_sleep;
+	/**< Number of rings ready to enter power optimized state */
+	uint64_t sleep_target;
+	/**< Prevent a ring from triggering sleep multiple times */
+} __rte_cache_aligned;
+static struct ring_core_cfg lcore_cfgs[RTE_MAX_LCORE];
+
+static struct ring_list_entry *
+ring_list_find(const struct ring_core_cfg *cfg, struct ring *r)
+{
+	struct ring_list_entry *cur;
+
+	TAILQ_FOREACH(cur, &cfg->head, next) {
+		if (strcmp(cur->ring.name, r->name) == 0)
+			return cur;
+	}
+	return NULL;
+}
+
+static int
+ring_list_add(struct ring_core_cfg *cfg, struct ring *r)
+{
+	struct ring_list_entry *rle;
+
+	/* is it already in the list? */
+	if (ring_list_find(cfg, r) != NULL)
+		return -EEXIST;
+
+	rle = malloc(sizeof(*rle));
+	if (rle == NULL)
+		return -ENOMEM;
+	memset(rle, 0, sizeof(*rle));
+
+	rte_strscpy(rle->ring.name, r->name, sizeof(rle->ring.name));
+	TAILQ_INSERT_TAIL(&cfg->head, rle, next);
+	cfg->n_rings++;
+
+	return 0;
+}
+
+static struct ring_list_entry *
+ring_list_take(struct ring_core_cfg *cfg, struct ring *r)
+{
+	struct ring_list_entry *found;
+
+	found = ring_list_find(cfg, r);
+	if (found == NULL)
+		return NULL;
+
+	TAILQ_REMOVE(&cfg->head, found, next);
+	cfg->n_rings--;
+
+	/* freeing is responsibility of the caller */
+	return found;
+}
+
+static void
+calc_tsc(void)
+{
+	const uint64_t hz = rte_get_timer_hz();
+	const uint64_t tsc_per_us = hz / US_PER_S; /* 1us */
+
+	global_data.tsc_per_us = tsc_per_us;
+
+	/* only do this if we don't have tpause */
+	if (!global_data.intrinsics_support.power_pause) {
+		const uint64_t start = rte_rdtsc_precise();
+		const uint32_t n_pauses = 10000;
+		double us, us_per_pause;
+		uint64_t end;
+		unsigned int i;
+
+		/* estimate number of rte_pause() calls per us */
+		for (i = 0; i < n_pauses; i++)
+			rte_pause();
+
+		end = rte_rdtsc_precise();
+		us = (end - start) / (double)tsc_per_us;
+		us_per_pause = us / n_pauses;
+
+		global_data.pause_per_us = (uint64_t)(1.0 / us_per_pause);
+	}
+}
+
+static inline void
+ring_reset(struct ring_core_cfg *cfg, struct ring_list_entry *rcfg)
+{
+	const bool is_ready_to_sleep = rcfg->n_sleeps == cfg->sleep_target;
+
+	/* reset empty poll counter for this ring */
+	rcfg->n_empty_polls = 0;
+	/* reset the ring sleep counter as well */
+	rcfg->n_sleeps = 0;
+	/* remove the ring from list of rings ready to sleep */
+	if (is_ready_to_sleep)
+		cfg->n_rings_ready_to_sleep--;
+	/*
+	 * no need to change the lcore sleep target counter because this lcore
+	 * will reach n_sleeps anyway, and the other cores are already counted,
+	 * so there's no need to do anything else.
+	 */
+}
+
+static inline bool
+ring_can_sleep(struct ring_core_cfg *cfg, struct ring_list_entry *rcfg)
+{
+	/* this function is called - that means we have an empty poll */
+	rcfg->n_empty_polls++;
+
+	/* if we haven't reached threshold for empty polls, we can't sleep */
+	if (rcfg->n_empty_polls <= emptypoll_max)
+		return false;
+
+	/*
+	 * we've reached a point where we are able to sleep, but we still need
+	 * to check if this ring has already been marked for sleeping.
+	 */
+	if (rcfg->n_sleeps == cfg->sleep_target)
+		return true;
+
+	/* mark this ring as ready for sleep */
+	rcfg->n_sleeps = cfg->sleep_target;
+	cfg->n_rings_ready_to_sleep++;
+
+	return true;
+}
+
+static inline bool
+lcore_can_sleep(struct ring_core_cfg *cfg)
+{
+	/* are all rings ready to sleep? */
+	if (cfg->n_rings_ready_to_sleep != cfg->n_rings)
+		return false;
+
+	/* we've reached an iteration where we can sleep, reset sleep counter */
+	cfg->n_rings_ready_to_sleep = 0;
+	cfg->sleep_target++;
+	/*
+	 * we do not reset any individual ring empty poll counters, because
+	 * we want to keep sleeping on every poll until we actually get traffic.
+	 */
+
+	return true;
+}
+
+static void
+pause_implement(uint32_t nb_rx, char *ring_name)
+{
+	const unsigned int lcore = rte_lcore_id();
+	struct ring_list_entry *rle;
+	struct ring_core_cfg *lcore_conf;
+	const bool empty = nb_rx == 0;
+	const uint32_t duration = rte_ring_pmgmt_get_pause_duration();
+	struct ring r;
+
+	lcore_conf = &lcore_cfgs[lcore];
+	rte_strscpy(r.name, ring_name, sizeof(r.name));
+	rle = ring_list_find(lcore_conf, &r);
+	if (rle == NULL)
+		return;
+
+	if (likely(!empty)) {
+		/* early exit */
+		ring_reset(lcore_conf, rle);
+	} else {
+		/* can this ring sleep? */
+		if (!ring_can_sleep(lcore_conf, rle))
+			return;
+
+		/* can this lcore sleep? */
+		if (!lcore_can_sleep(lcore_conf))
+			return;
+
+		/* sleep for 'duration' microseconds, use tpause if we have it */
+		if (global_data.intrinsics_support.power_pause) {
+			const uint64_t cur = rte_rdtsc();
+			const uint64_t wait_tsc =
+					cur + global_data.tsc_per_us * duration;
+			rte_power_pause(wait_tsc);
+		} else {
+			uint64_t i;
+			for (i = 0; i < global_data.pause_per_us * duration; i++)
+				rte_pause();
+		}
+	}
+}
+
+int
+rte_ring_pmgmt_ring_enable(unsigned int lcore_id, char *ring_name)
+{
+	struct ring_core_cfg *lcore_conf;
+	struct ring r;
+	int ret;
+
+	if (lcore_id >= RTE_MAX_LCORE)
+		return -EINVAL;
+
+	lcore_conf = &lcore_cfgs[lcore_id];
+
+	/* we need this in various places */
+	rte_cpu_get_intrinsics_support(&global_data.intrinsics_support);
+
+	/* figure out various time-to-tsc conversions */
+	if (global_data.tsc_per_us == 0)
+		calc_tsc();
+
+	/* add this ring to the list */
+	rte_strscpy(r.name, ring_name, sizeof(r.name));
+	ret = ring_list_add(lcore_conf, &r);
+	if (ret < 0) {
+		RTE_LOG(DEBUG, POWER, "Failed to add ring to list: %s\n",
+				strerror(-ret));
+		return ret;
+	}
+	/* the new ring is always appended to the tail of the list */
+
+	/* when enabling first ring, ensure sleep target is not 0 */
+	if (lcore_conf->n_rings == 1 && lcore_conf->sleep_target == 0)
+		lcore_conf->sleep_target = 1;
+
+	/* initialize data before enabling the callback */
+	if (lcore_conf->n_rings == 1)
+		lcore_conf->pwr_mgmt_state = RING_MGMT_ENABLED;
+
+	return 0;
+}
+
+int
+rte_ring_pmgmt_ring_disable(unsigned int lcore_id, char *ring_name)
+{
+	struct ring_list_entry *rle;
+	struct ring_core_cfg *lcore_conf;
+	struct ring r;
+
+	if (lcore_id >= RTE_MAX_LCORE)
+		return -EINVAL;
+
+	lcore_conf = &lcore_cfgs[lcore_id];
+
+	if (lcore_conf->pwr_mgmt_state != RING_MGMT_ENABLED)
+		return -EINVAL;
+
+	/*
+	 * There is no good/easy way to do this without race conditions, so we
+	 * are just going to throw our hands in the air and hope that the user
+	 * has read the documentation and has ensured that ring is not being
+	 * used at the time we enter the API functions.
+	 */
+	rte_strscpy(r.name, ring_name, sizeof(r.name));
+	rle = ring_list_take(lcore_conf, &r);
+	if (rle == NULL)
+		return -ENOENT;
+
+	/* if we've removed all rings from the lists, set state to disabled */
+	if (lcore_conf->n_rings == 0)
+		lcore_conf->pwr_mgmt_state = RING_MGMT_DISABLED;
+
+	free(rle);
+
+	return 0;
+}
+
+void
+rte_ring_pmgmt_set_emptypoll_max(unsigned int max)
+{
+	emptypoll_max = max;
+}
+
+unsigned int
+rte_ring_pmgmt_get_emptypoll_max(void)
+{
+	return emptypoll_max;
+}
+
+int
+rte_ring_pmgmt_set_pause_duration(unsigned int duration)
+{
+	if (duration == 0) {
+		RTE_LOG(ERR, POWER, "Pause duration must be greater than 0, value unchanged\n");
+		return -EINVAL;
+	}
+	pause_duration = duration;
+
+	return 0;
+}
+
+unsigned int
+rte_ring_pmgmt_get_pause_duration(void)
+{
+	return pause_duration;
+}
+
+void
+rte_ring_pmgmt_impl(uint32_t nb_rx, char *ring_name)
+{
+	pause_implement(nb_rx, ring_name);
+}
+
+RTE_INIT(rte_ring_pmgmt_init)
+{
+	size_t i;
+
+	/* initialize all tailqs */
+	for (i = 0; i < RTE_DIM(lcore_cfgs); i++) {
+		struct ring_core_cfg *cfg = &lcore_cfgs[i];
+		TAILQ_INIT(&cfg->head);
+	}
+
+	/* initialize config defaults */
+	emptypoll_max = 512;
+	pause_duration = 1;
+}
diff --git a/lib/ring/rte_ring_pmgmt.h b/lib/ring/rte_ring_pmgmt.h
new file mode 100644
index 0000000000..faa0515e5b
--- /dev/null
+++ b/lib/ring/rte_ring_pmgmt.h
@@ -0,0 +1,149 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _RTE_RING_PMGMT_H
+#define _RTE_RING_PMGMT_H
+
+/**
+ * @file
+ * Ring Power Management
+ */
+
+#include <stdint.h>
+
+#include <rte_log.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/** Enable/disable ring power management */
+#define RTE_RING_PMGMT 1
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Enable power management on a specified ring and lcore.
+ *
+ * @note This function is not thread-safe.
+ *
+ * @note Only pause mode is currently supported.
+ *
+ * @param lcore_id
+ *   The lcore the ring will be polled from.
+ * @param ring_name
+ *   The name of the ring.
+ * @return
+ *   0 on success
+ *   <0 on error
+ */
+__rte_experimental
+int
+rte_ring_pmgmt_ring_enable(unsigned int lcore_id, char *ring_name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Disable power management on a specified ring and lcore.
+ *
+ * @note This function is not thread-safe.
+ *
+ * @param lcore_id
+ *   The lcore the ring is polled from.
+ * @param ring_name
+ *   The name of the ring.
+ * @return
+ *   0 on success
+ *   <0 on error
+ */
+__rte_experimental
+int
+rte_ring_pmgmt_ring_disable(unsigned int lcore_id, char *ring_name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Set emptypoll_max to the specified value. This is the number of
+ * consecutive empty polls to wait before invoking power management.
+ *
+ * @param max
+ *   The value to set emptypoll_max to.
+ */
+__rte_experimental
+void
+rte_ring_pmgmt_set_emptypoll_max(unsigned int max);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Get the current value of emptypoll_max.
+ *
+ * @return
+ *   The current emptypoll_max value
+ */
+__rte_experimental
+unsigned int
+rte_ring_pmgmt_get_emptypoll_max(void);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Set pause_duration, the sleep duration in microseconds used by pause mode.
+ *
+ * @note Duration must be greater than zero.
+ *
+ * @param duration
+ *   The value to set pause_duration to.
+ * @return
+ *   0 on success
+ *   <0 on error
+ */
+__rte_experimental
+int
+rte_ring_pmgmt_set_pause_duration(unsigned int duration);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Get the current value of pause_duration.
+ *
+ * @return
+ *   The current pause_duration value.
+ */
+__rte_experimental
+unsigned int
+rte_ring_pmgmt_get_pause_duration(void);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice.
+ *
+ * Runs the power management algorithm for the specified ring.
+ *
+ * @param nb_rx
+ *   The number of packets read from the ring.
+ * @param ring_name
+ *   The name of the ring.
+ */
+__rte_experimental
+void
+rte_ring_pmgmt_impl(uint32_t nb_rx, char *ring_name);
+
+#ifdef RTE_RING_PMGMT
+#define RTE_RING_PMGMT_IMPL(nb_rx, r) rte_ring_pmgmt_impl(nb_rx, r)
+#else
+#define RTE_RING_PMGMT_IMPL(nb_rx, r)
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_PMGMT_H */
diff --git a/lib/ring/version.map b/lib/ring/version.map
index 4d7c27a6d9..471ba9514e 100644
--- a/lib/ring/version.map
+++ b/lib/ring/version.map
@@ -14,3 +14,15 @@ DPDK_23 {
 
 	local: *;
 };
+
+EXPERIMENTAL {
+	global:
+
+	rte_ring_pmgmt_ring_enable;
+	rte_ring_pmgmt_ring_disable;
+	rte_ring_pmgmt_set_emptypoll_max;
+	rte_ring_pmgmt_get_emptypoll_max;
+	rte_ring_pmgmt_set_pause_duration;
+	rte_ring_pmgmt_get_pause_duration;
+	rte_ring_pmgmt_impl;
+};
-- 
2.25.1

