DPDK patches and discussions
 help / color / Atom feed
From: Dharmik Thakkar <dharmik.thakkar@arm.com>
To: Yipeng Wang <yipeng1.wang@intel.com>,
	Sameh Gobriel <sameh.gobriel@intel.com>,
	Bruce Richardson <bruce.richardson@intel.com>,
	Ray Kinsella <mdr@ashroe.eu>, Neil Horman <nhorman@tuxdriver.com>
Cc: dev@dpdk.org, nd@arm.com, Dharmik Thakkar <dharmik.thakkar@arm.com>
Subject: [dpdk-dev] [RFC v2] lib/hash: integrate RCU QSBR
Date: Tue, 18 Aug 2020 23:05:37 -0500
Message-ID: <20200819040537.1792-1-dharmik.thakkar@arm.com> (raw)
In-Reply-To: <20190901065810.15137-1-dharmik.thakkar@arm.com>

Integrate RCU QSBR to make it easier for applications to use the
lock-free algorithm.

The resource reclamation implementation was split from the original
series and is already part of the RCU library. Rework the series
to base the hash integration on the RCU reclamation APIs.

Refer to 'Resource reclamation framework for DPDK', available at [1],
to understand the various aspects of integrating the RCU library
into other libraries.

[1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html

Introduce a new API, rte_hash_rcu_qsbr_add, for applications to
register an RCU variable that the hash library will use.

Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
v2:
 - Remove defer queue related functions and use resource reclamation
   APIs from the RCU QSBR library instead

 - Remove patch (net/ixgbe: avoid multpile definitions of 'bool')
   from the series as it is already accepted

---
 lib/Makefile                         |   2 +-
 lib/librte_hash/Makefile             |   2 +-
 lib/librte_hash/meson.build          |   1 +
 lib/librte_hash/rte_cuckoo_hash.c    | 291 +++++++++++++++++++++------
 lib/librte_hash/rte_cuckoo_hash.h    |   8 +
 lib/librte_hash/rte_hash.h           |  75 ++++++-
 lib/librte_hash/rte_hash_version.map |   2 +-
 7 files changed, 308 insertions(+), 73 deletions(-)

diff --git a/lib/Makefile b/lib/Makefile
index 8f5b68a2d469..dffe31c829f0 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -51,7 +51,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost
 DEPDIRS-librte_vhost := librte_eal librte_mempool librte_mbuf librte_ethdev \
 			librte_net librte_hash librte_cryptodev
 DIRS-$(CONFIG_RTE_LIBRTE_HASH) += librte_hash
-DEPDIRS-librte_hash := librte_eal librte_ring
+DEPDIRS-librte_hash := librte_eal librte_ring librte_rcu
 DIRS-$(CONFIG_RTE_LIBRTE_EFD) += librte_efd
 DEPDIRS-librte_efd := librte_eal librte_ring librte_hash
 DIRS-$(CONFIG_RTE_LIBRTE_RIB) += librte_rib
diff --git a/lib/librte_hash/Makefile b/lib/librte_hash/Makefile
index ec9f86499262..10e697f48652 100644
--- a/lib/librte_hash/Makefile
+++ b/lib/librte_hash/Makefile
@@ -8,7 +8,7 @@ LIB = librte_hash.a
 
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_ring
+LDLIBS += -lrte_eal -lrte_ring -lrte_rcu
 
 EXPORT_MAP := rte_hash_version.map
 
diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index 6ab46ae9d768..0977a63fd279 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -10,3 +10,4 @@ headers = files('rte_crc_arm64.h',
 
 sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')
 deps += ['ring']
+deps += ['rcu']
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 0a6d474713a2..01c2cbe0e38b 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -52,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_hash_tailq)
 
+struct __rte_hash_rcu_dq_entry {
+	uint32_t key_idx;
+	uint32_t ext_bkt_idx; /**< Extended bkt index */
+};
+
 struct rte_hash *
 rte_hash_find_existing(const char *name)
 {
@@ -210,7 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
 
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
 		readwrite_concur_lf_support = 1;
-		/* Enable not freeing internal memory/index on delete */
+		/* Enable not freeing internal memory/index on delete.
+		 * If internal RCU is enabled, freeing of internal memory/index
+		 * is done on delete
+		 */
 		no_free_on_del = 1;
 	}
 
@@ -505,6 +513,10 @@ rte_hash_free(struct rte_hash *h)
 
 	rte_mcfg_tailq_write_unlock();
 
+	/* RCU clean up. */
+	if (h->dq)
+		rte_rcu_qsbr_dq_delete(h->dq);
+
 	if (h->use_local_cache)
 		rte_free(h->local_free_slots);
 	if (h->writer_takes_lock)
@@ -612,6 +624,16 @@ rte_hash_reset(struct rte_hash *h)
 		return;
 
 	__hash_rw_writer_lock(h);
+
+	/* RCU clean up. */
+	if (h->hash_rcu_cfg->v) {
+		rte_rcu_qsbr_dq_delete(h->dq);
+		h->dq = NULL;
+		if (rte_hash_rcu_qsbr_add(h, h->hash_rcu_cfg) < 0) {
+			RTE_LOG(ERR, HASH, "RCU init failed\n");
+			return;
+		}
+	}
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
 	*h->tbl_chng_cnt = 0;
@@ -952,6 +974,37 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
 	return -ENOSPC;
 }
 
