Date: Mon, 10 Nov 2014 11:09:22 +0000
From: Bruce Richardson
To: Qinglai Xiao
Cc: dev@dpdk.org
Message-ID: <20141110110922.GB7036@bricha3-MOBL3>
References: <1415615912-46212-1-git-send-email-jigsaw@gmail.com>
In-Reply-To: <1415615912-46212-1-git-send-email-jigsaw@gmail.com>
Subject: Re: [dpdk-dev] [PATCH] Add in_flight_bitmask so as to use full 32 bits of tag.

On Mon, Nov 10, 2014 at 12:38:32PM +0200, Qinglai Xiao wrote:
> User applications are advised to set the newly introduced union field
> mbuf->hash.usr as the flow id, which is a uint32_t.
> With the introduction of in_flight_bitmask, the whole 32 bits of the tag
> can be used.
>
> Furthermore, this patch fixes an integer overflow when finding the
> matched tags.
>
> Note that librte_distributor currently supports up to 64 worker
> threads. If more workers are needed, the size of in_flight_bitmask and
> the algorithm for finding matched tags must be revised.
>
> Signed-off-by: Qinglai Xiao

Hi,

This would probably be better as two patches rather than one: one patch to
add the hash.usr field and then use it in the distributor, and a separate
patch for the distributor change that adds the bitmask so that all bits of
the tag can be used.

Regards,
/Bruce

