From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
To: bruce.richardson@intel.com, pablo.de.lara.guarch@intel.com
Cc: dev@dpdk.org, yipeng1.wang@intel.com,
	honnappa.nagarahalli@arm.com, dharmik.thakkar@arm.com,
	gavin.hu@arm.com, nd@arm.com
Subject: [dpdk-dev] [PATCH v3 4/7] hash: add memory ordering to avoid race conditions
Date: Fri, 12 Oct 2018 01:31:55 -0500
Message-ID: <1539325918-125438-5-git-send-email-honnappa.nagarahalli@arm.com>
In-Reply-To: <1539325918-125438-1-git-send-email-honnappa.nagarahalli@arm.com>

The only race condition that can occur is a reader using a key store
element before the write of the key has completed. Hence, the release
memory order is used when inserting the element. Any other race
condition is caught by the key comparison. Memory orderings are added
only where needed. For example, reads in the writer's context do not
need memory ordering, as there is a single writer.

key_idx in the bucket entry and pdata in the key store element are
used for synchronisation. key_idx is used to release an inserted
entry in the bucket to the reader. pdata must also be used for
synchronisation because, when an existing entry is updated, only
pdata is written and key_idx is left unchanged.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
Reviewed-by: Steve Capper <steve.capper@arm.com>
Reviewed-by: Yipeng Wang <yipeng1.wang@intel.com>
---
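Illustration only, not part of the patch: a minimal sketch of the
release/acquire pairing described in the commit message, assuming a
single writer and GCC/Clang __atomic builtins. All names here
(key_elem, key_store, bucket_key_idx, demo_insert, demo_update,
demo_lookup, demo_selfcheck) are hypothetical stand-ins and do not
exist in rte_cuckoo_hash.c. The self-check runs in one thread, so it
exercises the call sequence but does not prove the ordering.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define EMPTY_SLOT 0
#define KEY_LEN 16

/* Hypothetical stand-in for a key store element. */
struct key_elem {
	void *pdata;
	char key[KEY_LEN];
};

static struct key_elem key_store[4];         /* index 0 is the dummy */
static uint32_t bucket_key_idx = EMPTY_SLOT; /* one bucket entry */

/* Writer: fill the element first, then publish its index. */
static void
demo_insert(uint32_t new_idx, const char *key, void *data)
{
	struct key_elem *k = &key_store[new_idx];

	memcpy(k->key, key, KEY_LEN);
	__atomic_store_n(&k->pdata, data, __ATOMIC_RELEASE);
	/* The release store makes the key and pdata writes above
	 * visible to any reader that acquires this index.
	 */
	__atomic_store_n(&bucket_key_idx, new_idx, __ATOMIC_RELEASE);
}

/* Writer: update an existing entry; key_idx is not written, so
 * pdata itself is the synchronisation point.
 */
static void
demo_update(uint32_t idx, void *data)
{
	__atomic_store_n(&key_store[idx].pdata, data, __ATOMIC_RELEASE);
}

/* Reader: acquire the index, then pdata, then compare the key.
 * 'key' must point to KEY_LEN bytes.
 */
static int32_t
demo_lookup(const char *key, void **data)
{
	uint32_t key_idx;
	struct key_elem *k;
	void *pdata;

	key_idx = __atomic_load_n(&bucket_key_idx, __ATOMIC_ACQUIRE);
	if (key_idx == EMPTY_SLOT)
		return -1;

	k = &key_store[key_idx];
	pdata = __atomic_load_n(&k->pdata, __ATOMIC_ACQUIRE);
	if (memcmp(key, k->key, KEY_LEN) != 0)
		return -1;

	if (data != NULL)
		*data = pdata;
	/* Subtract the dummy index, as the real lookup does. */
	return (int32_t)(key_idx - 1);
}

/* Single-threaded walk through insert, lookup and update.
 * Expects an empty table, so call it only once.
 */
static int
demo_selfcheck(void)
{
	static int v1 = 1, v2 = 2;
	char k1[KEY_LEN] = "flow-a";
	char k2[KEY_LEN] = "flow-b";
	void *out = NULL;

	assert(demo_lookup(k1, &out) == -1);   /* nothing published yet */
	demo_insert(1, k1, &v1);
	assert(demo_lookup(k1, &out) == 0 && out == &v1);
	demo_update(1, &v2);
	assert(demo_lookup(k1, &out) == 0 && out == &v2);
	assert(demo_lookup(k2, &out) == -1);   /* key comparison fails */
	return 0;
}
```

Calling demo_selfcheck() once from a test harness runs the sequence
above. In the sketch, as in the patch, the reader loads pdata before
the key comparison so that the value it returns is the one it
acquired, not a later re-read.
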
 lib/librte_hash/rte_cuckoo_hash.c | 116 ++++++++++++++++++++++++++++----------
 1 file changed, 87 insertions(+), 29 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 13646d0..e55725a 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 #include <string.h>
@@ -585,7 +586,9 @@ enqueue_slot_back(const struct rte_hash *h,
 		rte_ring_sp_enqueue(h->free_slots, slot_id);
 }
 
-/* Search a key from bucket and update its data */
+/* Search a key from bucket and update its data.
+ * Writer holds the lock before calling this.
+ */
 static inline int32_t
 search_and_update(const struct rte_hash *h, void *data, const void *key,
 	struct rte_hash_bucket *bkt, uint16_t sig)
@@ -598,8 +601,13 @@ search_and_update(const struct rte_hash *h, void *data, const void *key,
 			k = (struct rte_hash_key *) ((char *)keys +
 					bkt->key_idx[i] * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				/* Update data */
-				k->pdata = data;
+				/* 'pdata' acts as the synchronization point
+				 * when an existing hash entry is updated.
+				 * Key is not updated in this case.
+				 */
+				__atomic_store_n(&k->pdata,
+					data,
+					__ATOMIC_RELEASE);
 				/*
 				 * Return index where key is stored,
 				 * subtracting the first dummy index
@@ -655,7 +663,15 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
 		/* Check if slot is available */
 		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
 			prim_bkt->sig_current[i] = sig;
-			prim_bkt->key_idx[i] = new_idx;
+			/* Key can be of arbitrary length, so it is
+			 * not possible to store it atomically.
+			 * Hence the new key element's memory stores
+			 * (key as well as data) should be complete
+			 * before it is referenced.
+			 */
+			__atomic_store_n(&prim_bkt->key_idx[i],
+					 new_idx,
+					 __ATOMIC_RELEASE);
 			break;
 		}
 	}
@@ -739,8 +755,10 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		 */
 		curr_bkt->sig_current[curr_slot] =
 			prev_bkt->sig_current[prev_slot];
-		curr_bkt->key_idx[curr_slot] =
-			prev_bkt->key_idx[prev_slot];
+		/* Release the updated bucket entry */
+		__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+			prev_bkt->key_idx[prev_slot],
+			__ATOMIC_RELEASE);
 
 		curr_slot = prev_slot;
 		curr_node = prev_node;
@@ -748,7 +766,10 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 	}
 
 	curr_bkt->sig_current[curr_slot] = sig;
-	curr_bkt->key_idx[curr_slot] = new_idx;
+	/* Release the new bucket entry */
+	__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+			 new_idx,
+			 __ATOMIC_RELEASE);
 
 	__hash_rw_writer_unlock(h);
 
@@ -889,8 +910,15 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	new_idx = (uint32_t)((uintptr_t) slot_id);
 	/* Copy key */
 	rte_memcpy(new_k->key, key, h->key_len);
-	new_k->pdata = data;
-
+	/* Key can be of arbitrary length, so it is not possible to store
+	 * it atomically. Hence the new key element's memory stores
+	 * (key as well as data) should be complete before it is referenced.
+	 * 'pdata' acts as the synchronization point when an existing hash
+	 * entry is updated.
+	 */
+	__atomic_store_n(&new_k->pdata,
+		data,
+		__ATOMIC_RELEASE);
 
 	/* Find an empty slot and insert */
 	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
@@ -1034,21 +1062,27 @@ search_one_bucket(const struct rte_hash *h, const void *key, uint16_t sig,
 			void **data, const struct rte_hash_bucket *bkt)
 {
 	int i;
+	uint32_t key_idx;
+	void *pdata;
 	struct rte_hash_key *k, *keys = h->key_store;
 
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (bkt->sig_current[i] == sig &&
-				bkt->key_idx[i] != EMPTY_SLOT) {
+		key_idx = __atomic_load_n(&bkt->key_idx[i],
+					  __ATOMIC_ACQUIRE);
+		if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
 			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
+					key_idx * h->key_entry_size);
+			pdata = __atomic_load_n(&k->pdata,
+					__ATOMIC_ACQUIRE);
+
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
 				if (data != NULL)
-					*data = k->pdata;
+					*data = pdata;
 				/*
 				 * Return index where key is stored,
 				 * subtracting the first dummy index
 				 */
-				return bkt->key_idx[i] - 1;
+				return key_idx - 1;
 			}
 		}
 	}
@@ -1173,21 +1207,25 @@ __rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos) {
 	}
 }
 
-/* Search one bucket and remove the matched key */
+/* Search one bucket and remove the matched key.
+ * Writer is expected to hold the lock while calling this
+ * function.
+ */
 static inline int32_t
 search_and_remove(const struct rte_hash *h, const void *key,
 			struct rte_hash_bucket *bkt, uint16_t sig, int *pos)
 {
 	struct rte_hash_key *k, *keys = h->key_store;
 	unsigned int i;
-	int32_t ret;
+	uint32_t key_idx;
 
 	/* Check if key is in bucket */
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (bkt->sig_current[i] == sig &&
-				bkt->key_idx[i] != EMPTY_SLOT) {
+		key_idx = __atomic_load_n(&bkt->key_idx[i],
+					  __ATOMIC_ACQUIRE);
+		if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
 			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
+					key_idx * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
 				bkt->sig_current[i] = NULL_SIGNATURE;
 				/* Do not free the key store element if
@@ -1196,13 +1234,16 @@ search_and_remove(const struct rte_hash *h, const void *key,
 				if (h->recycle_on_del)
 					remove_entry(h, bkt, i);
 
-				/* Return index where key is stored,
+				__atomic_store_n(&bkt->key_idx[i],
+						 EMPTY_SLOT,
+						 __ATOMIC_RELEASE);
+
+				*pos = i;
+				/*
+				 * Return index where key is stored,
 				 * subtracting the first dummy index
 				 */
-				ret = bkt->key_idx[i] - 1;
-				bkt->key_idx[i] = EMPTY_SLOT;
-				*pos = i;
-				return ret;
+				return key_idx - 1;
 			}
 		}
 	}
@@ -1402,6 +1443,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	struct rte_hash_bucket *cur_bkt, *next_bkt;
+	void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
 
 	/* Prefetch first keys */
 	for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1480,18 +1522,25 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 			uint32_t hit_index =
 					__builtin_ctzl(prim_hitmask[i]) >> 1;
 
-			uint32_t key_idx = primary_bkt[i]->key_idx[hit_index];
+			uint32_t key_idx =
+			__atomic_load_n(
+				&primary_bkt[i]->key_idx[hit_index],
+				__ATOMIC_ACQUIRE);
 			const struct rte_hash_key *key_slot =
 				(const struct rte_hash_key *)(
 				(const char *)h->key_store +
 				key_idx * h->key_entry_size);
+
+			if (key_idx != EMPTY_SLOT)
+				pdata[i] = __atomic_load_n(&key_slot->pdata,
+						__ATOMIC_ACQUIRE);
 			/*
 			 * If key index is 0, do not compare key,
 			 * as it is checking the dummy slot
 			 */
 			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
 				if (data != NULL)
-					data[i] = key_slot->pdata;
+					data[i] = pdata[i];
 
 				hits |= 1ULL << i;
 				positions[i] = key_idx - 1;
@@ -1504,11 +1553,19 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 			uint32_t hit_index =
 					__builtin_ctzl(sec_hitmask[i]) >> 1;
 
-			uint32_t key_idx = secondary_bkt[i]->key_idx[hit_index];
+			uint32_t key_idx =
+			__atomic_load_n(
+				&secondary_bkt[i]->key_idx[hit_index],
+				__ATOMIC_ACQUIRE);
 			const struct rte_hash_key *key_slot =
 				(const struct rte_hash_key *)(
 				(const char *)h->key_store +
 				key_idx * h->key_entry_size);
+
+			if (key_idx != EMPTY_SLOT)
+				pdata[i] = __atomic_load_n(&key_slot->pdata,
+						__ATOMIC_ACQUIRE);
+
 			/*
 			 * If key index is 0, do not compare key,
 			 * as it is checking the dummy slot
@@ -1516,7 +1573,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 
 			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
 				if (data != NULL)
-					data[i] = key_slot->pdata;
+					data[i] = pdata[i];
 
 				hits |= 1ULL << i;
 				positions[i] = key_idx - 1;
@@ -1612,7 +1669,8 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 	idx = *next % RTE_HASH_BUCKET_ENTRIES;
 
 	/* If current position is empty, go to the next one */
-	while ((position = h->buckets[bucket_idx].key_idx[idx]) == EMPTY_SLOT) {
+	while ((position = __atomic_load_n(&h->buckets[bucket_idx].key_idx[idx],
+					__ATOMIC_ACQUIRE)) == EMPTY_SLOT) {
 		(*next)++;
 		/* End of table */
 		if (*next == total_entries)
-- 
2.7.4
