From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from mga02.intel.com (mga02.intel.com [134.134.136.20])
 by dpdk.org (Postfix) with ESMTP id 7D21E7E8E
 for ; Mon, 10 Nov 2014 15:00:18 +0100 (CET)
Received: from orsmga001.jf.intel.com ([10.7.209.18])
 by orsmga101.jf.intel.com with ESMTP; 10 Nov 2014 06:10:02 -0800
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos;i="5.07,353,1413270000";
 d="scan'208";a="605310105"
Received: from bricha3-mobl3.ger.corp.intel.com ([10.243.20.42])
 by orsmga001.jf.intel.com with SMTP; 10 Nov 2014 06:10:00 -0800
Received: by (sSMTP sendmail emulation); Mon, 10 Nov 2014 14:10:00 +0025
Date: Mon, 10 Nov 2014 14:10:00 +0000
From: Bruce Richardson
To: Qinglai Xiao
Message-ID: <20141110140959.GB12532@bricha3-MOBL3>
References: <1415623967-52488-1-git-send-email-jigsaw@gmail.com>
 <1415623967-52488-3-git-send-email-jigsaw@gmail.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <1415623967-52488-3-git-send-email-jigsaw@gmail.com>
Organization: Intel Shannon Ltd.
User-Agent: Mutt/1.5.23 (2014-03-12)
Cc: dev@dpdk.org
Subject: Re: [dpdk-dev] [PATCH v2 2/2] Add in_flight_bitmask so as to use full 32 bits of tag.
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: patches and discussions about DPDK
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
X-List-Received-Date: Mon, 10 Nov 2014 14:00:20 -0000

On Mon, Nov 10, 2014 at 02:52:47PM +0200, Qinglai Xiao wrote:
> With introduction of in_flight_bitmask, the whole 32 bits of tag can be
> used. Further more, this patch fixed the integer overflow when finding
> the matched tags.
> Note that currently librte_distributor supports up to 64 worker threads.
> If more workers are needed, the size of in_flight_bitmask and the
> algorithm of finding matched tag must be revised.
>
> Signed-off-by: Qinglai Xiao
> ---
>  lib/librte_distributor/rte_distributor.c | 45 ++++++++++++++++++++++--------
>  lib/librte_distributor/rte_distributor.h | 4 ++
>  2 files changed, 37 insertions(+), 12 deletions(-)
>
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 3dfec4a..3dfccae 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -92,7 +92,13 @@ struct rte_distributor {
>  	unsigned num_workers; /**< Number of workers polling */
>
>  	uint32_t in_flight_tags[RTE_MAX_LCORE];
> -	/**< Tracks the tag being processed per core, 0 == no pkt */
> +	/**< Tracks the tag being processed per core */
> +	uint64_t in_flight_bitmask;
> +	/**< on/off bits for in-flight tags.
> +	 * Note that if RTE_MAX_LCORE is larger than 64 then
> +	 * the bitmask has to expand.
> +	 */

I would suggest for this that we break the link with RTE_MAX_LCORE. Instead, we
can just enforce a hard limit on the distributor that it can only work with 64
worker cores. That should avoid any complications.
I would suggest we do a further check in the create function something like the
below:

	if (num_workers >= sizeof(d->in_flight_bitmask) * CHAR_BIT) {
		rte_errno = .....
	}

> +
>  	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
>
>  	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> @@ -189,6 +195,7 @@ static inline void
>  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  {
>  	d->in_flight_tags[wkr] = 0;
> +	d->in_flight_bitmask &= ~(1UL << wkr);
>  	d->bufs[wkr].bufptr64 = 0;
>  	if (unlikely(d->backlog[wkr].count != 0)) {
>  		/* On return of a packet, we need to move the
> @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
>  					RTE_DISTRIB_FLAG_BITS));
>  		}
> -		/* recursive call */
> +		/* recursive call.
> +		 * Note that the tags were set before first level call
> +		 * to rte_distributor_process.
> +		 */
>  		rte_distributor_process(d, pkts, i);
>  		bl->count = bl->start = 0;
>  	}
> @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
>  			else {
>  				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
>  				d->in_flight_tags[wkr] = 0;
> +				d->in_flight_bitmask &= ~(1UL << wkr);
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> @@ -284,14 +295,18 @@ rte_distributor_process(struct rte_distributor *d,
>  			next_value = (((int64_t)(uintptr_t)next_mb)
>  					<< RTE_DISTRIB_FLAG_BITS);
>  			/*
> -			 * Set the low bit on the tag, so we can guarantee that
> -			 * we never store a tag value of zero. That means we can
> -			 * use the zero-value to indicate that no packet is
> -			 * being processed by a worker.
> +			 * User is advocated to set tag vaue for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User defined tags are used to identify flows,
> +			 * or sessions.
>  			 */
> -			new_tag = (next_mb->hash.usr | 1);
> +			new_tag = next_mb->hash.usr;
>
> -			uint32_t match = 0;
> +			/*
> +			 * Note that if RTE_MAX_LCORE is larger than 64 then
> +			 * the size of match has to be expanded.
> +			 */
> +			uint64_t match = 0;
>  			unsigned i;
>  			/*
>  			 * to scan for a match use "xor" and "not" to get a 0/1
> @@ -303,9 +318,12 @@ rte_distributor_process(struct rte_distributor *d,
>  				match |= (!(d->in_flight_tags[i] ^ new_tag)
>  					<< i);
>
> +			/* Only turned-on bits are considered as match */
> +			match &= d->in_flight_bitmask;
> +
>  			if (match) {
>  				next_mb = NULL;
> -				unsigned worker = __builtin_ctz(match);
> +				unsigned worker = __builtin_ctzl(match);
>  				if (add_to_backlog(&d->backlog[worker],
>  						next_value) < 0)
>  					next_idx--;
> @@ -322,6 +340,7 @@ rte_distributor_process(struct rte_distributor *d,
>  			else {
>  				d->bufs[wkr].bufptr64 = next_value;
>  				d->in_flight_tags[wkr] = new_tag;
> +				d->in_flight_bitmask |= (1UL << wkr);
>  				next_mb = NULL;
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> @@ -379,11 +398,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
>  static inline unsigned
>  total_outstanding(const struct rte_distributor *d)
>  {
> -	unsigned wkr, total_outstanding = 0;
> +	unsigned wkr, total_outstanding;
> +
> +	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
>
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
> -		total_outstanding += d->backlog[wkr].count +
> -				!!(d->in_flight_tags[wkr]);
> +		total_outstanding += d->backlog[wkr].count;
> +
>  	return total_outstanding;
>  }
>
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..cc1d559 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
>   * packets. The distributor will ensure that no two packets that have the
>   * same flow id, or tag, in the mbuf will be procesed at the same time.
>   *
> + * The user is advocated to set tag for each mbuf before calling this function.
> + * If user doesn't set the tag, the tag value can be various values depending on
> + * driver implementation and configuration.
> + *
>   * This is not multi-thread safe and should only be called on a single lcore.
>   *
>   * @param d
> --
> 1.7.1
>
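
For illustration, the 64-worker limit and the bitmask-based tag match discussed
above could be sketched roughly as below. This is a minimal standalone sketch,
not the librte_distributor code: the struct and every sketch_* name are
hypothetical, and it assumes the GCC/Clang builtins __builtin_ctzll and
__builtin_popcountll. It uses UINT64_C(1) and the "ll" builtins rather than the
patch's 1UL and "l" variants, on the assumption that unsigned long may be only
32 bits wide on some targets. The limit check uses ">" so that exactly 64
workers are accepted, which is an assumption about the intended limit; the
check quoted above uses ">=".

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the distributor state (illustration only). */
#define SKETCH_MAX_WORKERS (sizeof(uint64_t) * CHAR_BIT)

struct sketch_distributor {
	unsigned num_workers;
	uint32_t in_flight_tags[SKETCH_MAX_WORKERS];
	uint64_t in_flight_bitmask; /* bit N set == worker N has a packet */
};

/* Create-time check: refuse more workers than the bitmask has bits. */
static int
sketch_init(struct sketch_distributor *d, unsigned num_workers)
{
	if (num_workers > SKETCH_MAX_WORKERS)
		return -1;
	memset(d, 0, sizeof(*d));
	d->num_workers = num_workers;
	return 0;
}

/* Record that worker wkr is now processing a packet with this tag. */
static void
sketch_mark(struct sketch_distributor *d, unsigned wkr, uint32_t tag)
{
	assert(wkr < SKETCH_MAX_WORKERS);
	d->in_flight_tags[wkr] = tag;
	d->in_flight_bitmask |= UINT64_C(1) << wkr;
}

/* Record that worker wkr has no packet in flight any more. */
static void
sketch_clear(struct sketch_distributor *d, unsigned wkr)
{
	assert(wkr < SKETCH_MAX_WORKERS);
	d->in_flight_tags[wkr] = 0;
	d->in_flight_bitmask &= ~(UINT64_C(1) << wkr);
}

/* Return the lowest worker index already handling new_tag, or -1. */
static int
sketch_find_match(const struct sketch_distributor *d, uint32_t new_tag)
{
	uint64_t match = 0;
	unsigned i;

	/* Widen to 64 bits before shifting so bits 32..63 are not lost. */
	for (i = 0; i < d->num_workers; i++)
		match |= (uint64_t)(d->in_flight_tags[i] == new_tag) << i;

	/* A stored tag only counts if that worker's bit is switched on. */
	match &= d->in_flight_bitmask;

	return match ? __builtin_ctzll(match) : -1;
}

/* Number of workers that currently hold a packet. */
static unsigned
sketch_in_flight(const struct sketch_distributor *d)
{
	return (unsigned)__builtin_popcountll(d->in_flight_bitmask);
}
```

With 64 workers, marking worker 63 with tag 0 makes sketch_find_match(&d, 0)
return 63, and clearing that worker makes the same lookup return -1, because a
stored tag of 0 on its own no longer means that a packet is in flight.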