From mboxrd@z Thu Jan 1 00:00:00 1970
From: Kevin Traynor
To: Lukasz Wojciechowski
Cc: David Hunt, dpdk stable
Date: Wed, 18 Nov 2020 16:35:07 +0000
Message-Id: <20201118163558.1101823-21-ktraynor@redhat.com>
In-Reply-To: <20201118163558.1101823-1-ktraynor@redhat.com>
References: <20201118163558.1101823-1-ktraynor@redhat.com>
MIME-Version: 1.0
Content-Type: text/plain; charset="US-ASCII"
Content-Transfer-Encoding: 8bit
Subject: [dpdk-stable] patch 'distributor: handle worker shutdown in burst mode' has been queued to LTS release 18.11.11
List-Id: patches for DPDK stable branches

Hi,

FYI, your patch has been queued to LTS release 18.11.11

Note it hasn't been pushed to http://dpdk.org/browse/dpdk-stable yet.
It will be pushed if I get no objections before 11/24/20. So please
shout if anyone has objections.

Also note that after the patch there's a diff of the upstream commit vs the
patch applied to the branch. This will indicate if there was any rebasing
needed to apply to the stable branch. If there were code changes for rebasing
(ie: not only metadata diffs), please double check that the rebase was
correctly done.

Queued patches are on a temporary branch at:
https://github.com/kevintraynor/dpdk-stable-queue

This queued commit can be viewed at:
https://github.com/kevintraynor/dpdk-stable-queue/commit/141297a40a2b246b1db303e610a5add938a53ac3

Thanks.

Kevin.
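For context on the worker-side API the queued patch touches, a minimal sketch of
a burst-mode worker loop follows. It is illustrative only and is not taken from
the patch or the DPDK tree: the quit flag, the worker index parameter and the
BURST_SIZE constant are placeholders, and lcore/mempool setup and error handling
are omitted.

#include <stdbool.h>

#include <rte_distributor.h>
#include <rte_mbuf.h>

#define BURST_SIZE 8			/* same size as RTE_DIST_BURST_SIZE used internally */

static volatile bool quit_signal;	/* placeholder shutdown flag */

static int
worker_thread(struct rte_distributor *d, unsigned int worker_id)
{
	struct rte_mbuf *bufs[BURST_SIZE];
	unsigned int num = 0;

	while (!quit_signal) {
		/* hand back the previous burst and wait for a new one */
		num = rte_distributor_get_pkt(d, worker_id, bufs, bufs, num);
		/* ... process 'num' mbufs here ... */
	}

	/* hand back the final burst and tell the distributor this worker
	 * wants no more packets; making burst mode honour this correctly
	 * is what the queued patch below addresses
	 */
	rte_distributor_return_pkt(d, worker_id, bufs, num);
	return 0;
}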
---

>From 141297a40a2b246b1db303e610a5add938a53ac3 Mon Sep 17 00:00:00 2001
From: Lukasz Wojciechowski
Date: Sat, 17 Oct 2020 05:06:48 +0200
Subject: [PATCH] distributor: handle worker shutdown in burst mode

[ upstream commit 480d5a7c812eedad973dd7143e5fec8e092ff619 ]

The burst version of distributor implementation was missing proper
handling of worker shutdown. A worker processing packets received
from distributor can call rte_distributor_return_pkt() function
informing distributor that it want no more packets. Further calls to
rte_distributor_request_pkt() or rte_distributor_get_pkt() however
should inform distributor that new packets are requested again.

Lack of the proper implementation has caused that even after worker
informed about returning last packets, new packets were still sent
from distributor causing deadlocks as no one could get them on worker
side.

This patch adds handling shutdown of the worker in following way:
1) It fixes usage of RTE_DISTRIB_VALID_BUF handshake flag. This flag
   was formerly unused in burst implementation and now it is used
   for marking valid packets in retptr64 replacing invalid use of
   RTE_DISTRIB_RETURN_BUF flag.
2) Uses RTE_DISTRIB_RETURN_BUF as a worker to distributor handshake
   in retptr64 to indicate that worker has shutdown.
3) Worker that shuts down blocks also bufptr for itself with
   RTE_DISTRIB_RETURN_BUF flag allowing distributor to retrieve any
   in flight packets.
4) When distributor receives information about shutdown of a worker,
   it: marks worker as not active; retrieves any in flight and backlog
   packets and process them to different workers; unlocks bufptr64
   by clearing RTE_DISTRIB_RETURN_BUF flag and allowing use in
   the future if worker requests any new packets.
5) Do not allow to: send or add to backlog any packets for not
   active workers. Such workers are also ignored if matched.
6) Adjust calls to handle_returns() and tags matching procedure
   to react for possible activation deactivation of workers.

Fixes: 775003ad2f96 ("distributor: add new burst-capable library")

Signed-off-by: Lukasz Wojciechowski
Acked-by: David Hunt
---
 lib/librte_distributor/rte_distributor.c | 175 ++++++++++++++----
 .../rte_distributor_private.h            |   3 +
 2 files changed, 146 insertions(+), 32 deletions(-)

diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 5f9dde8494..2673aa2649 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -53,5 +53,5 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	 */
 	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
-			& RTE_DISTRIB_GET_BUF)) {
+			& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
@@ -69,9 +69,9 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 		buf->retptr64[i] = 0;
 
-	/* Set Return bit for each packet returned */
+	/* Set VALID_BUF bit for each packet returned */
 	for (i = count; i-- > 0; )
 		buf->retptr64[i] =
			(((int64_t)(uintptr_t)(oldpkt[i])) <<
-			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
 
 	/*
@@ -103,9 +103,11 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	}
 
-	/* If bit is set, return
+	/* If any of below bits is set, return.
+	 * GET_BUF is set when distributor hasn't sent any packets yet
+	 * RETURN_BUF is set when distributor must retrieve in-flight packets
 	 * Sync with distributor to acquire bufptrs
 	 */
 	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
-		& RTE_DISTRIB_GET_BUF)
+		& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
 		return -1;
 
@@ -119,5 +121,5 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 
 	/*
-	 * so now we've got the contents of the cacheline into an  array of
+	 * so now we've got the contents of the cacheline into an array of
 	 * mbuf pointers, so toggle the bit so scheduler can start working
 	 * on the next cacheline while we're working.
@@ -188,5 +190,5 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 	 */
 	while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
-			& RTE_DISTRIB_GET_BUF)) {
+			& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
@@ -200,15 +202,23 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
 		/* Switch off the return bit first */
-		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+		buf->retptr64[i] = 0;
 
 	for (i = num; i-- > 0; )
 		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
-			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
 
-	/* set the GET_BUF but even if we got no returns.
-	 * Sync with distributor on GET_BUF flag. Release retptrs.
+	/* Use RETURN_BUF on bufptr64 to notify distributor that
+	 * we won't read any mbufs from there even if GET_BUF is set.
+	 * This allows distributor to retrieve in-flight already sent packets.
+	 */
+	__atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
+		__ATOMIC_ACQ_REL);
+
+	/* set the RETURN_BUF on retptr64 even if we got no returns.
+	 * Sync with distributor on RETURN_BUF flag. Release retptrs.
+	 * Notify distributor that we don't request more packets any more.
 	 */
 	__atomic_store_n(&(buf->retptr64[0]),
-		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
+		buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);
 
 	return 0;
@@ -284,4 +294,57 @@ find_match_scalar(struct rte_distributor *d,
 }
 
+/*
+ * When worker called rte_distributor_return_pkt()
+ * and passed RTE_DISTRIB_RETURN_BUF handshake through retptr64,
+ * distributor must retrieve both inflight and backlog packets assigned
+ * to the worker and reprocess them to another worker.
+ */
+static void
+handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
+	/* double BURST size for storing both inflights and backlog */
+	struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
+	unsigned int pkts_count = 0;
+	unsigned int i;
+
+	/* If GET_BUF is cleared there are in-flight packets sent
+	 * to worker which does not require new packets.
+	 * They must be retrieved and assigned to another worker.
+	 */
+	if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF))
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+			if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
+				pkts[pkts_count++] = (void *)((uintptr_t)
+					(buf->bufptr64[i]
+					>> RTE_DISTRIB_FLAG_BITS));
+
+	/* Make following operations on handshake flags on bufptr64:
+	 * - set GET_BUF to indicate that distributor can overwrite buffer
+		with new packets if worker will make a new request.
+	 * - clear RETURN_BUF to unlock reads on worker side.
+	 */
+	__atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
+		__ATOMIC_RELEASE);
+
+	/* Collect backlog packets from worker */
+	for (i = 0; i < d->backlog[wkr].count; i++)
+		pkts[pkts_count++] = (void *)((uintptr_t)
+			(d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
+
+	d->backlog[wkr].count = 0;
+
+	/* Clear both inflight and backlog tags */
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+		d->in_flight_tags[wkr][i] = 0;
+		d->backlog[wkr].tags[i] = 0;
+	}
+
+	/* Recursive call */
+	if (pkts_count > 0)
+		rte_distributor_process(d, pkts, pkts_count);
+}
+
 /*
@@ -302,7 +365,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 	/* Sync on GET_BUF flag. Acquire retptrs. */
 	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
-		& RTE_DISTRIB_GET_BUF) {
+		& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
-			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+			if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
 					RTE_DISTRIB_FLAG_BITS));
@@ -310,9 +373,23 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 				store_return(oldbuf, d, &ret_start, &ret_count);
 				count++;
-				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+				buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
 			}
 		}
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
+
+		/* If worker requested packets with GET_BUF, set it to active
+		 * otherwise (RETURN_BUF), set it to not active.
+		 */
+		d->activesum -= d->active[wkr];
+		d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
+		d->activesum += d->active[wkr];
+
+		/* If worker returned packets without requesting new ones,
+		 * handle all in-flights and backlog packets assigned to it.
+		 */
+		if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
+			handle_worker_shutdown(d, wkr);
+
 		/* Clear for the worker to populate with more returns.
 		 * Sync with distributor on GET_BUF flag. Release retptrs.
@@ -339,4 +416,6 @@ release(struct rte_distributor *d, unsigned int wkr)
 
 	handle_returns(d, wkr);
+	if (unlikely(!d->active[wkr]))
+		return 0;
 
 	/* Sync with worker on GET_BUF flag */
@@ -344,4 +423,6 @@ release(struct rte_distributor *d, unsigned int wkr)
 			& RTE_DISTRIB_GET_BUF)) {
 		handle_returns(d, wkr);
+		if (unlikely(!d->active[wkr]))
+			return 0;
 		rte_pause();
 	}
@@ -383,5 +464,5 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 	uint16_t new_tag = 0;
 	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
-	unsigned int i, j, w, wid;
+	unsigned int i, j, w, wid, matching_required;
 
 	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
@@ -390,9 +471,11 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 	}
 
+	for (wid = 0 ; wid < d->num_workers; wid++)
+		handle_returns(d, wid);
+
 	if (unlikely(num_mbufs == 0)) {
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
 			/* Sync with worker on GET_BUF flag.
 			 */
-			handle_returns(d, wid);
 			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
 				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
@@ -404,4 +487,7 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 	}
 
+	if (unlikely(!d->activesum))
+		return 0;
+
 	while (next_idx < num_mbufs) {
 		uint16_t matches[RTE_DIST_BURST_SIZE];
@@ -428,12 +514,22 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 				flows[i] = 0;
 
-		switch (d->dist_match_fn) {
-		case RTE_DIST_MATCH_VECTOR:
-			find_match_vec(d, &flows[0], &matches[0]);
-			break;
-		default:
-			find_match_scalar(d, &flows[0], &matches[0]);
-		}
+		matching_required = 1;
+		for (j = 0; j < pkts; j++) {
+			if (unlikely(!d->activesum))
+				return next_idx;
+
+			if (unlikely(matching_required)) {
+				switch (d->dist_match_fn) {
+				case RTE_DIST_MATCH_VECTOR:
+					find_match_vec(d, &flows[0],
+						&matches[0]);
+					break;
+				default:
+					find_match_scalar(d, &flows[0],
+						&matches[0]);
+				}
+				matching_required = 0;
+			}
 
 		/*
 		 * Matches array now contain the intended worker ID (+1) of
@@ -442,6 +538,4 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 		 */
 
-		for (j = 0; j < pkts; j++) {
-
 			next_mb = mbufs[next_idx++];
 			next_value = (((int64_t)(uintptr_t)next_mb) <<
@@ -463,5 +557,5 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 			/* matches[j] = 0; */
 
-			if (matches[j]) {
+			if (matches[j] && d->active[matches[j]-1]) {
 				struct rte_distributor_backlog *bl =
 					&d->backlog[matches[j]-1];
@@ -469,4 +563,10 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 						RTE_DIST_BURST_SIZE)) {
 					release(d, matches[j]-1);
+					if (!d->active[matches[j]-1]) {
+						j--;
+						next_idx--;
+						matching_required = 1;
+						continue;
+					}
 				}
 
@@ -478,9 +578,19 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 
 			} else {
-				struct rte_distributor_backlog *bl =
-					&d->backlog[wkr];
+				struct rte_distributor_backlog *bl;
+
+				while (unlikely(!d->active[wkr]))
+					wkr = (wkr + 1) % d->num_workers;
+				bl = &d->backlog[wkr];
+
 				if (unlikely(bl->count ==
 						RTE_DIST_BURST_SIZE)) {
 					release(d, wkr);
+					if (!d->active[wkr]) {
+						j--;
+						next_idx--;
+						matching_required = 1;
+						continue;
+					}
 				}
 
@@ -501,7 +611,5 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 			}
 		}
 
-		wkr++;
-		if (wkr >= d->num_workers)
-			wkr = 0;
+		wkr = (wkr + 1) % d->num_workers;
 	}
@@ -693,4 +801,7 @@ rte_distributor_create_v1705(const char *name,
 		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
 
+	memset(d->active, 0, sizeof(d->active));
+	d->activesum = 0;
+
 	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
 					  rte_dist_burst_list);
diff --git a/lib/librte_distributor/rte_distributor_private.h b/lib/librte_distributor/rte_distributor_private.h
index 33cd89410c..74f967b257 100644
--- a/lib/librte_distributor/rte_distributor_private.h
+++ b/lib/librte_distributor/rte_distributor_private.h
@@ -156,4 +156,7 @@ struct rte_distributor {
 
 	struct rte_distributor_v20 *d_v20;
+
+	uint8_t active[RTE_DISTRIB_MAX_WORKERS];
+	uint8_t activesum;
 };
-- 
2.26.2

---
  Diff of the applied patch vs upstream commit (please double-check if non-empty:
---
--- -	2020-11-18 16:33:38.370366899 +0000
+++ 0021-distributor-handle-worker-shutdown-in-burst-mode.patch	2020-11-18 16:33:37.927215061 +0000
@@ -1 +1 @@
-From 480d5a7c812eedad973dd7143e5fec8e092ff619 Mon Sep 17 00:00:00 2001
+From 141297a40a2b246b1db303e610a5add938a53ac3 Mon Sep 17 00:00:00 2001
@@ -5,0 +6,2 @@
+[ upstream commit 480d5a7c812eedad973dd7143e5fec8e092ff619 ]
+
@@ -39 +40,0 @@
-Cc: stable@dpdk.org
@@ -44,2 +45,2 @@
- lib/librte_distributor/distributor_private.h |   3 +
- lib/librte_distributor/rte_distributor.c     | 175 +++++++++++++----
+ lib/librte_distributor/rte_distributor.c     | 175 ++++++++++++++----
+ .../rte_distributor_private.h                |   3 +
@@ -48,12 +48,0 @@
-diff --git a/lib/librte_distributor/distributor_private.h b/lib/librte_distributor/distributor_private.h
-index 489aef2acb..689fe3e183 100644
---- a/lib/librte_distributor/distributor_private.h
-+++ b/lib/librte_distributor/distributor_private.h
-@@ -156,4 +156,7 @@ struct rte_distributor {
- 
- 	struct rte_distributor_single *d_single;
-+
-+	uint8_t active[RTE_DISTRIB_MAX_WORKERS];
-+	uint8_t activesum;
- };
- 
@@ -61 +50 @@
-index 93c90cf543..7aa079d53c 100644
+index 5f9dde8494..2673aa2649 100644
@@ -64 +53 @@
-@@ -52,5 +52,5 @@ rte_distributor_request_pkt(struct rte_distributor *d,
+@@ -53,5 +53,5 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
@@ -71 +60 @@
-@@ -68,9 +68,9 @@ rte_distributor_request_pkt(struct rte_distributor *d,
+@@ -69,9 +69,9 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
@@ -83 +72 @@
-@@ -98,9 +98,11 @@ rte_distributor_poll_pkt(struct rte_distributor *d,
+@@ -103,9 +103,11 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
@@ -97 +86 @@
-@@ -114,5 +116,5 @@ rte_distributor_poll_pkt(struct rte_distributor *d,
+@@ -119,5 +121,5 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
@@ -104 +93 @@
-@@ -174,5 +176,5 @@ rte_distributor_return_pkt(struct rte_distributor *d,
+@@ -188,5 +190,5 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
@@ -111 +100 @@
-@@ -186,15 +188,23 @@ rte_distributor_return_pkt(struct rte_distributor *d,
+@@ -200,15 +202,23 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
@@ -140 +129 @@
-@@ -266,4 +276,57 @@ find_match_scalar(struct rte_distributor *d,
+@@ -284,4 +294,57 @@ find_match_scalar(struct rte_distributor *d,
@@ -198 +187 @@
-@@ -284,7 +347,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
+@@ -302,7 +365,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
@@ -208 +197 @@
-@@ -292,9 +355,23 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
+@@ -310,9 +373,23 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
@@ -233 +222 @@
-@@ -321,4 +398,6 @@ release(struct rte_distributor *d, unsigned int wkr)
+@@ -339,4 +416,6 @@ release(struct rte_distributor *d, unsigned int wkr)
@@ -240 +229 @@
-@@ -326,4 +405,6 @@ release(struct rte_distributor *d, unsigned int wkr)
+@@ -344,4 +423,6 @@ release(struct rte_distributor *d, unsigned int wkr)
@@ -247 +236 @@
-@@ -365,5 +446,5 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -383,5 +464,5 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -254 +243 @@
-@@ -373,9 +454,11 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -390,9 +471,11 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -267 +256 @@
-@@ -387,4 +470,7 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -404,4 +487,7 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -275 +264 @@
-@@ -411,12 +497,22 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -428,12 +514,22 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -305 +294 @@
-@@ -425,6 +521,4 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -442,6 +538,4 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -312 +301 @@
-@@ -446,5 +540,5 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -463,5 +557,5 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -319 +308 @@
-@@ -452,4 +546,10 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -469,4 +563,10 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -330 +319 @@
-@@ -461,9 +561,19 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -478,9 +578,19 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -352 +341 @@
-@@ -484,7 +594,5 @@ rte_distributor_process(struct rte_distributor *d,
+@@ -501,7 +611,5 @@ rte_distributor_process_v1705(struct rte_distributor *d,
@@ -361 +350 @@
-@@ -662,4 +770,7 @@ rte_distributor_create(const char *name,
+@@ -693,4 +801,7 @@ rte_distributor_create_v1705(const char *name,
@@ -368,0 +358,12 @@
+diff --git a/lib/librte_distributor/rte_distributor_private.h b/lib/librte_distributor/rte_distributor_private.h
+index 33cd89410c..74f967b257 100644
+--- a/lib/librte_distributor/rte_distributor_private.h
++++ b/lib/librte_distributor/rte_distributor_private.h
+@@ -156,4 +156,7 @@ struct rte_distributor {
+ 
+ 	struct rte_distributor_v20 *d_v20;
++
++	uint8_t active[RTE_DISTRIB_MAX_WORKERS];
++	uint8_t activesum;
+ };
+ 