+static inline uint32_t
+alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
+{
+	unsigned int  n_slots;
+	uint32_t slot_id;
+	if (h->use_local_cache) {
+		/* Try to get a free slot from the local cache */
+		if (cached_free_slots->len == 0) {
+			/* Need to get another burst of free slots from global ring */
+			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
+					cached_free_slots->objs,
+					sizeof(uint32_t),
+					LCORE_CACHE_SIZE, NULL);
+			if (n_slots == 0)
+				return EMPTY_SLOT;
+
+			cached_free_slots->len += n_slots;
+		}
+
+		/* Get a free slot from the local cache */
+		cached_free_slots->len--;
+		slot_id = cached_free_slots->objs[cached_free_slots->len];
+	} else {
+		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
+						sizeof(uint32_t)) != 0)
+			return EMPTY_SLOT;
+	}
+
+	return slot_id;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
@@ -963,7 +1016,6 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	uint32_t ext_bkt_id = 0;
 	uint32_t slot_id;
 	int ret;
-	unsigned n_slots;
 	unsigned lcore_id;
 	unsigned int i;
 	struct lcore_cache *cached_free_slots = NULL;
@@ -1001,28 +1053,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	if (h->use_local_cache) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
-		/* Try to get a free slot from the local cache */
-		if (cached_free_slots->len == 0) {
-			/* Need to get another burst of free slots from global ring */
-			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
-					cached_free_slots->objs,
-					sizeof(uint32_t),
-					LCORE_CACHE_SIZE, NULL);
-			if (n_slots == 0) {
-				return -ENOSPC;
+	}
+	slot_id = alloc_slot(h, cached_free_slots);
+	if (slot_id == EMPTY_SLOT) {
+		if (h->hash_rcu_cfg->v) {
+			if (rte_rcu_qsbr_dq_reclaim(h->dq,
+						    h->hash_rcu_cfg->reclaim_max,
+						    NULL, NULL, NULL)
+					== 0) {
+				slot_id = alloc_slot(h, cached_free_slots);
 			}
-
-			cached_free_slots->len += n_slots;
 		}
-
-		/* Get a free slot from the local cache */
-		cached_free_slots->len--;
-		slot_id = cached_free_slots->objs[cached_free_slots->len];
-	} else {
-		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
-						sizeof(uint32_t)) != 0) {
+		if (slot_id == EMPTY_SLOT)
 			return -ENOSPC;
-		}
 	}
 
 	new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
@@ -1118,8 +1161,20 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
 						sizeof(uint32_t)) != 0 ||
 					ext_bkt_id == 0) {
-		ret = -ENOSPC;
-		goto failure;
+		if (h->hash_rcu_cfg->v) {
+			if (rte_rcu_qsbr_dq_reclaim(h->dq,
+						    h->hash_rcu_cfg->reclaim_max,
+						    NULL, NULL, NULL)
+					== 0) {
+				rte_ring_sc_dequeue_elem(h->free_ext_bkts,
+							 &ext_bkt_id,
+							 sizeof(uint32_t));
+			}
+		}
+		if (ext_bkt_id == 0) {
+			ret = -ENOSPC;
+			goto failure;
+		}
 	}
 
 	/* Use the first location of the new bucket */
