From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <dev-bounces@dpdk.org>
Received: from dpdk.org (dpdk.org [92.243.14.124])
	by inbox.dpdk.org (Postfix) with ESMTP id D910FA0523;
	Thu,  2 Jul 2020 07:28:55 +0200 (CEST)
Received: from [92.243.14.124] (localhost [127.0.0.1])
	by dpdk.org (Postfix) with ESMTP id BD2011D90B;
	Thu,  2 Jul 2020 07:28:42 +0200 (CEST)
Received: from foss.arm.com (foss.arm.com [217.140.110.172])
 by dpdk.org (Postfix) with ESMTP id BEB011D8DB
 for <dev@dpdk.org>; Thu,  2 Jul 2020 07:28:41 +0200 (CEST)
Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14])
 by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 03392D6E;
 Wed,  1 Jul 2020 22:28:41 -0700 (PDT)
Received: from phil-VirtualBox.shanghai.arm.com
 (phil-VirtualBox.shanghai.arm.com [10.169.106.164])
 by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 5FB5B3F73C;
 Wed,  1 Jul 2020 22:28:38 -0700 (PDT)
From: Phil Yang <phil.yang@arm.com>
To: erik.g.carrillo@intel.com,
	dev@dpdk.org
Cc: jerinj@marvell.com, Honnappa.Nagarahalli@arm.com, drc@linux.vnet.ibm.com,
 Ruifeng.Wang@arm.com, Dharmik.Thakkar@arm.com, nd@arm.com
Date: Thu,  2 Jul 2020 13:26:44 +0800
Message-Id: <1593667604-12029-4-git-send-email-phil.yang@arm.com>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1593667604-12029-1-git-send-email-phil.yang@arm.com>
References: <1591960798-24024-1-git-send-email-phil.yang@arm.com>
 <1593667604-12029-1-git-send-email-phil.yang@arm.com>
Subject: [dpdk-dev] [PATCH v2 4/4] eventdev: relax smp barriers with c11
	atomics
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://mails.dpdk.org/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://mails.dpdk.org/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://mails.dpdk.org/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
Errors-To: dev-bounces@dpdk.org
Sender: "dev" <dev-bounces@dpdk.org>

The implementation-specific opaque data is shared between arm and cancel
operations. The state flag acts as a guard variable to make sure the
update of opaque data is synchronized. This patch uses c11 atomics with
explicit one way memory barrier instead of full barriers rte_smp_w/rmb()
to synchronize the opaque data between timer arm and cancel threads.

Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
v2:
1. Removed implementation-specific opaque data cleanup code.
2. Replaced thread fence with atomic ACQURE/RELEASE ordering on state access.

 lib/librte_eventdev/rte_event_timer_adapter.c | 55 ++++++++++++++++++---------
 lib/librte_eventdev/rte_event_timer_adapter.h |  2 +-
 2 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 8909a8c..ca00258 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim)
 		sw->expired_timers[sw->n_expired_timers++] = tim;
 		sw->stats.evtim_exp_count++;
 
-		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+		__atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED,
+				__ATOMIC_RELEASE);
 	}
 
 	if (event_buffer_batch_ready(&sw->buffer)) {
@@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 	int n_lcores;
 	/* Timer list for this lcore is not in use. */
 	uint16_t exp_state = 0;
+	enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 	}
 
 	for (i = 0; i < nb_evtims; i++) {
-		/* Don't modify the event timer state in these cases */
-		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
+		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+		if (n_state == RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EALREADY;
 			break;
-		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+		} else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED ||
+			     n_state == RTE_EVENT_TIMER_CANCELED)) {
 			rte_errno = EINVAL;
 			break;
 		}
 
 		ret = check_timeout(evtims[i], adapter);
 		if (unlikely(ret == -1)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR_TOOLATE,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		} else if (unlikely(ret == -2)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR_TOOEARLY,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		}
 
 		if (unlikely(check_destination_event_queue(evtims[i],
 							   adapter) < 0)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		}
@@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 					  SINGLE, lcore_id, NULL, evtims[i]);
 		if (ret < 0) {
 			/* tim was in RUNNING or CONFIG state */
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR,
+					__ATOMIC_RELEASE);
 			break;
 		}
 
-		rte_smp_wmb();
 		EVTIM_LOG_DBG("armed an event timer");
-		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
+		/* RELEASE ordering guarantees the adapter specific value
+		 * changes observed before the update of state.
+		 */
+		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED,
+				__ATOMIC_RELEASE);
 	}
 
 	if (i < nb_evtims)
@@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 	struct rte_timer *timp;
 	uint64_t opaque;
 	struct swtim *sw = swtim_pmd_priv(adapter);
+	enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
-		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
+		/* ACQUIRE ordering guarantees the access of implementation
+		 * specific opague data under the correct state.
+		 */
+		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+		if (n_state == RTE_EVENT_TIMER_CANCELED) {
 			rte_errno = EALREADY;
 			break;
-		} else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
+		} else if (n_state != RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EINVAL;
 			break;
 		}
 
-		rte_smp_rmb();
-
 		opaque = evtims[i]->impl_opaque[0];
 		timp = (struct rte_timer *)(uintptr_t)opaque;
 		RTE_ASSERT(timp != NULL);
@@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
 		rte_mempool_put(sw->tim_pool, (void **)timp);
 
-		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
-
-		rte_smp_wmb();
+		/* The RELEASE ordering here pairs with atomic ordering
+		 * to make sure the state update data observed between
+		 * threads.
+		 */
+		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED,
+				__ATOMIC_RELEASE);
 	}
 
 	return i;
diff --git a/lib/librte_eventdev/rte_event_timer_adapter.h b/lib/librte_eventdev/rte_event_timer_adapter.h
index d2ebcb0..6f64b90 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.h
+++ b/lib/librte_eventdev/rte_event_timer_adapter.h
@@ -467,7 +467,7 @@ struct rte_event_timer {
 	 *  - op: RTE_EVENT_OP_NEW
 	 *  - event_type: RTE_EVENT_TYPE_TIMER
 	 */
-	volatile enum rte_event_timer_state state;
+	enum rte_event_timer_state state;
 	/**< State of the event timer. */
 	uint64_t timeout_ticks;
 	/**< Expiry timer ticks expressed in number of *timer_ticks_ns* from
-- 
2.7.4