> ---
>  app/test/test_distributor.c              |   18 ++++++------
>  app/test/test_distributor_perf.c         |    4 +-
>  lib/librte_distributor/rte_distributor.c |   45 +++++++++++++++++++++--------
>  lib/librte_distributor/rte_distributor.h |    3 ++
>  lib/librte_mbuf/rte_mbuf.h               |    1 +
>  5 files changed, 47 insertions(+), 24 deletions(-)
>
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> index ce06436..9e8c06d 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -120,7 +120,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  	/* now set all hash values in all buffers to zero, so all pkts go to the
>  	 * one worker thread */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>
>  	rte_distributor_process(d, bufs, BURST);
>  	rte_distributor_flush(d);
> @@ -142,7 +142,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  	if (rte_lcore_count() >= 3) {
>  		clear_packet_count();
>  		for (i = 0; i < BURST; i++)
> -			bufs[i]->hash.rss = (i & 1) << 8;
> +			bufs[i]->hash.usr = (i & 1) << 8;
>
>  		rte_distributor_process(d, bufs, BURST);
>  		rte_distributor_flush(d);
> @@ -167,7 +167,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  	 * so load gets distributed */
>  	clear_packet_count();
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = i;
> +		bufs[i]->hash.usr = i;
>
>  	rte_distributor_process(d, bufs, BURST);
>  	rte_distributor_flush(d);
> @@ -199,7 +199,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  		return -1;
>  	}
>  	for (i = 0; i < BIG_BATCH; i++)
> -		many_bufs[i]->hash.rss = i << 2;
> +		many_bufs[i]->hash.usr = i << 2;
>
>  	for (i = 0; i < BIG_BATCH/BURST; i++) {
>  		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
> @@ -280,7 +280,7 @@ sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
>  		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
>  			rte_distributor_process(d, NULL, 0);
>  		for (j = 0; j < BURST; j++) {
> -			bufs[j]->hash.rss = (i+j) << 1;
> +			bufs[j]->hash.usr = (i+j) << 1;
>  			rte_mbuf_refcnt_set(bufs[j], 1);
>  		}
>
> @@ -359,7 +359,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
>  	/* now set all hash values in all buffers to zero, so all pkts go to the
>  	 * one worker thread */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>
>  	rte_distributor_process(d, bufs, BURST);
>  	/* at this point, we will have processed some packets and have a full
> @@ -372,7 +372,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
>  		return -1;
>  	}
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>
>  	/* get worker zero to quit */
>  	zero_quit = 1;
> @@ -416,7 +416,7 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
>  	/* now set all hash values in all buffers to zero, so all pkts go to the
>  	 * one worker thread */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>
>  	rte_distributor_process(d, bufs, BURST);
>  	/* at this point, we will have processed some packets and have a full
> @@ -488,7 +488,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
>  	zero_quit = 0;
>  	quit = 1;
>  	for (i = 0; i < num_workers; i++)
> -		bufs[i]->hash.rss = i << 1;
> +		bufs[i]->hash.usr = i << 1;
>  	rte_distributor_process(d, bufs, num_workers);
>
>  	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
> index b04864c..48ee344 100644
> --- a/app/test/test_distributor_perf.c
> +++ b/app/test/test_distributor_perf.c
> @@ -159,7 +159,7 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
>  	}
>  	/* ensure we have different hash value for each pkt */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = i;
> +		bufs[i]->hash.usr = i;
>
>  	start = rte_rdtsc();
>  	for (i = 0; i < (1<<ITER_POWER); i++)
> @@ -198,7 +198,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
>
>  	quit = 1;
>  	for (i = 0; i < num_workers; i++)
> -		bufs[i]->hash.rss = i << 1;
> +		bufs[i]->hash.usr = i << 1;
>  	rte_distributor_process(d, bufs, num_workers);
>
>  	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 656ee5c..a84ea97 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -92,7 +92,13 @@ struct rte_distributor {
>  	unsigned num_workers;                 /**< Number of workers polling */
>
>  	uint32_t in_flight_tags[RTE_MAX_LCORE];
> -		/**< Tracks the tag being processed per core, 0 == no pkt */
> +		/**< Tracks the tag being processed per core */
> +	uint64_t in_flight_bitmask;
> +		/**< on/off bits for in-flight tags.
> +		 * Note that if RTE_MAX_LCORE is larger than 64 then
> +		 * the bitmask has to expand.
> +		 */
> +
>  	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
>
>  	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> @@ -189,6 +195,7 @@ static inline void
>  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  {
>  	d->in_flight_tags[wkr] = 0;
> +	d->in_flight_bitmask &= ~(1UL << wkr);
>  	d->bufs[wkr].bufptr64 = 0;
>  	if (unlikely(d->backlog[wkr].count != 0)) {
>  		/* On return of a packet, we need to move the
> @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
>  					RTE_DISTRIB_FLAG_BITS));
>  		}
> -		/* recursive call */
> +		/* recursive call.
> +		 * Note that the tags were set before first level call
> +		 * to rte_distributor_process.
> +		 */
>  		rte_distributor_process(d, pkts, i);
>  		bl->count = bl->start = 0;
>  	}
> @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
>  		else {
>  			d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
>  			d->in_flight_tags[wkr] = 0;
> +			d->in_flight_bitmask &= ~(1UL << wkr);
>  		}
>  		oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  	} else if (data & RTE_DISTRIB_RETURN_BUF) {
> @@ -284,17 +295,21 @@ rte_distributor_process(struct rte_distributor *d,
>  			next_value = (((int64_t)(uintptr_t)next_mb)
>  					<< RTE_DISTRIB_FLAG_BITS);
>  			/*
> -			 * Set the low bit on the tag, so we can guarantee that
> -			 * we never store a tag value of zero. That means we can
> -			 * use the zero-value to indicate that no packet is
> -			 * being processed by a worker.
> +			 * Users are advised to set the tag value for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User-defined tags are used to identify flows,
> +			 * or sessions.
>  			 */
> -			new_tag = (next_mb->hash.rss | 1);
> +			new_tag = next_mb->hash.usr;
>
> -			uint32_t match = 0;
> +			/*
> +			 * Note that if RTE_MAX_LCORE is larger than 64 then
> +			 * the size of match has to be expanded.
> +			 */
> +			uint64_t match = 0;
>  			unsigned i;
>  			/*
> -			 * to scan for a match use "xor" and "not" to get a 0/1
> +			 * To scan for a match use "xor" and "not" to get a 0/1
>  			 * value, then use shifting to merge to single "match"
>  			 * variable, where a one-bit indicates a match for the
>  			 * worker given by the bit-position
> @@ -303,9 +318,11 @@ rte_distributor_process(struct rte_distributor *d,
>  				match |= (!(d->in_flight_tags[i] ^ new_tag)
>  					<< i);
>
> +			/* Only turned-on bits are considered as matches */
> +			match &= d->in_flight_bitmask;
>  			if (match) {
>  				next_mb = NULL;
> -				unsigned worker = __builtin_ctz(match);
> +				unsigned worker = __builtin_ctzl(match);
>  				if (add_to_backlog(&d->backlog[worker],
>  						next_value) < 0)
>  					next_idx--;
> @@ -322,6 +339,7 @@ rte_distributor_process(struct rte_distributor *d,
>  			else {
>  				d->bufs[wkr].bufptr64 = next_value;
>  				d->in_flight_tags[wkr] = new_tag;
> +				d->in_flight_bitmask |= (1UL << wkr);
>  				next_mb = NULL;
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> @@ -379,11 +397,12 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
>  static inline unsigned
>  total_outstanding(const struct rte_distributor *d)
>  {
> -	unsigned wkr, total_outstanding = 0;
> +	unsigned wkr, total_outstanding;
>
> +	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
> -		total_outstanding += d->backlog[wkr].count +
> -			!!(d->in_flight_tags[wkr]);
> +		total_outstanding += d->backlog[wkr].count;
> +
>  	return total_outstanding;
>  }
>
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..a29c506 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -87,6 +87,9 @@ rte_distributor_create(const char *name, unsigned socket_id,
>   * Process a set of packets by distributing them among workers that request
>   * packets. The distributor will ensure that no two packets that have the
>   * same flow id, or tag, in the mbuf will be procesed at the same time.
> + * Users are advised to set a tag for each mbuf. If the user does not set
> + * the tag, its value will vary depending on the driver implementation
> + * and configuration.
>   *
>   * This is not multi-thread safe and should only be called on a single lcore.
>   *
> diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> index e8f9bfc..f5f8658 100644
> --- a/lib/librte_mbuf/rte_mbuf.h
> +++ b/lib/librte_mbuf/rte_mbuf.h
> @@ -185,6 +185,7 @@ struct rte_mbuf {
>  			uint16_t id;
>  		} fdir;           /**< Filter identifier if FDIR enabled */
>  		uint32_t sched;   /**< Hierarchical scheduler */
> +		uint32_t usr;     /**< User defined tags. See @rte_distributor_process */
>  	} hash;                   /**< hash information */
>
>  	/* second cache line - fields only used in slow path or on TX */
> --
> 1.7.1
>