@@ -1395,12 +1450,12 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data)
 	return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
 }
 
-static inline void
-remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+static int
+free_slot(const struct rte_hash *h, uint32_t slot_id)
 {
 	unsigned lcore_id, n_slots;
 	struct lcore_cache *cached_free_slots;
-
+	/* Return key indexes to free slot ring */
 	if (h->use_local_cache) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
@@ -1411,18 +1466,122 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 						cached_free_slots->objs,
 						sizeof(uint32_t),
 						LCORE_CACHE_SIZE, NULL);
-			ERR_IF_TRUE((n_slots == 0),
-				"%s: could not enqueue free slots in global ring\n",
-				__func__);
+			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
 			cached_free_slots->len -= n_slots;
 		}
-		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] =
-							bkt->key_idx[i];
-		cached_free_slots->len++;
+	}
+
+	enqueue_slot_back(h, cached_free_slots, slot_id);
+	return 0;
+}
+
+static void
+__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)
+{
+	void *key_data = NULL;
+	int ret;
+	struct rte_hash_key *keys, *k;
+	struct rte_hash *h = (struct rte_hash *)p;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry =
+			*((struct __rte_hash_rcu_dq_entry *)e);
+
+	RTE_SET_USED(n);
+	keys = h->key_store;
+
+	k = (struct rte_hash_key *) ((char *)keys +
+				rcu_dq_entry.key_idx * h->key_entry_size);
+	key_data = k->pdata;
+	if (h->hash_rcu_cfg->free_key_data_func)
+		h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,
+						    key_data);
+
+	if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)
+		/* Recycle empty ext bkt to free list. */
+		rte_ring_sp_enqueue_elem(h->free_ext_bkts,
+			&rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));
+
+	/* Return key indexes to free slot ring */
+	ret = free_slot(h, rcu_dq_entry.key_idx);
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
+	}
+}
+
+int
+rte_hash_rcu_qsbr_add(struct rte_hash *h,
+				struct rte_hash_rcu_config *cfg)
+{
+	struct rte_rcu_qsbr_dq_parameters params = {0};
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+	struct rte_hash_rcu_config *hash_rcu_cfg = NULL;
+
+	const uint32_t total_entries = h->use_local_cache ?
+		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
+							: h->entries + 1;
+
+	if ((h == NULL) || cfg == NULL) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);
+	if (hash_rcu_cfg == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		return 1;
+	}
+
+	if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
+		/* No other things to do. */
+	} else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {
+		/* Init QSBR defer queue. */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+					"HASH_RCU_%s", h->name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = total_entries;
+		params.trigger_reclaim_limit = cfg->reclaim_thd;
+		if (params.max_reclaim_size == 0)
+			params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
+		params.free_fn = __hash_rcu_qsbr_free_resource;
+		params.p = h;
+		params.v = cfg->v;
+		h->dq = rte_rcu_qsbr_dq_create(&params);
+		if (h->dq == NULL) {
+			rte_free(hash_rcu_cfg);
+			RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n");
+			return 1;
+		}
 	} else {
-		rte_ring_sp_enqueue_elem(h->free_slots,
-				&bkt->key_idx[i], sizeof(uint32_t));
+		rte_free(hash_rcu_cfg);
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	hash_rcu_cfg->v = cfg->v;
+	hash_rcu_cfg->mode = cfg->mode;
+	hash_rcu_cfg->dq_size = cfg->dq_size;
+	hash_rcu_cfg->reclaim_thd = cfg->reclaim_thd;
+	hash_rcu_cfg->reclaim_max = cfg->reclaim_max;
+	hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
+	hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
+
+	h->hash_rcu_cfg = hash_rcu_cfg;
+
+	return 0;
+}
+
+static inline void
+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+{
+	int ret = free_slot(h, bkt->key_idx[i]);
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
 	}
 }
 
@@ -1521,6 +1680,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	int pos;
 	int32_t ret, i;
 	uint16_t short_sig;
+	uint32_t index = EMPTY_SLOT;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
@@ -1555,10 +1716,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 /* Search last bucket to see if empty to be recycled */
 return_bkt:
