From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <dev-bounces@dpdk.org>
Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124])
	by inbox.dpdk.org (Postfix) with ESMTP id 22702A0032;
	Tue, 19 Jul 2022 22:28:00 +0200 (CEST)
Received: from [217.70.189.124] (localhost [127.0.0.1])
	by mails.dpdk.org (Postfix) with ESMTP id BBD7C410F1;
	Tue, 19 Jul 2022 22:27:59 +0200 (CEST)
Received: from mail-pl1-f181.google.com (mail-pl1-f181.google.com
 [209.85.214.181])
 by mails.dpdk.org (Postfix) with ESMTP id D46BE40FAE
 for <dev@dpdk.org>; Tue, 19 Jul 2022 22:27:58 +0200 (CEST)
Received: by mail-pl1-f181.google.com with SMTP id w7so567593plp.5
 for <dev@dpdk.org>; Tue, 19 Jul 2022 13:27:58 -0700 (PDT)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
 d=networkplumber-org.20210112.gappssmtp.com; s=20210112;
 h=from:to:cc:subject:date:message-id:in-reply-to:references
 :mime-version:content-transfer-encoding;
 bh=ULO2I1me2ePsz0N5ToUrkNO9gJqhoQQNRaB8lQ0Z9Q8=;
 b=Wf6LXSz+0jawYKXm4sUurtkP6R6Hql3W39+5mUkXKbFRA23kPpnWBas9LaeyP5b9IJ
 7k4CEaH3U4+7NzSMp6QBDxhxl9Pe2otsLxXwe+IwtkKrC9zipzDVM5jl4BcGWWtgeHeU
 Xu0jqZapf0Oumk8rgO1ljU8cbjRPks+32kiEwntzUertphqvraq/iCgkooMigBFJPaaL
 oTfrp0RWIgc1pse+lJwILNeRusIBddUkHkSdC8cxvmNYM15hOQHhDyytRaqnDpgahBCe
 TbG8F7txmcQXdQs4N46XIhmQZafG2Zw749/j2l6nTqenGC/rk/r2SK5keLgXa6ksoayU
 a34g==
X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
 d=1e100.net; s=20210112;
 h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to
 :references:mime-version:content-transfer-encoding;
 bh=ULO2I1me2ePsz0N5ToUrkNO9gJqhoQQNRaB8lQ0Z9Q8=;
 b=Q50ZeD5Xp67/Kzbssg7WDpPn80vhM2Eqzs0icufugp2iJ8Dh0nqV+X+ccsTTwI0in7
 JHlIVjLp0dxoDc5IRRjTSeWLi1u1gHjTwDE1u4uRbHy0H9cHtiaDSwi6Z6Nb/t7nzNWL
 v5ANok2ACtpi6N6pSa/aXqVI1XIx7EZh3PPXAXUd40tnJjM91H31NiExDxqa+mvMYbf7
 T1CuvqBpJ/ceHlA2JcPkxBBSsAteFOO13vAKaCz1kx7cWzh9hjiUGX7ddVLtrHfqx+pt
 0qwodY9t3zobgIjHaNAmO4Wl8VwELfFyHwnBR56aj4Xt98XAlay6olXTqZl+3ZcGYnv1
 rhOQ==
X-Gm-Message-State: AJIora+kr63/86ioOswTi3e/bke+iz+zJCevv//CRCayox9+moE9UPIy
 sVBRpbHPHoRMtdaF/Q0JmFAbo62bY8Dydw==
X-Google-Smtp-Source: AGRyM1v+PQJR4/bsV4lYbjS65UElPqfN+FNbzNGK6rF1uTqjt1bDeugYvyCH+bfk67GFf0AqYN89yQ==
X-Received: by 2002:a17:902:e551:b0:16c:5a22:4823 with SMTP id
 n17-20020a170902e55100b0016c5a224823mr33959621plf.38.1658262476789; 
 Tue, 19 Jul 2022 13:27:56 -0700 (PDT)
Received: from hermes.local (204-195-120-218.wavecable.com. [204.195.120.218])
 by smtp.gmail.com with ESMTPSA id
 g204-20020a6252d5000000b0052aca106b20sm11805880pfb.202.2022.07.19.13.27.54
 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256);
 Tue, 19 Jul 2022 13:27:55 -0700 (PDT)
From: Stephen Hemminger <stephen@networkplumber.org>
To: dev@dpdk.org
Cc: Stephen Hemminger <stephen@networkplumber.org>,
 =?UTF-8?q?Morten=20Br=C3=B8rup?= <mb@smartsharesystems.com>
Subject: [PATCH v2] rwlock: prevent readers from starving writers
Date: Tue, 19 Jul 2022 13:27:50 -0700
Message-Id: <20220719202750.563166-1-stephen@networkplumber.org>
X-Mailer: git-send-email 2.35.1
In-Reply-To: <20220707201226.618611-1-stephen@networkplumber.org>
References: <20220707201226.618611-1-stephen@networkplumber.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://mails.dpdk.org/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://mails.dpdk.org/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://mails.dpdk.org/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
Errors-To: dev-bounces@dpdk.org

Modify reader/writer lock to avoid starvation of writer.  The previous
implementation would cause a writer to get starved if readers kept
acquiring the lock.  The new version uses an additional bit to indicate
that a writer is waiting and which keeps readers from starving the
writer.

Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
Acked-by: Morten Brørup <mb@smartsharesystems.com>
---
v2 - incorporate feedback, change from RFC to PATCH

 lib/eal/include/generic/rte_rwlock.h | 119 +++++++++++++++++++--------
 1 file changed, 83 insertions(+), 36 deletions(-)

