From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id C2D66A00BE; Tue, 7 Jul 2020 13:15:03 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 3B8D61DE1A; Tue, 7 Jul 2020 13:14:51 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by dpdk.org (Postfix) with ESMTP id B97561DE13 for ; Tue, 7 Jul 2020 13:14:49 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 3C1E6113E; Tue, 7 Jul 2020 04:14:49 -0700 (PDT) Received: from phil-VirtualBox.shanghai.arm.com (phil-VirtualBox.shanghai.arm.com [10.169.109.153]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id 4A10D3F71E; Tue, 7 Jul 2020 04:14:45 -0700 (PDT) From: Phil Yang To: thomas@monjalon.net, erik.g.carrillo@intel.com, dev@dpdk.org Cc: jerinj@marvell.com, Honnappa.Nagarahalli@arm.com, drc@linux.vnet.ibm.com, Ruifeng.Wang@arm.com, Dharmik.Thakkar@arm.com, nd@arm.com, david.marchand@redhat.com, mdr@ashroe.eu, nhorman@tuxdriver.com, dodji@redhat.com Date: Tue, 7 Jul 2020 19:13:23 +0800 Message-Id: <1594120403-17643-4-git-send-email-phil.yang@arm.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1594120403-17643-1-git-send-email-phil.yang@arm.com> References: <1593667604-12029-1-git-send-email-phil.yang@arm.com> <1594120403-17643-1-git-send-email-phil.yang@arm.com> Subject: [dpdk-dev] [PATCH v3 4/4] eventdev: relax smp barriers with C11 atomics X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" The impl_opaque field is shared between the timer arm and cancel operations. Meanwhile, the state flag acts as a guard variable to make sure the update of impl_opaque is synchronized. The original code uses rte_smp barriers to achieve that. This patch uses C11 atomics with an explicit one-way memory barrier instead of full barriers rte_smp_w/rmb() to avoid the unnecessary barrier on aarch64. Since compilers can generate the same instructions for volatile and non-volatile variable in C11 __atomics built-ins, so remain the volatile keyword in front of state enum to avoid the ABI break issue. Signed-off-by: Phil Yang Reviewed-by: Dharmik Thakkar Reviewed-by: Ruifeng Wang Acked-by: Erik Gabriel Carrillo --- v3: Fix ABI issue: revert to 'volatile enum rte_event_timer_state type state'. v2: 1. Removed implementation-specific opaque data cleanup code. 2. Replaced thread fence with atomic ACQURE/RELEASE ordering on state access. lib/librte_eventdev/rte_event_timer_adapter.c | 55 ++++++++++++++++++--------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c index d75415c..eb2c93a 100644 --- a/lib/librte_eventdev/rte_event_timer_adapter.c +++ b/lib/librte_eventdev/rte_event_timer_adapter.c @@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim) sw->expired_timers[sw->n_expired_timers++] = tim; sw->stats.evtim_exp_count++; - evtim->state = RTE_EVENT_TIMER_NOT_ARMED; + __atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED, + __ATOMIC_RELEASE); } if (event_buffer_batch_ready(&sw->buffer)) { @@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, int n_lcores; /* Timer list for this lcore is not in use. */ uint16_t exp_state = 0; + enum rte_event_timer_state n_state; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, } for (i = 0; i < nb_evtims; i++) { - /* Don't modify the event timer state in these cases */ - if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) { + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE); + if (n_state == RTE_EVENT_TIMER_ARMED) { rte_errno = EALREADY; break; - } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED || - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { + } else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED || + n_state == RTE_EVENT_TIMER_CANCELED)) { rte_errno = EINVAL; break; } ret = check_timeout(evtims[i], adapter); if (unlikely(ret == -1)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR_TOOLATE, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } else if (unlikely(ret == -2)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR_TOOEARLY, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } if (unlikely(check_destination_event_queue(evtims[i], adapter) < 0)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } @@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, SINGLE, lcore_id, NULL, evtims[i]); if (ret < 0) { /* tim was in RUNNING or CONFIG state */ - evtims[i]->state = RTE_EVENT_TIMER_ERROR; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR, + __ATOMIC_RELEASE); break; } - rte_smp_wmb(); EVTIM_LOG_DBG("armed an event timer"); - evtims[i]->state = RTE_EVENT_TIMER_ARMED; + /* RELEASE ordering guarantees the adapter specific value + * changes observed before the update of state. + */ + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED, + __ATOMIC_RELEASE); } if (i < nb_evtims) @@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, struct rte_timer *timp; uint64_t opaque; struct swtim *sw = swtim_pmd_priv(adapter); + enum rte_event_timer_state n_state; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, for (i = 0; i < nb_evtims; i++) { /* Don't modify the event timer state in these cases */ - if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) { + /* ACQUIRE ordering guarantees the access of implementation + * specific opague data under the correct state. + */ + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE); + if (n_state == RTE_EVENT_TIMER_CANCELED) { rte_errno = EALREADY; break; - } else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) { + } else if (n_state != RTE_EVENT_TIMER_ARMED) { rte_errno = EINVAL; break; } - rte_smp_rmb(); - opaque = evtims[i]->impl_opaque[0]; timp = (struct rte_timer *)(uintptr_t)opaque; RTE_ASSERT(timp != NULL); @@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, rte_mempool_put(sw->tim_pool, (void **)timp); - evtims[i]->state = RTE_EVENT_TIMER_CANCELED; - - rte_smp_wmb(); + /* The RELEASE ordering here pairs with atomic ordering + * to make sure the state update data observed between + * threads. + */ + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED, + __ATOMIC_RELEASE); } return i; -- 2.7.4