From: "Ananyev, Konstantin"
To: "Carrillo, Erik G", "rsanford@akamai.com"
CC: "dev@dpdk.org"
Date: Tue, 29 Aug 2017 10:57:25 +0000
Subject: Re: [dpdk-dev] [PATCH v2 1/3] timer: add per-installer pending lists for each lcore

Hi Gabriel,

>
> Instead of each priv_timer struct containing a single skiplist, this
> commit adds a skiplist for each enabled lcore to priv_timer. In the case
> that multiple lcores repeatedly install timers on the same target lcore,
> this change reduces lock contention for the target lcore's skiplists and
> increases performance.

I am not an rte_timer expert, but there is one thing that worries me:
it seems that the complexity of timer_manage() has increased quite a bit
with this patch: now it has to check/process up to RTE_MAX_LCORE skiplists
instead of one, and it also has to somehow properly sort up to RTE_MAX_LCORE
lists of retrieved (ready-to-run) timers.
Wouldn't all that affect its running time?

I understand your intention to reduce lock contention, but I suppose it
could at least be done in a configurable way - say, let the user specify
the dimension of pending_lists[] at init phase or so. Then a timer from
lcore_id=N would end up in pending_lists[N % RTE_DIM(pending_lists)];
see the sketch after the next paragraph.

Another thought - it might be better to divide the pending timers list not
by client (lcore) id, but by expiration time - some analog of a timer wheel
or so. That, I think, might greatly decrease the probability that
timer_manage() and timer_add() will try to access the same list.
On the other hand, timer_manage() would still have to consume the skip-lists
one by one. Though I suppose that's quite a radical change from what we have
right now.
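To illustrate the configurable-dimension idea, a rough sketch only -
n_pending_lists and pending_list_idx below are hypothetical names, not
existing rte_timer API:

	/* Number of per-destination pending lists, chosen by the
	 * application at init time; 1 <= n_pending_lists <= RTE_MAX_LCORE.
	 */
	static unsigned int n_pending_lists = 8;

	/* Fold the installing lcore into one of n_pending_lists buckets.
	 * Several installer lcores may then share a list, so lock
	 * contention versus the number of skiplists rte_timer_manage()
	 * must scan becomes a tunable trade-off instead of a fixed
	 * RTE_MAX_LCORE fan-out.
	 */
	static inline unsigned int
	pending_list_idx(unsigned int installer_lcore_id)
	{
		return installer_lcore_id % n_pending_lists;
	}

timer_add() would then pick its destination as something like
&priv_timer[tim_lcore].pending_lists[pending_list_idx(lcore_id)], and the
scan loop in rte_timer_manage() would iterate over n_pending_lists entries
instead of over all enabled lcores.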
Konstantin

>
> Signed-off-by: Gabriel Carrillo
> ---
> v2:
> * Address checkpatch warnings
>
>  lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
>  lib/librte_timer/rte_timer.h |   9 +-
>  2 files changed, 202 insertions(+), 116 deletions(-)
>
> diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
> index 5ee0840..da2fc1a 100644
> --- a/lib/librte_timer/rte_timer.c
> +++ b/lib/librte_timer/rte_timer.c
> @@ -37,6 +37,7 @@
>  #include
>  #include
>  #include
> +#include <stdbool.h>
>
>  #include
>  #include
> @@ -56,17 +57,19 @@
>
>  LIST_HEAD(rte_timer_list, rte_timer);
>
> +struct skiplist {
> +	struct rte_timer head;  /**< dummy timer instance to head up list */
> +	rte_spinlock_t lock;    /**< lock to protect list access */
> +	unsigned int depth;     /**< track the current depth of the skiplist */
> +} __rte_cache_aligned;
> +
>  struct priv_timer {
> -	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
> -	rte_spinlock_t list_lock;       /**< lock to protect list access */
> +	/** one pending list per enabled lcore */
> +	struct skiplist pending_lists[RTE_MAX_LCORE];
>
>  	/** per-core variable that true if a timer was updated on this
>  	 *  core since last reset of the variable */
>  	int updated;
> -
> -	/** track the current depth of the skiplist */
> -	unsigned curr_skiplist_depth;
> -
>  	unsigned prev_lcore;  /**< used for lcore round robin */
>
>  	/** running timer on this lcore now */
> @@ -81,6 +84,10 @@ struct priv_timer {
>  /** per-lcore private info for timers */
>  static struct priv_timer priv_timer[RTE_MAX_LCORE];
>
> +/** cache of IDs of enabled lcores */
> +static unsigned int enabled_lcores[RTE_MAX_LCORE];
> +static int n_enabled_lcores;
> +
>  /* when debug is enabled, store some statistics */
>  #ifdef RTE_LIBRTE_TIMER_DEBUG
>  #define __TIMER_STAT_ADD(name, n) do { \
> @@ -96,14 +103,25 @@ static struct priv_timer priv_timer[RTE_MAX_LCORE];
>  void
>  rte_timer_subsystem_init(void)
>  {
> -	unsigned lcore_id;
> +	unsigned int lcore_id1, lcore_id2;
> +	struct skiplist *list;
> +	int i, j;
> +
> +	RTE_LCORE_FOREACH(lcore_id1)
> +		enabled_lcores[n_enabled_lcores++] = lcore_id1;
>
>  	/* since priv_timer is static, it's zeroed by default, so only init some
>  	 * fields.
>  	 */
> -	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
> -		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
> -		priv_timer[lcore_id].prev_lcore = lcore_id;
> +	for (i = 0, lcore_id1 = enabled_lcores[i]; i < n_enabled_lcores;
> +	     lcore_id1 = enabled_lcores[++i]) {
> +		priv_timer[lcore_id1].prev_lcore = lcore_id1;
> +
> +		for (j = 0, lcore_id2 = enabled_lcores[j]; j < n_enabled_lcores;
> +		     lcore_id2 = enabled_lcores[++j]) {
> +			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
> +			rte_spinlock_init(&list->lock);
> +		}
>  	}
>  }
>
> @@ -114,7 +132,8 @@ rte_timer_init(struct rte_timer *tim)
>  	union rte_timer_status status;
>
>  	status.state = RTE_TIMER_STOP;
> -	status.owner = RTE_TIMER_NO_OWNER;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = RTE_TIMER_NO_LCORE;
>  	tim->status.u32 = status.u32;
>  }
>
> @@ -142,7 +161,7 @@ timer_set_config_state(struct rte_timer *tim,
>  	 * or ready to run on local core, exit
>  	 */
>  	if (prev_status.state == RTE_TIMER_RUNNING &&
> -	    (prev_status.owner != (uint16_t)lcore_id ||
> +	    (prev_status.owner != (int)lcore_id ||
>  	     tim != priv_timer[lcore_id].running_tim))
>  		return -1;
>
> @@ -153,7 +172,8 @@ timer_set_config_state(struct rte_timer *tim,
>  	/* here, we know that timer is stopped or pending,
>  	 * mark it atomically as being configured */
>  	status.state = RTE_TIMER_CONFIG;
> -	status.owner = (int16_t)lcore_id;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = lcore_id;
>  	success = rte_atomic32_cmpset(&tim->status.u32,
>  				      prev_status.u32,
>  				      status.u32);
> @@ -185,7 +205,8 @@ timer_set_running_state(struct rte_timer *tim)
>  	/* here, we know that timer is stopped or pending,
>  	 * mark it atomically as being configured */
>  	status.state = RTE_TIMER_RUNNING;
> -	status.owner = (int16_t)lcore_id;
> +	status.installer = prev_status.installer;
> +	status.owner = lcore_id;
>  	success = rte_atomic32_cmpset(&tim->status.u32,
>  				      prev_status.u32,
>  				      status.u32);
> @@ -236,11 +257,11 @@ timer_get_skiplist_level(unsigned curr_depth)
>   * are <= that time value.
>   */
>  static void
> -timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
> +timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
>  		struct rte_timer **prev)
>  {
> -	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
> -	prev[lvl] = &priv_timer[tim_lcore].pending_head;
> +	unsigned int lvl = list->depth;
> +	prev[lvl] = &list->head;
>  	while(lvl != 0) {
>  		lvl--;
>  		prev[lvl] = prev[lvl+1];
> @@ -255,15 +276,15 @@ timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
>   * all skiplist levels.
>   */
>  static void
> -timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
> +timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
>  		struct rte_timer **prev)
>  {
>  	int i;
>  	/* to get a specific entry in the list, look for just lower than the time
>  	 * values, and then increment on each level individually if necessary
>  	 */
> -	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
> -	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
> +	timer_get_prev_entries(tim->expire - 1, list, prev);
> +	for (i = list->depth - 1; i >= 0; i--) {
>  		while (prev[i]->sl_next[i] != NULL &&
>  				prev[i]->sl_next[i] != tim &&
>  				prev[i]->sl_next[i]->expire <= tim->expire)
> @@ -282,25 +303,25 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
>  	unsigned lcore_id = rte_lcore_id();
>  	unsigned lvl;
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
> +	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
>
>  	/* if timer needs to be scheduled on another core, we need to
>  	 * lock the list; if it is on local core, we need to lock if
>  	 * we are not called from rte_timer_manage() */
>  	if (tim_lcore != lcore_id || !local_is_locked)
> -		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
> +		rte_spinlock_lock(&list->lock);
>
>  	/* find where exactly this element goes in the list of elements
>  	 * for each depth. */
> -	timer_get_prev_entries(tim->expire, tim_lcore, prev);
> +	timer_get_prev_entries(tim->expire, list, prev);
>
>  	/* now assign it a new level and add at that level */
> -	const unsigned tim_level = timer_get_skiplist_level(
> -			priv_timer[tim_lcore].curr_skiplist_depth);
> -	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
> -		priv_timer[tim_lcore].curr_skiplist_depth++;
> +	const unsigned int tim_level = timer_get_skiplist_level(list->depth);
> +	if (tim_level == list->depth)
> +		list->depth++;
>
>  	lvl = tim_level;
> -	while (lvl > 0) {
> +	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
>  		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
>  		prev[lvl]->sl_next[lvl] = tim;
>  		lvl--;
> @@ -310,11 +331,10 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
>
>  	/* save the lowest list entry into the expire field of the dummy hdr
>  	 * NOTE: this is not atomic on 32-bit*/
> -	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
> -			pending_head.sl_next[0]->expire;
> +	list->head.expire = list->head.sl_next[0]->expire;
>
>  	if (tim_lcore != lcore_id || !local_is_locked)
> -		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
> +		rte_spinlock_unlock(&list->lock);
>  }
>
>  /*
> @@ -330,35 +350,38 @@ timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
>  	unsigned prev_owner = prev_status.owner;
>  	int i;
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
> +	struct skiplist *list;
> +
> +	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
>
>  	/* if timer needs is pending another core, we need to lock the
>  	 * list; if it is on local core, we need to lock if we are not
>  	 * called from rte_timer_manage() */
>  	if (prev_owner != lcore_id || !local_is_locked)
> -		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
> +		rte_spinlock_lock(&list->lock);
>
>  	/* save the lowest list entry into the expire field of the dummy hdr.
>  	 * NOTE: this is not atomic on 32-bit */
> -	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
> -		priv_timer[prev_owner].pending_head.expire =
> -				((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
> +	if (tim == list->head.sl_next[0])
> +		list->head.expire = ((tim->sl_next[0] == NULL) ?
> +				0 : tim->sl_next[0]->expire);
>
>  	/* adjust pointers from previous entries to point past this */
> -	timer_get_prev_entries_for_node(tim, prev_owner, prev);
> -	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
> +	timer_get_prev_entries_for_node(tim, list, prev);
> +	for (i = list->depth - 1; i >= 0; i--) {
>  		if (prev[i]->sl_next[i] == tim)
>  			prev[i]->sl_next[i] = tim->sl_next[i];
>  	}
>
>  	/* in case we deleted last entry at a level, adjust down max level */
> -	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
> -		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
> -			priv_timer[prev_owner].curr_skiplist_depth --;
> +	for (i = list->depth - 1; i >= 0; i--)
> +		if (list->head.sl_next[i] == NULL)
> +			list->depth--;
>  		else
>  			break;
>
>  	if (prev_owner != lcore_id || !local_is_locked)
> -		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
> +		rte_spinlock_unlock(&list->lock);
>  }
>
>  /* Reset and start the timer associated with the timer handle (private func) */
> @@ -416,7 +439,8 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
>  	 * the state so we don't need to use cmpset() here */
>  	rte_wmb();
>  	status.state = RTE_TIMER_PENDING;
> -	status.owner = (int16_t)tim_lcore;
> +	status.installer = lcore_id;
> +	status.owner = tim_lcore;
>  	tim->status.u32 = status.u32;
>
>  	return 0;
> @@ -484,7 +508,8 @@ rte_timer_stop(struct rte_timer *tim)
>  	/* mark timer as stopped */
>  	rte_wmb();
>  	status.state = RTE_TIMER_STOP;
> -	status.owner = RTE_TIMER_NO_OWNER;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = RTE_TIMER_NO_LCORE;
>  	tim->status.u32 = status.u32;
>
>  	return 0;
> @@ -506,119 +531,179 @@ rte_timer_pending(struct rte_timer *tim)
>  }
>
>  /* must be called periodically, run all timer that expired */
> -void rte_timer_manage(void)
> +void
> +rte_timer_manage(void)
>  {
>  	union rte_timer_status status;
> -	struct rte_timer *tim, *next_tim;
> -	struct rte_timer *run_first_tim, **pprev;
> -	unsigned lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *next_tim, **pprev;
> +	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
> -	uint64_t cur_time;
> -	int i, ret;
> +	struct priv_timer *priv_tim;
> +	unsigned int installer_lcore, lcore_id = rte_lcore_id();
> +	uint64_t cur_time, min_expire;
> +	int i, j, min_idx, ret;
> +	int nrunlists = 0;
> +	int local_is_locked = 0;
> +	struct skiplist *dest_list, *list = NULL;
> +	bool done;
>
>  	/* timer manager only runs on EAL thread with valid lcore_id */
>  	assert(lcore_id < RTE_MAX_LCORE);
>
> +	priv_tim = &priv_timer[lcore_id];
> +
>  	__TIMER_STAT_ADD(manage, 1);
> -	/* optimize for the case where per-cpu list is empty */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
> -		return;
> -	cur_time = rte_get_timer_cycles();
> +	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
> +	     installer_lcore = enabled_lcores[++i]) {
> +		list = &priv_tim->pending_lists[installer_lcore];
> +
> +		/* optimize for the case where list is empty */
> +		if (list->head.sl_next[0] == NULL)
> +			continue;
> +		cur_time = rte_get_timer_cycles();
>
>  #ifdef RTE_ARCH_X86_64
> -	/* on 64-bit the value cached in the pending_head.expired will be
> -	 * updated atomically, so we can consult that for a quick check here
> -	 * outside the lock */
> -	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
> -		return;
> +		/* on 64-bit the value cached in the list->head.expired will be
> +		 * updated atomically, so we can consult that for a quick check
> +		 * here outside the lock
> +		 */
> +		if (likely(list->head.expire > cur_time))
> +			continue;
>  #endif
>
> -	/* browse ordered list, add expired timers in 'expired' list */
> -	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
> +		/* browse ordered list, add expired timers in 'expired' list */
> +		rte_spinlock_lock(&list->lock);
>
> -	/* if nothing to do just unlock and return */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
> -	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
> -		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> -		return;
> -	}
> +		/* if nothing to do just unlock and continue */
> +		if (list->head.sl_next[0] == NULL ||
> +		    list->head.sl_next[0]->expire > cur_time) {
> +			rte_spinlock_unlock(&list->lock);
> +			continue;
> +		}
>
> -	/* save start of list of expired timers */
> -	tim = priv_timer[lcore_id].pending_head.sl_next[0];
> +		/* save start of list of expired timers */
> +		tim = list->head.sl_next[0];
> +
> +		/* break the existing list at current time point */
> +		timer_get_prev_entries(cur_time, list, prev);
> +		for (j = list->depth - 1; j >= 0; j--) {
> +			if (prev[j] == &list->head)
> +				continue;
> +			list->head.sl_next[j] =
> +				prev[j]->sl_next[j];
> +			if (prev[j]->sl_next[j] == NULL)
> +				list->depth--;
> +			prev[j]->sl_next[j] = NULL;
> +		}
>
> -	/* break the existing list at current time point */
> -	timer_get_prev_entries(cur_time, lcore_id, prev);
> -	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
> -		if (prev[i] == &priv_timer[lcore_id].pending_head)
> -			continue;
> -		priv_timer[lcore_id].pending_head.sl_next[i] =
> -			prev[i]->sl_next[i];
> -		if (prev[i]->sl_next[i] == NULL)
> -			priv_timer[lcore_id].curr_skiplist_depth--;
> -		prev[i] ->sl_next[i] = NULL;
> -	}
> +		/* transition run-list from PENDING to RUNNING */
> +		run_first_tims[nrunlists] = tim;
> +		pprev = &run_first_tims[nrunlists];
> +		nrunlists++;
> +
> +		for (; tim != NULL; tim = next_tim) {
> +			next_tim = tim->sl_next[0];
> +
> +			ret = timer_set_running_state(tim);
> +			if (likely(ret == 0)) {
> +				pprev = &tim->sl_next[0];
> +			} else {
> +				/* another core is trying to re-config this one,
> +				 * remove it from local expired list
> +				 */
> +				*pprev = next_tim;
> +			}
> +		}
>
> -	/* transition run-list from PENDING to RUNNING */
> -	run_first_tim = tim;
> -	pprev = &run_first_tim;
> +		/* update the next to expire timer value */
> +		list->head.expire = (list->head.sl_next[0] == NULL) ?
> +			0 : list->head.sl_next[0]->expire;
>
> -	for ( ; tim != NULL; tim = next_tim) {
> -		next_tim = tim->sl_next[0];
> +		rte_spinlock_unlock(&list->lock);
> +	}
>
> -		ret = timer_set_running_state(tim);
> -		if (likely(ret == 0)) {
> -			pprev = &tim->sl_next[0];
> -		} else {
> -			/* another core is trying to re-config this one,
> -			 * remove it from local expired list
> -			 */
> -			*pprev = next_tim;
> +	/* Now process the run lists */
> +	while (1) {
> +		done = true;
> +		min_expire = UINT64_MAX;
> +		min_idx = 0;
> +
> +		/* Find the next oldest timer to process */
> +		for (i = 0; i < nrunlists; i++) {
> +			tim = run_first_tims[i];
> +
> +			if (tim != NULL && tim->expire < min_expire) {
> +				min_expire = tim->expire;
> +				min_idx = i;
> +				done = false;
> +			}
>  		}
> -	}
>
> -	/* update the next to expire timer value */
> -	priv_timer[lcore_id].pending_head.expire =
> -		(priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
> -			priv_timer[lcore_id].pending_head.sl_next[0]->expire;
> +		if (done)
> +			break;
> +
> +		tim = run_first_tims[min_idx];
>
> -	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> +		/* Move down the runlist from which we picked a timer to
> +		 * execute
> +		 */
> +		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
>
> -	/* now scan expired list and call callbacks */
> -	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
> -		next_tim = tim->sl_next[0];
> -		priv_timer[lcore_id].updated = 0;
> -		priv_timer[lcore_id].running_tim = tim;
> +		priv_tim->updated = 0;
> +		priv_tim->running_tim = tim;
>
>  		/* execute callback function with list unlocked */
>  		tim->f(tim, tim->arg);
>
>  		__TIMER_STAT_ADD(pending, -1);
>  		/* the timer was stopped or reloaded by the callback
> -		 * function, we have nothing to do here */
> -		if (priv_timer[lcore_id].updated == 1)
> +		 * function, we have nothing to do here
> +		 */
> +		if (priv_tim->updated == 1)
>  			continue;
>
>  		if (tim->period == 0) {
>  			/* remove from done list and mark timer as stopped */
>  			status.state = RTE_TIMER_STOP;
> -			status.owner = RTE_TIMER_NO_OWNER;
> +			status.installer = RTE_TIMER_NO_LCORE;
> +			status.owner = RTE_TIMER_NO_LCORE;
>  			rte_wmb();
>  			tim->status.u32 = status.u32;
>  		}
>  		else {
> -			/* keep it in list and mark timer as pending */
> -			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
> +			dest_list = &priv_tim->pending_lists[lcore_id];
> +
> +			/* If the destination list is the current list
> +			 * we can acquire the lock here, and hold it
> +			 * across removal and insertion of the timer.
> +			 */
> +			if (list == dest_list) {
> +				rte_spinlock_lock(&list->lock);
> +				local_is_locked = 1;
> +			}
> +
> +			/* set timer state back to PENDING and
> +			 * reinsert it in pending list
> +			 */
>  			status.state = RTE_TIMER_PENDING;
>  			__TIMER_STAT_ADD(pending, 1);
> -			status.owner = (int16_t)lcore_id;
> +			status.installer = tim->status.installer;
> +			status.owner = lcore_id;
>  			rte_wmb();
>  			tim->status.u32 = status.u32;
> -			__rte_timer_reset(tim, tim->expire + tim->period,
> -				tim->period, lcore_id, tim->f, tim->arg, 1);
> -			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> +
> +			__rte_timer_reset(tim,
> +					  tim->expire + tim->period,
> +					  tim->period, lcore_id,
> +					  tim->f, tim->arg, local_is_locked);
> +
> +			if (local_is_locked) {
> +				rte_spinlock_unlock(&list->lock);
> +				local_is_locked = 0;
> +			}
>  		}
>  	}
> -	priv_timer[lcore_id].running_tim = NULL;
> +	priv_tim->running_tim = NULL;
>  }
>
>  /* dump statistics about timers */
> diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
> index a276a73..4cc6baf 100644
> --- a/lib/librte_timer/rte_timer.h
> +++ b/lib/librte_timer/rte_timer.h
> @@ -77,7 +77,7 @@ extern "C" {
>  #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
>  #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
>
> -#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
> +#define RTE_TIMER_NO_LCORE -2
>
>  /**
>   * Timer type: Periodic or single (one-shot).
> @@ -94,10 +94,11 @@ enum rte_timer_type {
>  union rte_timer_status {
>  	RTE_STD_C11
>  	struct {
> -		uint16_t state;  /**< Stop, pending, running, config. */
> -		int16_t owner;   /**< The lcore that owns the timer. */
> +		unsigned state : 2;
> +		int installer : 15;
> +		int owner : 15;
>  	};
> -	uint32_t u32; /**< To atomic-set status + owner. */
> +	uint32_t u32; /**< To atomic-set status, installer, owner */
>  };
>
>  #ifdef RTE_LIBRTE_TIMER_DEBUG
> --
> 2.6.4