From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <SRS0+YpXY=OX=ericsson.com=mattias.ronnblom@lysator.liu.se>
Received: from mail.lysator.liu.se (mail.lysator.liu.se [130.236.254.3])
 by dpdk.org (Postfix) with ESMTP id 8E7431B8CC
 for <dev@dpdk.org>; Fri, 14 Dec 2018 22:15:07 +0100 (CET)
Received: from mail.lysator.liu.se (localhost [127.0.0.1])
 by mail.lysator.liu.se (Postfix) with ESMTP id 0793E40028
 for <dev@dpdk.org>; Fri, 14 Dec 2018 22:15:07 +0100 (CET)
Received: by mail.lysator.liu.se (Postfix, from userid 1004)
 id E6D3140027; Fri, 14 Dec 2018 22:15:06 +0100 (CET)
X-Spam-Checker-Version: SpamAssassin 3.4.1 (2015-04-28) on
 bernadotte.lysator.liu.se
X-Spam-Level: 
X-Spam-Status: No, score=-0.9 required=5.0 tests=ALL_TRUSTED,AWL
 autolearn=disabled version=3.4.1
X-Spam-Score: -0.9
Received: from [192.168.1.59] (host-90-232-140-56.mobileonline.telia.com
 [90.232.140.56])
 (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
 (No client certificate requested)
 by mail.lysator.liu.se (Postfix) with ESMTPSA id B05AF40021;
 Fri, 14 Dec 2018 22:15:05 +0100 (CET)
To: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>, dev@dpdk.org
References: <1544214885-6811-1-git-send-email-erik.g.carrillo@intel.com>
 <1544802346-1249-1-git-send-email-erik.g.carrillo@intel.com>
 <1544802346-1249-2-git-send-email-erik.g.carrillo@intel.com>
From: =?UTF-8?Q?Mattias_R=c3=b6nnblom?= <mattias.ronnblom@ericsson.com>
Message-ID: <e4112ea0-bcac-64fe-4bc2-37de7dac0e85@ericsson.com>
Date: Fri, 14 Dec 2018 22:15:05 +0100
User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101
 Thunderbird/60.2.1
MIME-Version: 1.0
In-Reply-To: <1544802346-1249-2-git-send-email-erik.g.carrillo@intel.com>
Content-Type: text/plain; charset=utf-8; format=flowed
Content-Language: en-US
Content-Transfer-Encoding: 7bit
X-Virus-Scanned: ClamAV using ClamSMTP
Subject: Re: [dpdk-dev] [PATCH v3 1/1] eventdev: add new software event
 timer adapter
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://mails.dpdk.org/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://mails.dpdk.org/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://mails.dpdk.org/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
X-List-Received-Date: Fri, 14 Dec 2018 21:15:07 -0000

On 2018-12-14 16:45, Erik Gabriel Carrillo wrote:
> This patch introduces a new version of the event timer adapter software
> PMD. In the original design, timer event producer lcores in the primary
> and secondary processes enqueued event timers into a ring, and a
> service core in the primary process dequeued them and processed them
> further.  To improve performance, this version does away with the ring
> and lets lcores in both primary and secondary processes insert timers
> directly into timer skiplist data structures; the service core directly
> accesses the lists as well, when looking for timers that have expired.
> 
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
> ---
>   lib/librte_eventdev/rte_event_timer_adapter.c | 688 +++++++++++---------------
>   1 file changed, 276 insertions(+), 412 deletions(-)
> 
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 79070d4..029a45a 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -19,6 +19,7 @@
>   #include <rte_timer.h>
>   #include <rte_service_component.h>
>   #include <rte_cycles.h>
> +#include <rte_random.h>

You aren't using anything from rte_random.h.

/../

> -static __rte_always_inline uint16_t
> -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> -			  struct rte_event_timer **evtims,
> -			  uint16_t nb_evtims)
> +static uint16_t
> +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> +		struct rte_event_timer **evtims,
> +		uint16_t nb_evtims)
>   {
> -	uint16_t i;
> -	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	struct msg *msgs[nb_evtims];
> +	int i, ret;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	uint32_t lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *tims[nb_evtims];
> +	uint64_t cycles;
>   
>   #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>   	/* Check that the service is running. */
> @@ -1101,101 +979,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
>   	}
>   #endif
>   
> -	sw_data = adapter->data->adapter_priv;
> +	/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
> +	 * the highest lcore to insert such timers into
> +	 */
> +	if (lcore_id == LCORE_ID_ANY)
> +		lcore_id = RTE_MAX_LCORE - 1;
> +
> +	/* If this is the first time we're arming an event timer on this lcore,
> +	 * mark this lcore as "in use"; this will cause the service
> +	 * function to process the timer list that corresponds to this lcore.
> +	 */
> +	if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
> +		rte_spinlock_lock(&sw->poll_lcores_sl);
> +		EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
> +			      lcore_id);
> +		sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
> +		rte_spinlock_unlock(&sw->poll_lcores_sl);
> +	}
>   
> -	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> +	ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
> +				   nb_evtims);
>   	if (ret < 0) {
>   		rte_errno = ENOSPC;
>   		return 0;
>   	}
>   
> -	/* Let the service know we're producing messages for it to process */
> -	rte_atomic16_inc(&sw_data->message_producer_count);
> -
> -	/* If the service is managing timers, wait for it to finish */
> -	while (sw_data->service_phase == 2)
> -		rte_pause();
> -
> -	rte_smp_rmb();
> -
>   	for (i = 0; i < nb_evtims; i++) {
>   		/* Don't modify the event timer state in these cases */
>   		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
>   			rte_errno = EALREADY;
>   			break;
>   		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> -		    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> +			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
>   		ret = check_timeout(evtims[i], adapter);
> -		if (ret == -1) {
> +		if (unlikely(ret == -1)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
>   			rte_errno = EINVAL;
>   			break;
> -		}
> -		if (ret == -2) {
> +		} else if (unlikely(ret == -2)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
> -		if (check_destination_event_queue(evtims[i], adapter) < 0) {
> +		if (unlikely(check_destination_event_queue(evtims[i],
> +							   adapter) < 0)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
> -		/* Checks passed, set up a message to enqueue */
> -		msgs[i]->type = MSG_TYPE_ARM;
> -		msgs[i]->evtim = evtims[i];
> +		tim = tims[i];
> +		rte_timer_init(tim);
>   
> -		/* Set the payload pointer if not set. */
> -		if (evtims[i]->ev.event_ptr == NULL)
> -			evtims[i]->ev.event_ptr = evtims[i];
> +		evtims[i]->impl_opaque[0] = (uintptr_t)tim;
> +		evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
>   
> -		/* msg objects that get enqueued successfully will be freed
> -		 * either by a future cancel operation or by the timer
> -		 * expiration callback.
> -		 */
> -		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> -			rte_errno = ENOSPC;
> +		cycles = get_timeout_cycles(evtims[i], adapter);
> +		ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
> +					  SINGLE, lcore_id, NULL, evtims[i]);
> +		if (ret < 0) {
> +			/* tim was in RUNNING or CONFIG state */
> +			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>   			break;
>   		}
>   
> -		EVTIM_LOG_DBG("enqueued ARM message to ring");
> -
> +		rte_smp_wmb();
> +		EVTIM_LOG_DBG("armed an event timer");
>   		evtims[i]->state = RTE_EVENT_TIMER_ARMED;

This looks like you want a reader to see the impl_opaque[] stores 
before the state store, which sounds like a good idea.

However, I fail to find the corresponding read barriers on the reader 
side. Shouldn't swtim_cancel_burst() have such a barrier? It loads state 
and then loads impl_opaque[], but there's no guarantee those memory 
loads happen in program order.