From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <jigsaw@gmail.com>
Received: from mail-wi0-f176.google.com (mail-wi0-f176.google.com
 [209.85.212.176]) by dpdk.org (Postfix) with ESMTP id EE8AF7E9D
 for <dev@dpdk.org>; Mon, 10 Nov 2014 16:42:40 +0100 (CET)
Received: by mail-wi0-f176.google.com with SMTP id h11so10930220wiw.3
 for <dev@dpdk.org>; Mon, 10 Nov 2014 07:52:26 -0800 (PST)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113;
 h=mime-version:in-reply-to:references:date:message-id:subject:from:to
 :cc:content-type;
 bh=bk0SEevYFNdBdkMgUGx5MtwhPgbBlk2UV5Xa5qsA+Hs=;
 b=z3DwYnNqGVHs8LaxuukeCfgt8PQj61Djkbubjl69qjAXXcvIgrtbpMb3eU1qsP5Z9t
 MKwjAbqHgyt7G8AO1RDeuaPth599liXNcTH8Zf/idhYjjXwSB8kkc1MvdW4G3XjLvLzm
 LHvz5zqgiDvK8p8qBZNm//HnfsqaOtLJ/MEY3sKOVLOIUtu5C/9FtQk/AlRYc0GRLGEP
 Kg+jmKr/o1u9n4kxTf4ZJv7HiZq0kkQQd99ai9Ee7B6+lWL1yH3WbkgWun6xY4+hdUv6
 WAm+Z4Mc2mQ5W+PAKy+3rjQUZ6qK58L8nO04L8wOBL5X6S0NzaFwmJdrRL7xomLppELw
 rBaA==
MIME-Version: 1.0
X-Received: by 10.194.175.67 with SMTP id by3mr43718435wjc.32.1415634746515;
 Mon, 10 Nov 2014 07:52:26 -0800 (PST)
Received: by 10.27.86.144 with HTTP; Mon, 10 Nov 2014 07:52:26 -0800 (PST)
In-Reply-To: <20141110151308.GC12532@bricha3-MOBL3>
References: <1415630642-3905-1-git-send-email-jigsaw@gmail.com>
 <20141110151308.GC12532@bricha3-MOBL3>
Date: Mon, 10 Nov 2014 17:52:26 +0200
Message-ID: <CAHVfvh6oTLE2bzGitVXDn+O7MicbvB2nsiR_SnQtQHGKVdk93g@mail.gmail.com>
From: jigsaw <jigsaw@gmail.com>
To: Bruce Richardson <bruce.richardson@intel.com>
Content-Type: text/plain; charset=UTF-8
X-Content-Filtered-By: Mailman/MimeDel 2.1.15
Cc: "dev@dpdk.org" <dev@dpdk.org>
Subject: Re: [dpdk-dev] [PATCH v3] Add in_flight_bitmask so as to use full
 32 bits of tag.
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: patches and discussions about DPDK <dev.dpdk.org>
List-Unsubscribe: <http://dpdk.org/ml/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://dpdk.org/ml/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <http://dpdk.org/ml/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
X-List-Received-Date: Mon, 10 Nov 2014 15:42:41 -0000

Hi Bruce,

Sorry, I haven't checked that yet. I will run a performance test tomorrow.

thx &
rgds,
-qinglai

On Mon, Nov 10, 2014 at 5:13 PM, Bruce Richardson <bruce.richardson@intel.com> wrote:

> On Mon, Nov 10, 2014 at 04:44:02PM +0200, Qinglai Xiao wrote:
> > With introduction of in_flight_bitmask, the whole 32 bits of tag can be
> > used. Furthermore, this patch fixes the integer overflow when finding
> > the matched tags.
> > The maximum number of workers is now defined as 64, which is the length of a
> > double-word. The link between number of workers and RTE_MAX_LCORE is
> > now removed. Compile time check is added to ensure the
> > RTE_DISTRIB_MAX_WORKERS is less than or equal to size of double-word.
> >
> > Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
>
> Looks good to me.
> Just before I ack this, have you checked to see if there is any
> performance impact?
>
> /Bruce
>
> > ---
> >  lib/librte_distributor/rte_distributor.c |   64 ++++++++++++++++++++++--------
> >  lib/librte_distributor/rte_distributor.h |    4 ++
> >  2 files changed, 51 insertions(+), 17 deletions(-)
> >
> > diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> > index 3dfec4a..2c5d61c 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -62,6 +62,13 @@
> >  #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
> >
> >  /**
> > + * Maximum number of workers allowed.
> > + * Be careful when increasing the limit, because it is limited by how we track
> > + * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
> > + */
> > +#define RTE_DISTRIB_MAX_WORKERS      64
> > +
> > +/**
> >   * Buffer structure used to pass the pointer data between cores. This is cache
> >   * line aligned, but to improve performance and prevent adjacent cache-line
> >   * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
> > @@ -91,11 +98,17 @@ struct rte_distributor {
> >       char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
> >       unsigned num_workers;                 /**< Number of workers polling */
> >
> > -     uint32_t in_flight_tags[RTE_MAX_LCORE];
> > -             /**< Tracks the tag being processed per core, 0 == no pkt */
> > -     struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> > +     uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
> > +             /**< Tracks the tag being processed per core */
> > +     uint64_t in_flight_bitmask;
> > +             /**< on/off bits for in-flight tags.
> > +              * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
> > +              * the bitmask has to expand.
> > +              */
> > +
> > +     struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
> >
> > -     union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> > +     union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
> >
> >       struct rte_distributor_returned_pkts returns;
> >  };
> > @@ -189,6 +202,7 @@ static inline void
> >  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
> >  {
> >       d->in_flight_tags[wkr] = 0;
> > +     d->in_flight_bitmask &= ~(1UL << wkr);
> >       d->bufs[wkr].bufptr64 = 0;
> >       if (unlikely(d->backlog[wkr].count != 0)) {
> >               /* On return of a packet, we need to move the
> > @@ -211,7 +225,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
> >                       pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
> >                                       RTE_DISTRIB_FLAG_BITS));
> >               }
> > -             /* recursive call */
> > +             /* recursive call.
> > +              * Note that the tags were set before first level call
> > +              * to rte_distributor_process.
> > +              */
> >               rte_distributor_process(d, pkts, i);
> >               bl->count = bl->start = 0;
> >       }
> > @@ -242,6 +259,7 @@ process_returns(struct rte_distributor *d)
> >                       else {
> >                               d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
> >                               d->in_flight_tags[wkr] = 0;
> > +                             d->in_flight_bitmask &= ~(1UL << wkr);
> >                       }
> >                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >               } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > @@ -284,14 +302,18 @@ rte_distributor_process(struct rte_distributor *d,
> >                       next_value = (((int64_t)(uintptr_t)next_mb)
> >                                       << RTE_DISTRIB_FLAG_BITS);
> >                       /*
> > -                      * Set the low bit on the tag, so we can guarantee that
> > -                      * we never store a tag value of zero. That means we can
> > -                      * use the zero-value to indicate that no packet is
> > -                      * being processed by a worker.
> > +                      * User is advised to set the tag value for each
> > +                      * mbuf before calling rte_distributor_process.
> > +                      * User defined tags are used to identify flows,
> > +                      * or sessions.
> >                        */
> > -                     new_tag = (next_mb->hash.usr | 1);
> > +                     new_tag = next_mb->hash.usr;
> >
> > -                     uint32_t match = 0;
> > +                     /*
> > +                      * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
> > +                      * then the size of match has to be expanded.
> > +                      */
> > +                     uint64_t match = 0;
> >                       unsigned i;
> >                       /*
> >                        * to scan for a match use "xor" and "not" to get a 0/1
> > @@ -303,9 +325,12 @@ rte_distributor_process(struct rte_distributor *d,
> >                               match |= (!(d->in_flight_tags[i] ^ new_tag)
> >                                       << i);
> >
> > +                     /* Only turned-on bits are considered as match */
> > +                     match &= d->in_flight_bitmask;
> > +
> >                       if (match) {
> >                               next_mb = NULL;
> > -                             unsigned worker = __builtin_ctz(match);
> > +                             unsigned worker = __builtin_ctzl(match);
> >                               if (add_to_backlog(&d->backlog[worker],
> >                                               next_value) < 0)
> >                                       next_idx--;
> > @@ -322,6 +347,7 @@ rte_distributor_process(struct rte_distributor *d,
> >                       else {
> >                               d->bufs[wkr].bufptr64 = next_value;
> >                               d->in_flight_tags[wkr] = new_tag;
> > +                             d->in_flight_bitmask |= (1UL << wkr);
> >                               next_mb = NULL;
> >                       }
> >                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > @@ -379,11 +405,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
> >  static inline unsigned
> >  total_outstanding(const struct rte_distributor *d)
> >  {
> > -     unsigned wkr, total_outstanding = 0;
> > +     unsigned wkr, total_outstanding;
> > +
> > +     total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
> >
> >       for (wkr = 0; wkr < d->num_workers; wkr++)
> > -             total_outstanding += d->backlog[wkr].count +
> > -                             !!(d->in_flight_tags[wkr]);
> > +             total_outstanding += d->backlog[wkr].count;
> > +
> >       return total_outstanding;
> >  }
> >
> > @@ -423,9 +451,11 @@ rte_distributor_create(const char *name,
> >
> >       /* compilation-time checks */
> >       RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> > -     RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> > +     RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
> > +     RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
> > +                             sizeof(d->in_flight_bitmask) * CHAR_BIT);
> >
> > -     if (name == NULL || num_workers >= RTE_MAX_LCORE) {
> > +     if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
> >               rte_errno = EINVAL;
> >               return NULL;
> >       }
> > diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> > index ec0d74a..cc1d559 100644
> > --- a/lib/librte_distributor/rte_distributor.h
> > +++ b/lib/librte_distributor/rte_distributor.h
> > @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
> >   * packets. The distributor will ensure that no two packets that have the
> >   * same flow id, or tag, in the mbuf will be procesed at the same time.
> >   *
> > + * The user is advised to set the tag for each mbuf before calling this function.
> > + * If the user doesn't set the tag, the tag value can vary depending on
> > + * driver implementation and configuration.
> > + *
> >   * This is not multi-thread safe and should only be called on a single lcore.
> >   *
> >   * @param d
> > --
> > 1.7.1
> >
>
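
A side note on the bitmask handling in the patch quoted above. The patch
uses "1UL << wkr" and __builtin_ctzl() on a uint64_t mask, which is only
safe where unsigned long is 64 bits wide. The existing scan also shifts the
int result of "!" by the worker index, which is undefined for indexes of 32
and above. Below is a minimal, standalone sketch of the same idea with
explicit 64-bit operations. It is not the DPDK code: every sketch_* name and
SKETCH_MAX_WORKERS are hypothetical, and the __builtin_* calls assume GCC or
Clang.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for RTE_DISTRIB_MAX_WORKERS in the patch. */
#define SKETCH_MAX_WORKERS 64

/* Hypothetical, reduced version of the tracking fields in the patch. */
struct sketch_tracker {
	uint32_t in_flight_tags[SKETCH_MAX_WORKERS]; /* tag per worker */
	uint64_t in_flight_bitmask;                  /* bit i set: worker i busy */
};

/* Record that worker wkr is now processing a packet with this tag. */
static inline void
sketch_set(struct sketch_tracker *t, unsigned wkr, uint32_t tag)
{
	assert(wkr < SKETCH_MAX_WORKERS);
	t->in_flight_tags[wkr] = tag;
	t->in_flight_bitmask |= UINT64_C(1) << wkr; /* 64-bit shift everywhere */
}

/* Record that worker wkr has no packet in flight. */
static inline void
sketch_clear(struct sketch_tracker *t, unsigned wkr)
{
	assert(wkr < SKETCH_MAX_WORKERS);
	t->in_flight_tags[wkr] = 0;
	t->in_flight_bitmask &= ~(UINT64_C(1) << wkr);
}

/*
 * Return the lowest-numbered worker currently processing this tag, or -1.
 * The comparison result is widened to 64 bits before shifting, and only
 * bits that are also set in the bitmask count as a match, so a tag value
 * of 0 is a valid flow id.
 */
static inline int
sketch_find(const struct sketch_tracker *t, unsigned num_workers, uint32_t tag)
{
	uint64_t match = 0;
	unsigned i;

	assert(num_workers <= SKETCH_MAX_WORKERS);
	for (i = 0; i < num_workers; i++)
		match |= (uint64_t)(t->in_flight_tags[i] == tag) << i;
	match &= t->in_flight_bitmask;

	return match ? __builtin_ctzll(match) : -1;
}

/* Number of workers with a packet in flight. */
static inline unsigned
sketch_in_flight(const struct sketch_tracker *t)
{
	return (unsigned)__builtin_popcountll(t->in_flight_bitmask);
}
```

With a zero-initialised tracker, sketch_find() reports no match for tag 0
until some worker has been given that tag through sketch_set(). That is the
behaviour the bitmask is meant to provide once the "| 1" on the tag is
dropped.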