diff --git a/lib/eal/include/generic/rte_rwlock.h b/lib/eal/include/generic/rte_rwlock.h
index da9bc3e9c0e2..59ec54110444 100644
--- a/lib/eal/include/generic/rte_rwlock.h
+++ b/lib/eal/include/generic/rte_rwlock.h
@@ -15,23 +15,46 @@
  * one writer. All readers are blocked until the writer is finished
  * writing.
  *
+ * This version does not give preference to readers or writers
+ * and does not starve either readers or writers.
+ *
+ * See also:
+ *  https://locklessinc.com/articles/locks/
  */
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
+#include <rte_branch_prediction.h>
 #include <rte_common.h>
-#include <rte_atomic.h>
 #include <rte_pause.h>
 
 /**
  * The rte_rwlock_t type.
  *
- * cnt is -1 when write lock is held, and > 0 when read locks are held.
+ * Readers increment the counter by RTE_RWLOCK_READ (4)
+ * Writers set the RTE_RWLOCK_WRITE bit when lock is held
+ *     and set the RTE_RWLOCK_WAIT bit while waiting.
+ *
+ * 31                 2 1 0
+ * +-------------------+-+-+
+ * |  readers          | | |
+ * +-------------------+-+-+
+ *                      ^ ^
+ *                      | |
+ * WRITE: lock held ----/ |
+ * WAIT: writer pending --/
  */
+
+#define RTE_RWLOCK_WAIT	 0x1	/* Writer is waiting */
+#define RTE_RWLOCK_WRITE 0x2	/* Writer has the lock */
+#define RTE_RWLOCK_MASK  (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)
+				/* Writer is waiting or has lock */
+#define RTE_RWLOCK_READ	 0x4	/* Reader increment */
+
 typedef struct {
-	volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held. */
+	int32_t cnt;
 } rte_rwlock_t;
 
 /**
@@ -61,17 +84,24 @@ static inline void
 rte_rwlock_read_lock(rte_rwlock_t *rwl)
 {
 	int32_t x;
-	int success = 0;
 
-	while (success == 0) {
-		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-		/* write lock is held */
-		if (x < 0) {
+	while (1) {
+		/* Wait while writer is present or pending */
+		while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED)
+		       & RTE_RWLOCK_MASK)
 			rte_pause();
-			continue;
-		}
-		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
-					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
+
+		/* Try to get read lock */
+		x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
+				       __ATOMIC_ACQUIRE);
+
+		/* If no writer, then acquire was successful */
+		if (likely(!(x & RTE_RWLOCK_MASK)))
+			return;
+
+		/* Lost race with writer, backout the change. */
+		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
+				   __ATOMIC_RELAXED);
 	}
 }
 
@@ -93,17 +123,24 @@ static inline int
 rte_rwlock_read_trylock(rte_rwlock_t *rwl)
 {
 	int32_t x;
-	int success = 0;
 
-	while (success == 0) {
-		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-		/* write lock is held */
-		if (x < 0)
-			return -EBUSY;
-		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
-					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
-	}
+	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
+
+	/* fail if write lock is held or writer is pending */
+	if (x & RTE_RWLOCK_MASK)
+		return -EBUSY;
 
+	/* Try to get read lock */
+	x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
+			       __ATOMIC_ACQUIRE);
+
+	/* Back out if writer raced in */
+	if (unlikely(x & RTE_RWLOCK_MASK)) {
+		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
+				   __ATOMIC_RELEASE);
+
+		return -EBUSY;
+	}
 	return 0;
 }
 
@@ -116,7 +153,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
 static inline void
 rte_rwlock_read_unlock(rte_rwlock_t *rwl)
 {
-	__atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
+	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ, __ATOMIC_RELEASE);
 }
 
 /**
@@ -139,11 +176,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
 	int32_t x;
 
 	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-	if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
-			      __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
+	if (x < RTE_RWLOCK_WRITE &&
+	    __atomic_compare_exchange_n(&rwl->cnt, &x, x + RTE_RWLOCK_WRITE,
+					1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+		return 0;
+	else
 		return -EBUSY;
-
-	return 0;
 }
 
 /**
@@ -156,18 +194,27 @@ static inline void
 rte_rwlock_write_lock(rte_rwlock_t *rwl)
 {
 	int32_t x;
-	int success = 0;
 
-	while (success == 0) {
+	while (1) {
 		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-		/* a lock is held */
-		if (x != 0) {
-			rte_pause();
-			continue;
+
+		/* No readers or writers? */
+		if (likely(x < RTE_RWLOCK_WRITE)) {
+			/* Turn off RTE_RWLOCK_WAIT, turn on RTE_RWLOCK_WRITE */
+			if (__atomic_compare_exchange_n(&rwl->cnt, &x, RTE_RWLOCK_WRITE, 1,
+							__ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+				return;
 		}
-		success = __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
-					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
-	}
+
+		/* Turn on writer wait bit */
+		if (!(x & RTE_RWLOCK_WAIT))
+			__atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT, __ATOMIC_RELAXED);
+
+		/* Wait until no readers befor trying again */
+		while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) > RTE_RWLOCK_WAIT)
+			rte_pause();
+
+    }
 }
 
 /**
@@ -179,7 +226,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl)
 static inline void
 rte_rwlock_write_unlock(rte_rwlock_t *rwl)
 {
-	__atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
+	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE, __ATOMIC_RELEASE);
 }
 
 /**
-- 
2.35.1