From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <bruce.richardson@intel.com>
Received: from mga14.intel.com (mga14.intel.com [192.55.52.115])
 by dpdk.org (Postfix) with ESMTP id D86677E9D
 for <dev@dpdk.org>; Mon, 10 Nov 2014 16:04:12 +0100 (CET)
Received: from fmsmga001.fm.intel.com ([10.253.24.23])
 by fmsmga103.fm.intel.com with ESMTP; 10 Nov 2014 07:06:47 -0800
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos;i="5.07,353,1413270000"; d="scan'208";a="619995376"
Received: from bricha3-mobl3.ger.corp.intel.com ([10.243.20.42])
 by fmsmga001.fm.intel.com with SMTP; 10 Nov 2014 07:13:09 -0800
Received: by  (sSMTP sendmail emulation); Mon, 10 Nov 2014 15:13:08 +0025
Date: Mon, 10 Nov 2014 15:13:08 +0000
From: Bruce Richardson <bruce.richardson@intel.com>
To: Qinglai Xiao <jigsaw@gmail.com>
Message-ID: <20141110151308.GC12532@bricha3-MOBL3>
References: <1415630642-3905-1-git-send-email-jigsaw@gmail.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <1415630642-3905-1-git-send-email-jigsaw@gmail.com>
Organization: Intel Shannon Ltd.
User-Agent: Mutt/1.5.23 (2014-03-12)
Cc: dev@dpdk.org
Subject: Re: [dpdk-dev] [PATCH v3] Add in_flight_bitmask so as to use full
 32 bits of tag.
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: patches and discussions about DPDK <dev.dpdk.org>
List-Unsubscribe: <http://dpdk.org/ml/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://dpdk.org/ml/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <http://dpdk.org/ml/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
X-List-Received-Date: Mon, 10 Nov 2014 15:04:14 -0000

On Mon, Nov 10, 2014 at 04:44:02PM +0200, Qinglai Xiao wrote:
> With the introduction of in_flight_bitmask, all 32 bits of the tag can be
> used. Furthermore, this patch fixes the integer overflow when finding
> the matched tags.
> The maximum number of workers is now defined as 64, which is the length of
> a double-word. The link between the number of workers and RTE_MAX_LCORE is
> now removed. A compile-time check is added to ensure that
> RTE_DISTRIB_MAX_WORKERS is less than or equal to the size of a double-word.
> 
> Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>

Looks good to me.
Just before I ack this, have you checked to see if there is any performance impact?

/Bruce