-	if (!last_bkt) {
-		__hash_rw_writer_unlock(h);
-		return ret;
-	}
+	if (!last_bkt)
+		goto return_key;
+
 	while (last_bkt->next) {
 		prev_bkt = last_bkt;
 		last_bkt = last_bkt->next;
@@ -1571,11 +1731,11 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	/* found empty bucket and recycle */
 	if (i == RTE_HASH_BUCKET_ENTRIES) {
 		prev_bkt->next = NULL;
-		uint32_t index = last_bkt - h->buckets_ext + 1;
+		index = last_bkt - h->buckets_ext + 1;
 		/* Recycle the empty bkt if
 		 * no_free_on_del is disabled.
 		 */
-		if (h->no_free_on_del)
+		if (h->no_free_on_del) {
 			/* Store index of an empty ext bkt to be recycled
 			 * on calling rte_hash_del_xxx APIs.
 			 * When lock free read-write concurrency is enabled,
@@ -1583,12 +1743,32 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 			 * immediately (as readers might be using it still).
 			 * Hence freeing of the ext bkt is piggy-backed to
 			 * freeing of the key index.
+			 * If using external RCU, store this index in an array.
 			 */
-			h->ext_bkt_to_free[ret] = index;
-		else
+			if (h->hash_rcu_cfg->v == NULL)
+				h->ext_bkt_to_free[ret] = index;
+		} else
 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
 							sizeof(uint32_t));
 	}
+
+return_key:
+	/* Using internal RCU QSBR */
+	if (h->hash_rcu_cfg->v) {
+		/* Key index where key is stored, adding the first dummy index */
+		rcu_dq_entry.key_idx = ret + 1;
+		rcu_dq_entry.ext_bkt_idx = index;
+		if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
+			/* Wait for quiescent state change. */
+			rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,
+						 RTE_QSBR_THRID_INVALID);
+			__hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),
+						      &rcu_dq_entry, 1);
+		} else if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_DQ)
+			/* Push into QSBR FIFO. */
+			if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)
+				RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+	}
 	__hash_rw_writer_unlock(h);
 	return ret;
 }
@@ -1637,8 +1817,6 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
 
 	RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);
 
-	unsigned int lcore_id, n_slots;
-	struct lcore_cache *cached_free_slots;
 	const uint32_t total_entries = h->use_local_cache ?
 		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
 							: h->entries + 1;
@@ -1656,28 +1834,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
 		}
 	}
 
-	if (h->use_local_cache) {
-		lcore_id = rte_lcore_id();
-		cached_free_slots = &h->local_free_slots[lcore_id];
-		/* Cache full, need to free it. */
-		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
-			/* Need to enqueue the free slots in global ring. */
-			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
-						cached_free_slots->objs,
-						sizeof(uint32_t),
-						LCORE_CACHE_SIZE, NULL);
-			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
-			cached_free_slots->len -= n_slots;
-		}
-		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] = key_idx;
-		cached_free_slots->len++;
-	} else {
-		rte_ring_sp_enqueue_elem(h->free_slots, &key_idx,
-						sizeof(uint32_t));
-	}
+	/* Enqueue slot to cache/ring of free slots. */
+	return free_slot(h, key_idx);
 
