From: Mattias Rönnblom
To: Erik Gabriel Carrillo, dev@dpdk.org
Date: Fri, 14 Dec 2018 22:15:05 +0100
Subject: Re: [dpdk-dev] [PATCH v3 1/1] eventdev: add new software event timer adapter
In-Reply-To: <1544802346-1249-2-git-send-email-erik.g.carrillo@intel.com>
References: <1544214885-6811-1-git-send-email-erik.g.carrillo@intel.com>
 <1544802346-1249-1-git-send-email-erik.g.carrillo@intel.com>
 <1544802346-1249-2-git-send-email-erik.g.carrillo@intel.com>
List-Id: DPDK patches and discussions

On 2018-12-14 16:45,
Erik Gabriel Carrillo wrote:
> This patch introduces a new version of the event timer adapter software
> PMD. In the original design, timer event producer lcores in the primary
> and secondary processes enqueued event timers into a ring, and a
> service core in the primary process dequeued them and processed them
> further. To improve performance, this version does away with the ring
> and lets lcores in both primary and secondary processes insert timers
> directly into timer skiplist data structures; the service core directly
> accesses the lists as well, when looking for timers that have expired.
>
> Signed-off-by: Erik Gabriel Carrillo
> ---
>  lib/librte_eventdev/rte_event_timer_adapter.c | 688 +++++++++++---------------
>  1 file changed, 276 insertions(+), 412 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 79070d4..029a45a 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -19,6 +19,7 @@
>  #include
>  #include
>  #include
> +#include

You aren't using anything from rte_random.h.

/../

> -static __rte_always_inline uint16_t
> -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> -			  struct rte_event_timer **evtims,
> -			  uint16_t nb_evtims)
> +static uint16_t
> +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> +		  struct rte_event_timer **evtims,
> +		  uint16_t nb_evtims)
>  {
> -	uint16_t i;
> -	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	struct msg *msgs[nb_evtims];
> +	int i, ret;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	uint32_t lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *tims[nb_evtims];
> +	uint64_t cycles;
>
>  #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>  	/* Check that the service is running. */
> @@ -1101,101 +979,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
>  	}
>  #endif
>
> -	sw_data = adapter->data->adapter_priv;
> +	/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
> +	 * the highest lcore to insert such timers into
> +	 */
> +	if (lcore_id == LCORE_ID_ANY)
> +		lcore_id = RTE_MAX_LCORE - 1;
> +
> +	/* If this is the first time we're arming an event timer on this lcore,
> +	 * mark this lcore as "in use"; this will cause the service
> +	 * function to process the timer list that corresponds to this lcore.
> +	 */
> +	if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
> +		rte_spinlock_lock(&sw->poll_lcores_sl);
> +		EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
> +			      lcore_id);
> +		sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
> +		rte_spinlock_unlock(&sw->poll_lcores_sl);
> +	}
>
> -	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> +	ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
> +				   nb_evtims);
>  	if (ret < 0) {
>  		rte_errno = ENOSPC;
>  		return 0;
>  	}
>
> -	/* Let the service know we're producing messages for it to process */
> -	rte_atomic16_inc(&sw_data->message_producer_count);
> -
> -	/* If the service is managing timers, wait for it to finish */
> -	while (sw_data->service_phase == 2)
> -		rte_pause();
> -
> -	rte_smp_rmb();
> -
>  	for (i = 0; i < nb_evtims; i++) {
>  		/* Don't modify the event timer state in these cases */
>  		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
>  			rte_errno = EALREADY;
>  			break;
>  		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> -		    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> +			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
>  			rte_errno = EINVAL;
>  			break;
>  		}
>
>  		ret = check_timeout(evtims[i], adapter);
> -		if (ret == -1) {
> +		if (unlikely(ret == -1)) {
>  			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
>  			rte_errno = EINVAL;
>  			break;
> -		}
> -		if (ret == -2) {
> +		} else if (unlikely(ret == -2)) {
>  			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
>  			rte_errno = EINVAL;
>  			break;
>  		}
>
> -		if (check_destination_event_queue(evtims[i], adapter) < 0) {
> +		if (unlikely(check_destination_event_queue(evtims[i],
> +							   adapter) < 0)) {
>  			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>  			rte_errno = EINVAL;
>  			break;
>  		}
>
> -		/* Checks passed, set up a message to enqueue */
> -		msgs[i]->type = MSG_TYPE_ARM;
> -		msgs[i]->evtim = evtims[i];
> +		tim = tims[i];
> +		rte_timer_init(tim);
>
> -		/* Set the payload pointer if not set. */
> -		if (evtims[i]->ev.event_ptr == NULL)
> -			evtims[i]->ev.event_ptr = evtims[i];
> +		evtims[i]->impl_opaque[0] = (uintptr_t)tim;
> +		evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
>
> -		/* msg objects that get enqueued successfully will be freed
> -		 * either by a future cancel operation or by the timer
> -		 * expiration callback.
> -		 */
> -		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> -			rte_errno = ENOSPC;
> +		cycles = get_timeout_cycles(evtims[i], adapter);
> +		ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
> +					  SINGLE, lcore_id, NULL, evtims[i]);
> +		if (ret < 0) {
> +			/* tim was in RUNNING or CONFIG state */
> +			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>  			break;
>  		}
>
> -		EVTIM_LOG_DBG("enqueued ARM message to ring");
> -
> +		rte_smp_wmb();
> +		EVTIM_LOG_DBG("armed an event timer");
>  		evtims[i]->state = RTE_EVENT_TIMER_ARMED;

This looks like you want a reader to see the impl_opaque[] stores before
the state store, which sounds like a good idea. However, I fail to find
the corresponding read barriers on the reader side. Shouldn't
swtim_cancel_burst() have one? It loads state and then loads
impl_opaque[], but there's no guarantee that those memory loads happen
in program order.
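
To illustrate the pairing I have in mind, here is a minimal sketch in plain C11 atomics rather than the DPDK barrier macros. Everything in it is a hypothetical stand-in and not the adapter's real code: struct evtim, the state enum, arm() and cancel() are invented for the example. The release store in arm() is roughly the counterpart of rte_smp_wmb() followed by the state store, and the acquire load in cancel() is roughly the counterpart of a state load followed by rte_smp_rmb().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the event timer states. */
enum evtim_state { NOT_ARMED, ARMED, CANCELED };

/* Hypothetical, cut-down event timer: a state word plus opaque payload. */
struct evtim {
	_Atomic int state;
	uint64_t impl_opaque[2];
};

/* Writer side: fill in impl_opaque[] first, then publish ARMED. */
static void arm(struct evtim *e, uint64_t tim, uint64_t adapter)
{
	assert(e);
	e->impl_opaque[0] = tim;
	e->impl_opaque[1] = adapter;
	/* Release store: the payload stores above cannot be reordered
	 * after this store to state.
	 */
	atomic_store_explicit(&e->state, ARMED, memory_order_release);
}

/* Reader side: check state first, then read impl_opaque[]. */
static bool cancel(struct evtim *e, uint64_t *tim_out)
{
	assert(e);
	/* Acquire load: the payload loads below cannot be reordered
	 * before this load of state. Without it, the reader could see
	 * ARMED and still read stale impl_opaque[] contents.
	 */
	if (atomic_load_explicit(&e->state, memory_order_acquire) != ARMED)
		return false;

	*tim_out = e->impl_opaque[0];
	atomic_store_explicit(&e->state, CANCELED, memory_order_release);
	return true;
}
```

The point is only that the ordering has to be enforced on both sides: a write barrier in the arm path gives the reader nothing unless the cancel path orders its own loads as well.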