> ---
>  lib/librte_distributor/rte_distributor.c |   64 ++++++++++++++++++++++--------
>  lib/librte_distributor/rte_distributor.h |    4 ++
>  2 files changed, 51 insertions(+), 17 deletions(-)
> 
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 3dfec4a..2c5d61c 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -62,6 +62,13 @@
>  #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
>  
>  /**
> + * Maximum number of workers allowed.
> + * Be careful when increasing the limit, because it is limited by how we track
> + * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
> + */
> +#define RTE_DISTRIB_MAX_WORKERS	64
> +
> +/**
>   * Buffer structure used to pass the pointer data between cores. This is cache
>   * line aligned, but to improve performance and prevent adjacent cache-line
>   * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
> @@ -91,11 +98,17 @@ struct rte_distributor {
>  	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
>  	unsigned num_workers;                 /**< Number of workers polling */
>  
> -	uint32_t in_flight_tags[RTE_MAX_LCORE];
> -		/**< Tracks the tag being processed per core, 0 == no pkt */
> -	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> +	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
> +		/**< Tracks the tag being processed per core */
> +	uint64_t in_flight_bitmask;
> +		/**< on/off bits for in-flight tags.
> +		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
> +		 * the bitmask has to expand.
> +		 */
> +
> +	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
>  
> -	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> +	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
>  
>  	struct rte_distributor_returned_pkts returns;
>  };
> @@ -189,6 +202,7 @@ static inline void
>  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  {
>  	d->in_flight_tags[wkr] = 0;
> +	d->in_flight_bitmask &= ~(1UL << wkr);
>  	d->bufs[wkr].bufptr64 = 0;
>  	if (unlikely(d->backlog[wkr].count != 0)) {
>  		/* On return of a packet, we need to move the
> @@ -211,7 +225,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
>  					RTE_DISTRIB_FLAG_BITS));
>  		}
> -		/* recursive call */
> +		/* recursive call.
> +		 * Note that the tags were set before first level call
> +		 * to rte_distributor_process.
> +		 */
>  		rte_distributor_process(d, pkts, i);
>  		bl->count = bl->start = 0;
>  	}
> @@ -242,6 +259,7 @@ process_returns(struct rte_distributor *d)
>  			else {
>  				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
>  				d->in_flight_tags[wkr] = 0;
> +				d->in_flight_bitmask &= ~(1UL << wkr);
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> @@ -284,14 +302,18 @@ rte_distributor_process(struct rte_distributor *d,
>  			next_value = (((int64_t)(uintptr_t)next_mb)
>  					<< RTE_DISTRIB_FLAG_BITS);
>  			/*
> -			 * Set the low bit on the tag, so we can guarantee that
> -			 * we never store a tag value of zero. That means we can
> -			 * use the zero-value to indicate that no packet is
> -			 * being processed by a worker.
> +			 * The user is advised to set the tag value for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User defined tags are used to identify flows,
> +			 * or sessions.
>  			 */
> -			new_tag = (next_mb->hash.usr | 1);
> +			new_tag = next_mb->hash.usr;
>  
> -			uint32_t match = 0;
> +			/*
> +			 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
> +			 * then the size of match has to be expanded.
> +			 */
> +			uint64_t match = 0;
>  			unsigned i;
>  			/*
>  			 * to scan for a match use "xor" and "not" to get a 0/1
> @@ -303,9 +325,12 @@ rte_distributor_process(struct rte_distributor *d,
>  				match |= (!(d->in_flight_tags[i] ^ new_tag)
>  					<< i);
>  
> +			/* Only turned-on bits are considered as match */
> +			match &= d->in_flight_bitmask;
> +
>  			if (match) {
>  				next_mb = NULL;
> -				unsigned worker = __builtin_ctz(match);
> +				unsigned worker = __builtin_ctzl(match);
>  				if (add_to_backlog(&d->backlog[worker],
>  						next_value) < 0)
>  					next_idx--;
> @@ -322,6 +347,7 @@ rte_distributor_process(struct rte_distributor *d,
>  			else {
>  				d->bufs[wkr].bufptr64 = next_value;
>  				d->in_flight_tags[wkr] = new_tag;
> +				d->in_flight_bitmask |= (1UL << wkr);
>  				next_mb = NULL;
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> @@ -379,11 +405,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
>  static inline unsigned
>  total_outstanding(const struct rte_distributor *d)
>  {
> -	unsigned wkr, total_outstanding = 0;
> +	unsigned wkr, total_outstanding;
> +
> +	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
>  
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
> -		total_outstanding += d->backlog[wkr].count +
> -				!!(d->in_flight_tags[wkr]);
> +		total_outstanding += d->backlog[wkr].count;
> +
>  	return total_outstanding;
>  }
>  
> @@ -423,9 +451,11 @@ rte_distributor_create(const char *name,
>  
>  	/* compilation-time checks */
>  	RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> -	RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> +	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
> +	RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
> +				sizeof(d->in_flight_bitmask) * CHAR_BIT);
>  
> -	if (name == NULL || num_workers >= RTE_MAX_LCORE) {
> +	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
>  		rte_errno = EINVAL;
>  		return NULL;
>  	}
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..cc1d559 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
>   * packets. The distributor will ensure that no two packets that have the
>   * same flow id, or tag, in the mbuf will be procesed at the same time.
>   *
> + * The user is advised to set the tag for each mbuf before calling this function.
> + * If the user doesn't set the tag, the tag value can vary depending on the
> + * driver implementation and configuration.
> + *
>   * This is not multi-thread safe and should only be called on a single lcore.
>   *
>   * @param d
> -- 
> 1.7.1
>