-	return 0;
 }
 
 static inline void
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 345de6bf9cfd..85be49d3bbe7 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -168,6 +168,11 @@ struct rte_hash {
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
 
+	/* RCU config */
+	struct rte_hash_rcu_config *hash_rcu_cfg;
+	/**< HASH RCU QSBR configuration structure */
+	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue. */
+
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
@@ -230,4 +235,7 @@ struct queue_node {
 	int prev_slot;               /* Parent(slot) in search path */
 };
 
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_HASH_RCU_DQ_RECLAIM_MAX	16
+
 #endif
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index bff40251bc98..5431bcf4aeb1 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -15,6 +15,7 @@
 #include <stddef.h>
 
 #include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -45,7 +46,8 @@ extern "C" {
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
  * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
- * is enabled.
+ * is enabled. However, if internal RCU is enabled, freeing of internal
+ * memory/index is done on delete
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
@@ -67,6 +69,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,
 /** Type of function used to compare the hash key. */
 typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
 
+/**
+ * Type of function used to free data stored in the key.
+ * Required when using internal RCU to allow application to free key-data once
+ * the key is returned to the ring of free key-slots.
+ */
+typedef void (*rte_hash_free_key_data)(void *p, void *key_data);
+
 /**
  * Parameters used when creating the hash table.
  */
@@ -81,6 +90,39 @@ struct rte_hash_parameters {
 	uint8_t extra_flag;		/**< Indicate if additional parameters are present. */
 };
 
+/** RCU reclamation modes */
+enum rte_hash_qsbr_mode {
+	/** Create defer queue for reclaim. */
+	RTE_HASH_QSBR_MODE_DQ = 0,
+	/** Use blocking mode reclaim. No defer queue created. */
+	RTE_HASH_QSBR_MODE_SYNC
+};
+
+/** HASH RCU QSBR configuration structure. */
+struct rte_hash_rcu_config {
+	struct rte_rcu_qsbr *v;		/**< RCU QSBR variable. */
+	enum rte_hash_qsbr_mode mode;
+	/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
+	 * '0' for default: create defer queue for reclaim.
+	 */
+	uint32_t dq_size;
+	/**< RCU defer queue size.
+	 * default: total hash table entries.
+	 */
+	uint32_t reclaim_thd;	/**< Threshold to trigger auto reclaim. */
+	uint32_t reclaim_max;
+	/**< Max entries to reclaim in one go.
+	 * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
+	 */
+	void *key_data_ptr;
+	/**< Pointer passed to the free function. Typically, this is the
+	 * pointer to the data structure to which the resource to free
+	 * (key-data) belongs. This can be NULL.
+	 */
+	rte_hash_free_key_data free_key_data_func;
+	/**< Function to call to free the resource (key-data). */
+};
+
 /** @internal A hash table structure. */
 struct rte_hash;
 
@@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * Thread safety can be enabled by setting flag during
  * table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);
  * Thread safety can be enabled by setting flag during
  * table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_del_key_xxx APIs must be freed
  * using this API. This API should be called after all the readers
  * have stopped referencing the entry corresponding to this key.
@@ -625,6 +670,28 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
  */
 int32_t
 rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Associate an RCU QSBR variable with a hash object.
+ *
+ * @param h
+ *   the hash object to add RCU QSBR
+ * @param cfg
+ *   RCU QSBR configuration
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ */
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h,
+				struct rte_hash_rcu_config *cfg);
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
index c0db81014ff9..c6d73080f478 100644
--- a/lib/librte_hash/rte_hash_version.map
+++ b/lib/librte_hash/rte_hash_version.map
@@ -36,5 +36,5 @@ EXPERIMENTAL {
 	rte_hash_lookup_with_hash_bulk;
 	rte_hash_lookup_with_hash_bulk_data;
 	rte_hash_max_key_id;
-
+	rte_hash_rcu_qsbr_add;
 };
-- 
2.17.1


  parent reply index

