* [dpdk-dev] [PATCH v2 0/2] Add in_flight_bitmask so as to use full 32 bits of tag
From: Qinglai Xiao @ 2014-11-10 12:52 UTC
To: dev; +Cc: Qinglai Xiao

This patch series extends the tags used by librte_distributor from 31 bits to 32 bits. It also fixes an integer overflow in the algorithm that finds matching tags.

The newly introduced union field rte_mbuf.hash.usr serves as the flow identifier. User applications are advised to set this field in each mbuf before calling rte_distributor_process.

Qinglai Xiao (2):
  Add new union field usr in mbuf->hash.
  Add in_flight_bitmask so as to use full 32 bits of tag.

 app/test/test_distributor.c              | 18 ++++++------
 app/test/test_distributor_perf.c         |  4 +-
 lib/librte_distributor/rte_distributor.c | 45 ++++++++++++++++++++++--------
 lib/librte_distributor/rte_distributor.h |  4 ++
 lib/librte_mbuf/rte_mbuf.h               |  1 +
 5 files changed, 49 insertions(+), 23 deletions(-)
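The 31-bit limit mentioned above comes from the pre-patch code forcing bit 0 of every tag on (`new_tag = (hash | 1)`, visible in the removed lines of patch 2/2) so that 0 could mean "no packet in flight". As a consequence, two flows whose tags differ only in bit 0 are treated as one flow and serialized. The sketch below illustrates this with hypothetical helpers `tag_old`, `tag_new` and `tags_collide`; none of them are part of librte_distributor.

```c
#include <assert.h>
#include <stdint.h>

/* Pre-patch scheme (hypothetical helper): bit 0 is forced on so a stored
 * tag is never 0, letting 0 mean "no packet in flight". */
static uint32_t tag_old(uint32_t usr)
{
	return usr | 1u;
}

/* Post-patch scheme (hypothetical helper): the user tag is used verbatim;
 * idleness is tracked separately, so all 32 bits identify the flow. */
static uint32_t tag_new(uint32_t usr)
{
	return usr;
}

/* Nonzero if two distinct user tags map to the same internal tag under
 * the given scheme, i.e. the distributor would treat them as one flow. */
static int tags_collide(uint32_t (*scheme)(uint32_t), uint32_t a, uint32_t b)
{
	return a != b && scheme(a) == scheme(b);
}
```

For example, `tag_old(2)` and `tag_old(3)` both yield 3, so those two flows collide, while `tag_new` keeps them distinct.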
* [dpdk-dev] [PATCH v2 1/2] Add new union field usr in mbuf->hash.
From: Qinglai Xiao @ 2014-11-10 12:52 UTC
To: dev; +Cc: Qinglai Xiao

This field is added for librte_distributor. Users of librte_distributor
are advised to set the value of mbuf->hash.usr before calling
rte_distributor_process. The value of usr is the tag that identifies
the flow.

Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
---
 app/test/test_distributor.c              | 18 +++++++++---------
 app/test/test_distributor_perf.c         |  4 ++--
 lib/librte_distributor/rte_distributor.c |  2 +-
 lib/librte_mbuf/rte_mbuf.h               |  1 +
 4 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index ce06436..9e8c06d 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -120,7 +120,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	/* now set all hash values in all buffers to zero, so all pkts go to the
 	 * one worker thread */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	rte_distributor_process(d, bufs, BURST);
 	rte_distributor_flush(d);
@@ -142,7 +142,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	if (rte_lcore_count() >= 3) {
 		clear_packet_count();
 		for (i = 0; i < BURST; i++)
-			bufs[i]->hash.rss = (i & 1) << 8;
+			bufs[i]->hash.usr = (i & 1) << 8;
 
 		rte_distributor_process(d, bufs, BURST);
 		rte_distributor_flush(d);
@@ -167,7 +167,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	 * so load gets distributed */
 	clear_packet_count();
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = i;
+		bufs[i]->hash.usr = i;
 
 	rte_distributor_process(d, bufs, BURST);
 	rte_distributor_flush(d);
@@ -199,7 +199,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 		return -1;
 	}
 	for (i = 0; i < BIG_BATCH; i++)
-		many_bufs[i]->hash.rss = i << 2;
+		many_bufs[i]->hash.usr = i << 2;
 
 	for (i = 0; i < BIG_BATCH/BURST; i++) {
 		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
@@ -280,7 +280,7 @@ sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
 		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
 			rte_distributor_process(d, NULL, 0);
 		for (j = 0; j < BURST; j++) {
-			bufs[j]->hash.rss = (i+j) << 1;
+			bufs[j]->hash.usr = (i+j) << 1;
 			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
@@ -359,7 +359,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 	/* now set all hash values in all buffers to zero, so all pkts go to the
 	 * one worker thread */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
@@ -372,7 +372,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
@@ -416,7 +416,7 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	/* now set all hash values in all buffers to zero, so all pkts go to the
 	 * one worker thread */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
@@ -488,7 +488,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	zero_quit = 0;
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+		bufs[i]->hash.usr = i << 1;
 	rte_distributor_process(d, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index b04864c..48ee344 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -159,7 +159,7 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 	}
 	/* ensure we have different hash value for each pkt */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = i;
+		bufs[i]->hash.usr = i;
 
 	start = rte_rdtsc();
 	for (i = 0; i < (1<<ITER_POWER); i++)
@@ -198,7 +198,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+		bufs[i]->hash.usr = i << 1;
 	rte_distributor_process(d, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 656ee5c..3dfec4a 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -289,7 +289,7 @@ rte_distributor_process(struct rte_distributor *d,
 			 * use the zero-value to indicate that no packet is
 			 * being processed by a worker.
 			 */
-			new_tag = (next_mb->hash.rss | 1);
+			new_tag = (next_mb->hash.usr | 1);
 
 			uint32_t match = 0;
 			unsigned i;
diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
index e8f9bfc..f5f8658 100644
--- a/lib/librte_mbuf/rte_mbuf.h
+++ b/lib/librte_mbuf/rte_mbuf.h
@@ -185,6 +185,7 @@ struct rte_mbuf {
 			uint16_t id;
 		} fdir;           /**< Filter identifier if FDIR enabled */
 		uint32_t sched;   /**< Hierarchical scheduler */
+		uint32_t usr;	  /**< User defined tags. See @rte_distributor_process */
 	} hash;                   /**< hash information */
 
 	/* second cache line - fields only used in slow path or on TX */
-- 
1.7.1
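Because `usr` is added as one more member of the `hash` union, it shares the same four bytes as `rss` and `sched`. The distributor therefore reads whatever 32-bit value sits in that slot: if the application does not stamp a tag, the value left there by the driver is used. The sketch below uses a hypothetical `union mbuf_hash_sketch`, reduced to the three `uint32_t` members visible in the hunk (the `fdir` member is omitted); it is not the real `struct rte_mbuf`, and `set_flow_tag` is an illustrative helper.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the hash union in struct rte_mbuf, reduced to
 * the 32-bit members relevant here (fdir is omitted). */
union mbuf_hash_sketch {
	uint32_t rss;   /* RSS hash, typically written by the driver */
	uint32_t sched; /* hierarchical scheduler */
	uint32_t usr;   /* user-defined flow tag read by the distributor */
};

/* Mirrors the pattern in the test changes: the application stamps a flow
 * tag into hash.usr before handing the packet to the distributor. */
static void set_flow_tag(union mbuf_hash_sketch *h, uint32_t flow_id)
{
	h->usr = flow_id;
}
```

Writing `usr` and then reading `rss` yields the same value, which is why the test changes in this patch behave identically after renaming `hash.rss` to `hash.usr`.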
* Re: [dpdk-dev] [PATCH v2 1/2] Add new union field usr in mbuf->hash.
From: Bruce Richardson @ 2014-11-10 13:06 UTC
To: Qinglai Xiao; +Cc: dev

On Mon, Nov 10, 2014 at 02:52:46PM +0200, Qinglai Xiao wrote:
> This field is added for librte_distributor. User of librte_distributor
> is advocated to set value of mbuf->hash.usr before calling
> rte_distributor_process. The value of usr is the tag which stands as
> identifier of flow.
> 
> Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>

Acked-by: Bruce Richardson <bruce.richardson@intel.com>
* Re: [dpdk-dev] [PATCH v2 1/2] Add new union field usr in mbuf->hash.
From: Thomas Monjalon @ 2014-11-13 11:26 UTC
To: Qinglai Xiao; +Cc: dev

> > This field is added for librte_distributor. User of librte_distributor
> > is advocated to set value of mbuf->hash.usr before calling
> > rte_distributor_process. The value of usr is the tag which stands as
> > identifier of flow.
> > 
> > Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
> Acked-by: Bruce Richardson <bruce.richardson@intel.com>

Applied

Thanks
-- 
Thomas
* [dpdk-dev] [PATCH v2 2/2] Add in_flight_bitmask so as to use full 32 bits of tag.
From: Qinglai Xiao @ 2014-11-10 12:52 UTC
To: dev; +Cc: Qinglai Xiao

With the introduction of in_flight_bitmask, all 32 bits of the tag can be
used. Furthermore, this patch fixes the integer overflow when finding
matching tags.
Note that librte_distributor currently supports up to 64 worker threads.
If more workers are needed, the size of in_flight_bitmask and the
algorithm for finding a matching tag must be revised.

Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
---
 lib/librte_distributor/rte_distributor.c | 45 ++++++++++++++++++++++--------
 lib/librte_distributor/rte_distributor.h |  4 ++
 2 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 3dfec4a..3dfccae 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -92,7 +92,13 @@ struct rte_distributor {
 	unsigned num_workers;                 /**< Number of workers polling */
 
 	uint32_t in_flight_tags[RTE_MAX_LCORE];
-	/**< Tracks the tag being processed per core, 0 == no pkt */
+	/**< Tracks the tag being processed per core */
+	uint64_t in_flight_bitmask;
+	/**< on/off bits for in-flight tags.
+	 * Note that if RTE_MAX_LCORE is larger than 64 then
+	 * the bitmask has to expand.
+	 */
+
 	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
 
 	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
@@ -189,6 +195,7 @@ static inline void
 handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 {
 	d->in_flight_tags[wkr] = 0;
+	d->in_flight_bitmask &= ~(1UL << wkr);
 	d->bufs[wkr].bufptr64 = 0;
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
@@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
 					RTE_DISTRIB_FLAG_BITS));
 		}
-		/* recursive call */
+		/* recursive call.
+		 * Note that the tags were set before first level call
+		 * to rte_distributor_process.
+		 */
 		rte_distributor_process(d, pkts, i);
 		bl->count = bl->start = 0;
 	}
@@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
 			else {
 				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
 				d->in_flight_tags[wkr] = 0;
+				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
 		} else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -284,14 +295,18 @@ rte_distributor_process(struct rte_distributor *d,
 			next_value = (((int64_t)(uintptr_t)next_mb)
 					<< RTE_DISTRIB_FLAG_BITS);
 			/*
-			 * Set the low bit on the tag, so we can guarantee that
-			 * we never store a tag value of zero. That means we can
-			 * use the zero-value to indicate that no packet is
-			 * being processed by a worker.
+			 * User is advocated to set tag vaue for each
+			 * mbuf before calling rte_distributor_process.
+			 * User defined tags are used to identify flows,
+			 * or sessions.
 			 */
-			new_tag = (next_mb->hash.usr | 1);
+			new_tag = next_mb->hash.usr;
 
-			uint32_t match = 0;
+			/*
+			 * Note that if RTE_MAX_LCORE is larger than 64 then
+			 * the size of match has to be expanded.
+			 */
+			uint64_t match = 0;
 			unsigned i;
 			/*
 			 * to scan for a match use "xor" and "not" to get a 0/1
@@ -303,9 +318,12 @@ rte_distributor_process(struct rte_distributor *d,
 				match |= (!(d->in_flight_tags[i] ^ new_tag)
 					<< i);
 
+			/* Only turned-on bits are considered as match */
+			match &= d->in_flight_bitmask;
+
 			if (match) {
 				next_mb = NULL;
-				unsigned worker = __builtin_ctz(match);
+				unsigned worker = __builtin_ctzl(match);
 				if (add_to_backlog(&d->backlog[worker],
 						next_value) < 0)
 					next_idx--;
@@ -322,6 +340,7 @@ rte_distributor_process(struct rte_distributor *d,
 			else {
 				d->bufs[wkr].bufptr64 = next_value;
 				d->in_flight_tags[wkr] = new_tag;
+				d->in_flight_bitmask |= (1UL << wkr);
 				next_mb = NULL;
 			}
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -379,11 +398,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
 static inline unsigned
 total_outstanding(const struct rte_distributor *d)
 {
-	unsigned wkr, total_outstanding = 0;
+	unsigned wkr, total_outstanding;
+
+	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
 
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		total_outstanding += d->backlog[wkr].count +
-				!!(d->in_flight_tags[wkr]);
+		total_outstanding += d->backlog[wkr].count;
+
 	return total_outstanding;
 }
 
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
index ec0d74a..cc1d559 100644
--- a/lib/librte_distributor/rte_distributor.h
+++ b/lib/librte_distributor/rte_distributor.h
@@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
  * packets. The distributor will ensure that no two packets that have the
  * same flow id, or tag, in the mbuf will be procesed at the same time.
  *
+ * The user is advocated to set tag for each mbuf before calling this function.
+ * If user doesn't set the tag, the tag value can be various values depending on
+ * driver implementation and configuration.
+ *
  * This is not multi-thread safe and should only be called on a single lcore.
  *
  * @param d
-- 
1.7.1
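The matching step in this patch can be illustrated outside DPDK. The sketch below assumes at most 64 workers and uses a hypothetical `struct dist_sketch` holding only the fields the patch touches; `mark_busy`, `mark_idle`, `find_matching_worker` and `in_flight_count` are illustrative names, not librte_distributor functions. It keeps the patch's xor/not scan and `match &= in_flight_bitmask` filter, with two deliberate differences: it casts the 0/1 result to `uint64_t` before shifting (the quoted loop shifts the `int` produced by `!`, and shifting an `int` by 32 or more positions is undefined behaviour in C), and it uses `UINT64_C(1)` with the GCC/Clang builtins `__builtin_ctzll` and `__builtin_popcountll` so it does not rely on `unsigned long` being 64 bits.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_MAX_WORKERS 64 /* hypothetical cap: one mask bit per worker */

/* Hypothetical, simplified distributor state. */
struct dist_sketch {
	unsigned num_workers;
	uint32_t in_flight_tags[SKETCH_MAX_WORKERS];
	uint64_t in_flight_bitmask; /* bit w set => worker w holds a live tag */
};

/* Worker w starts processing a packet carrying tag. */
static void mark_busy(struct dist_sketch *d, unsigned w, uint32_t tag)
{
	assert(w < SKETCH_MAX_WORKERS);
	d->in_flight_tags[w] = tag;
	d->in_flight_bitmask |= UINT64_C(1) << w;
}

/* Worker w returned its packet: only the bit is cleared; the stale tag
 * stays in in_flight_tags but is ignored from now on. */
static void mark_idle(struct dist_sketch *d, unsigned w)
{
	assert(w < SKETCH_MAX_WORKERS);
	d->in_flight_bitmask &= ~(UINT64_C(1) << w);
}

/* Lowest-numbered worker currently processing tag, or -1 if none. */
static int find_matching_worker(const struct dist_sketch *d, uint32_t tag)
{
	uint64_t match = 0;
	unsigned i;

	/* xor/not yields 1 on equality; widen before shifting by i. */
	for (i = 0; i < d->num_workers; i++)
		match |= (uint64_t)!(d->in_flight_tags[i] ^ tag) << i;

	/* Only workers whose bit is set hold a live tag. */
	match &= d->in_flight_bitmask;
	if (match == 0)
		return -1;
	return __builtin_ctzll(match);
}

/* In-flight packets: one per busy worker (backlogs not modelled here). */
static unsigned in_flight_count(const struct dist_sketch *d)
{
	return (unsigned)__builtin_popcountll(d->in_flight_bitmask);
}
```

With this layout a tag of 0 becomes an ordinary flow id: the raw scan also matches every idle worker whose `in_flight_tags` entry is 0 (zero-initialised here, and reset to 0 by the patch when a packet returns), but the mask filter discards those entries.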
* Re: [dpdk-dev] [PATCH v2 2/2] Add in_flight_bitmask so as to use full 32 bits of tag.
From: Bruce Richardson @ 2014-11-10 14:10 UTC
To: Qinglai Xiao; +Cc: dev

On Mon, Nov 10, 2014 at 02:52:47PM +0200, Qinglai Xiao wrote:
> With introduction of in_flight_bitmask, the whole 32 bits of tag can be
> used. Further more, this patch fixed the integer overflow when finding
> the matched tags.
> Note that currently librte_distributor supports up to 64 worker threads.
> If more workers are needed, the size of in_flight_bitmask and the
> algorithm of finding matched tag must be revised.
> 
> Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
> ---
>  lib/librte_distributor/rte_distributor.c | 45 ++++++++++++++++++++++--------
>  lib/librte_distributor/rte_distributor.h |  4 ++
>  2 files changed, 37 insertions(+), 12 deletions(-)
> 
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 3dfec4a..3dfccae 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -92,7 +92,13 @@ struct rte_distributor {
>  	unsigned num_workers;                 /**< Number of workers polling */
>  
>  	uint32_t in_flight_tags[RTE_MAX_LCORE];
> -	/**< Tracks the tag being processed per core, 0 == no pkt */
> +	/**< Tracks the tag being processed per core */
> +	uint64_t in_flight_bitmask;
> +	/**< on/off bits for in-flight tags.
> +	 * Note that if RTE_MAX_LCORE is larger than 64 then
> +	 * the bitmask has to expand.
> +	 */

For this, I would suggest that we break the link with RTE_MAX_LCORE.
Instead, we can just enforce a hard limit that the distributor can only
work with up to 64 worker cores. That should avoid any complications.
I would suggest we add a further check in the create function, something
like the following:

	if (num_workers >= sizeof(d->in_flight_bitmask) * CHAR_BIT) {
		rte_errno = .....
	}

> +
>  	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
>  
>  	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> @@ -189,6 +195,7 @@ static inline void
>  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  {
>  	d->in_flight_tags[wkr] = 0;
> +	d->in_flight_bitmask &= ~(1UL << wkr);
>  	d->bufs[wkr].bufptr64 = 0;
>  	if (unlikely(d->backlog[wkr].count != 0)) {
>  		/* On return of a packet, we need to move the
> @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
>  					RTE_DISTRIB_FLAG_BITS));
>  		}
> -		/* recursive call */
> +		/* recursive call.
> +		 * Note that the tags were set before first level call
> +		 * to rte_distributor_process.
> +		 */
>  		rte_distributor_process(d, pkts, i);
>  		bl->count = bl->start = 0;
>  	}
> @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
>  			else {
>  				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
>  				d->in_flight_tags[wkr] = 0;
> +				d->in_flight_bitmask &= ~(1UL << wkr);
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> @@ -284,14 +295,18 @@ rte_distributor_process(struct rte_distributor *d,
>  			next_value = (((int64_t)(uintptr_t)next_mb)
>  					<< RTE_DISTRIB_FLAG_BITS);
>  			/*
> -			 * Set the low bit on the tag, so we can guarantee that
> -			 * we never store a tag value of zero. That means we can
> -			 * use the zero-value to indicate that no packet is
> -			 * being processed by a worker.
> +			 * User is advocated to set tag vaue for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User defined tags are used to identify flows,
> +			 * or sessions.
>  			 */
> -			new_tag = (next_mb->hash.usr | 1);
> +			new_tag = next_mb->hash.usr;
>  
> -			uint32_t match = 0;
> +			/*
> +			 * Note that if RTE_MAX_LCORE is larger than 64 then
> +			 * the size of match has to be expanded.
> +			 */
> +			uint64_t match = 0;
>  			unsigned i;
>  			/*
>  			 * to scan for a match use "xor" and "not" to get a 0/1
> @@ -303,9 +318,12 @@ rte_distributor_process(struct rte_distributor *d,
>  				match |= (!(d->in_flight_tags[i] ^ new_tag)
>  					<< i);
>  
> +			/* Only turned-on bits are considered as match */
> +			match &= d->in_flight_bitmask;
> +
>  			if (match) {
>  				next_mb = NULL;
> -				unsigned worker = __builtin_ctz(match);
> +				unsigned worker = __builtin_ctzl(match);
>  				if (add_to_backlog(&d->backlog[worker],
>  						next_value) < 0)
>  					next_idx--;
> @@ -322,6 +340,7 @@ rte_distributor_process(struct rte_distributor *d,
>  			else {
>  				d->bufs[wkr].bufptr64 = next_value;
>  				d->in_flight_tags[wkr] = new_tag;
> +				d->in_flight_bitmask |= (1UL << wkr);
>  				next_mb = NULL;
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> @@ -379,11 +398,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
>  static inline unsigned
>  total_outstanding(const struct rte_distributor *d)
>  {
> -	unsigned wkr, total_outstanding = 0;
> +	unsigned wkr, total_outstanding;
> +
> +	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
>  
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
> -		total_outstanding += d->backlog[wkr].count +
> -				!!(d->in_flight_tags[wkr]);
> +		total_outstanding += d->backlog[wkr].count;
> +
>  	return total_outstanding;
>  }
>  
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..cc1d559 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
>   * packets. The distributor will ensure that no two packets that have the
>   * same flow id, or tag, in the mbuf will be procesed at the same time.
>   *
> + * The user is advocated to set tag for each mbuf before calling this function.
> + * If user doesn't set the tag, the tag value can be various values depending on
> + * driver implementation and configuration.
> + *
>   * This is not multi-thread safe and should only be called on a single lcore.
>   *
>   * @param d
> -- 
> 1.7.1
> 
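The create-time check suggested in the review above can be sketched as a standalone function. The name `check_num_workers`, the `worker_mask_t` typedef and the `-EINVAL` return convention are assumptions for illustration only; the reply leaves the `rte_errno` value elided. This sketch rejects only counts larger than the mask width, so exactly 64 workers (bits 0 to 63) are accepted; with the `>=` comparison written in the reply, a 64-bit mask would cap the count at 63.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>

/* Hypothetical alias for the per-worker bitmask type (uint64_t in the patch). */
typedef uint64_t worker_mask_t;

/* Number of workers the mask can represent: one bit each. */
#define MAX_MASK_WORKERS (sizeof(worker_mask_t) * CHAR_BIT)

/* Hypothetical create-time validation: 0 if num_workers fits in the
 * bitmask, -EINVAL otherwise. */
static int check_num_workers(unsigned num_workers)
{
	if (num_workers > MAX_MASK_WORKERS)
		return -EINVAL;
	return 0;
}
```

Tying the limit to the mask type rather than to RTE_MAX_LCORE, as the review proposes, means widening the mask is the only change needed to raise the cap.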