Thread overview: 49+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-09-01  6:58 [dpdk-dev] [RFC 0/3] RCU integration with hash library Dharmik Thakkar
2019-09-01  6:58 ` [dpdk-dev] [RFC 1/3] net/ixgbe: avoid multpile definitions of 'bool' Dharmik Thakkar
2019-09-05 15:26   ` Stephen Hemminger
2019-09-05 20:10     ` Dharmik Thakkar
2019-09-05 20:23       ` Stephen Hemminger
2019-09-01  6:58 ` [dpdk-dev] [RFC 2/3] lib/hash: integrate RCU QSBR Dharmik Thakkar
2019-09-01  6:58 ` [dpdk-dev] [RFC 3/3] test/hash: add tests for integrated " Dharmik Thakkar
2019-09-05 21:31 ` [dpdk-dev] [RFC 0/3] RCU integration with hash library Wang, Yipeng1
2020-08-19  4:05 ` Dharmik Thakkar [this message]
2020-08-25 19:59   ` [dpdk-dev] [RFC v2] lib/hash: integrate RCU QSBR Dharmik Thakkar
2020-08-31 20:47   ` Wang, Yipeng1
2020-09-01 22:01     ` Dharmik Thakkar
2020-10-16 13:53       ` David Marchand
2020-10-16 15:04         ` Dharmik Thakkar
2020-10-16 17:38   ` [dpdk-dev] [PATCH 0/3] hash: " Dharmik Thakkar
2020-10-16 17:38     ` [dpdk-dev] [PATCH v3 1/3] lib/hash: " Dharmik Thakkar
2020-10-19  9:43       ` Kinsella, Ray
2020-10-16 17:38     ` [dpdk-dev] [PATCH v3 2/3] test/hash: replace rte atomic with C11 atomic APIs Dharmik Thakkar
2020-10-16 17:38     ` [dpdk-dev] [PATCH v3 3/3] test/hash: add tests for integrated RCU QSBR Dharmik Thakkar
2020-10-19 14:48     ` [dpdk-dev] [PATCH 0/3] hash: integrate " David Marchand
2020-10-19 16:35     ` [dpdk-dev] [PATCH v4 " Dharmik Thakkar
2020-10-19 16:35       ` [dpdk-dev] [PATCH v4 1/3] lib/hash: " Dharmik Thakkar
2020-10-19 16:35       ` [dpdk-dev] [PATCH v4 2/3] test/hash: replace rte atomic with C11 atomic APIs Dharmik Thakkar
2020-10-19 16:35       ` [dpdk-dev] [PATCH v4 3/3] test/hash: add tests for integrated RCU QSBR Dharmik Thakkar
2020-10-19 21:05       ` [dpdk-dev] [PATCH v4 0/3] hash: integrate " David Marchand
2020-10-20  4:08         ` Dharmik Thakkar
2020-10-20 16:12       ` [dpdk-dev] [PATCH v5 0/4] " Dharmik Thakkar
2020-10-20 16:12         ` [dpdk-dev] [PATCH v5 1/4] rcu: build on Windows Dharmik Thakkar
2020-10-20 16:12         ` [dpdk-dev] [PATCH v5 2/4] lib/hash: integrate RCU QSBR Dharmik Thakkar
2020-10-21  2:42           ` Wang, Yipeng1
2020-10-21  4:52             ` Dharmik Thakkar
2020-10-20 16:13         ` [dpdk-dev] [PATCH v5 3/4] test/hash: replace rte atomic with C11 atomic APIs Dharmik Thakkar
2020-10-21  2:52           ` Wang, Yipeng1
2020-10-20 16:13         ` [dpdk-dev] [PATCH v5 4/4] test/hash: add tests for integrated RCU QSBR Dharmik Thakkar
2020-10-21  3:54           ` Wang, Yipeng1
2020-10-21  4:55             ` Dharmik Thakkar
2020-10-21 22:34             ` Honnappa Nagarahalli
2020-10-21 22:50         ` [dpdk-dev] [PATCH v6 0/4] hash: integrate " Dharmik Thakkar
2020-10-21 22:50           ` [dpdk-dev] [PATCH v6 1/4] rcu: build on Windows Dharmik Thakkar
2020-10-22  9:05             ` David Marchand
2020-10-22 18:35             ` Kadam, Pallavi
2020-10-21 22:50           ` [dpdk-dev] [PATCH v6 2/4] lib/hash: integrate RCU QSBR Dharmik Thakkar
2020-10-23 21:46             ` Dharmik Thakkar
2020-10-23 22:08             ` Wang, Yipeng1
2020-10-21 22:50           ` [dpdk-dev] [PATCH v6 3/4] test/hash: replace rte atomic with C11 atomic APIs Dharmik Thakkar
2020-10-21 22:50           ` [dpdk-dev] [PATCH v6 4/4] test/hash: add tests for integrated RCU QSBR Dharmik Thakkar
2020-10-23 22:11             ` Wang, Yipeng1
2020-10-24  9:09           ` [dpdk-dev] [PATCH v6 0/4] hash: integrate " David Marchand
2020-10-26 13:56             ` Dharmik Thakkar

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200819040537.1792-1-dharmik.thakkar@arm.com \
    --to=dharmik.thakkar@arm.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=mdr@ashroe.eu \
    --cc=nd@arm.com \
    --cc=nhorman@tuxdriver.com \
    --cc=sameh.gobriel@intel.com \
    --cc=yipeng1.wang@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

DPDK patches and discussions

Archives are clonable:
	git clone --mirror http://inbox.dpdk.org/dev/0 dev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 dev dev/ http://inbox.dpdk.org/dev \
		dev@dpdk.org
	public-inbox-index dev


Newsgroup available over NNTP:
	nntp://inbox.dpdk.org/inbox.dpdk.dev


AGPL code for this site: git clone https://public-inbox.org/ public-inbox