From: Zhirun Yan
To: dev@dpdk.org, jerinj@marvell.com, kirankumark@marvell.com
Cc: cunming.liang@intel.com, haiyue.wang@intel.com, Zhirun Yan
Subject: [RFC, v1 3/6] graph: enable stream moving cross cores
Date: Thu, 8 Sep 2022 10:09:56 +0800
Message-Id: <20220908020959.1675953-4-zhirun.yan@intel.com>
X-Mailer: git-send-email 2.25.1
In-Reply-To:
 <20220908020959.1675953-1-zhirun.yan@intel.com>
References: <20220908020959.1675953-1-zhirun.yan@intel.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit

This patch allows a worker thread to enqueue streams of objects and move
them to next nodes that run on different cores.

1. add graph_sched_wq_node to hold the graph scheduling workqueue node stream
2. add workqueue helper functions to create/destroy/enqueue/dequeue

Signed-off-by: Haiyue Wang
Signed-off-by: Cunming Liang
Signed-off-by: Zhirun Yan
---
 lib/graph/graph_populate.c   |   1 +
 lib/graph/graph_private.h    |  38 +++++++
 lib/graph/graph_sched.c      | 194 +++++++++++++++++++++++++++++++++++
 lib/graph/meson.build        |   2 +
 lib/graph/rte_graph_worker.h |  45 ++++++++
 lib/graph/version.map        |   3 +
 6 files changed, 283 insertions(+)
 create mode 100644 lib/graph/graph_sched.c

diff --git a/lib/graph/graph_populate.c b/lib/graph/graph_populate.c
index 102fd6c29b..26f9670406 100644
--- a/lib/graph/graph_populate.c
+++ b/lib/graph/graph_populate.c
@@ -84,6 +84,7 @@ graph_nodes_populate(struct graph *_graph)
 		}
 		node->id = graph_node->node->id;
 		node->parent_id = pid;
+		node->lcore_id = graph_node->node->lcore_id;
 		nb_edges = graph_node->node->nb_edges;
 		node->nb_edges = nb_edges;
 		off += sizeof(struct rte_node);
diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
index d53ef289d4..e50a207764 100644
--- a/lib/graph/graph_private.h
+++ b/lib/graph/graph_private.h
@@ -59,6 +59,17 @@ struct node {
 	char next_nodes[][RTE_NODE_NAMESIZE]; /**< Names of next nodes. */
 };
 
+/**
+ * @internal
+ *
+ * Structure that holds the graph scheduling workqueue node stream.
+ */
+struct graph_sched_wq_node {
+	rte_graph_off_t node_off;
+	uint16_t nb_objs;
+	void *objs[RTE_GRAPH_BURST_SIZE];
+} __rte_cache_aligned;
+
 /**
  * @internal
  *
@@ -349,4 +360,31 @@ void graph_dump(FILE *f, struct graph *g);
  */
 void node_dump(FILE *f, struct node *n);
 
+/**
+ * @internal
+ *
+ * Create the graph schedule work queue. All cloned graphs attached to the
+ * parent graph MUST be destroyed together (fast schedule design limitation).
+ *
+ * @param _graph
+ *   The graph object
+ * @param _parent_graph
+ *   The parent graph object which holds the run-queue head.
+ *
+ * @return
+ *   - 0: Success.
+ *   - <0: Graph schedule work queue related error.
+ */
+int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph);
+
+/**
+ * @internal
+ *
+ * Destroy the graph schedule work queue.
+ *
+ * @param _graph
+ *   The graph object
+ */
+void graph_sched_wq_destroy(struct graph *_graph);
+
 #endif /* _RTE_GRAPH_PRIVATE_H_ */
diff --git a/lib/graph/graph_sched.c b/lib/graph/graph_sched.c
new file mode 100644
index 0000000000..465148796b
--- /dev/null
+++ b/lib/graph/graph_sched.c
@@ -0,0 +1,194 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(C) 2022 Intel Corporation
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "graph_private.h"
+
+#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER 8
+#define GRAPH_SCHED_WQ_SIZE(nb_nodes) \
+	((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
+
+static __rte_always_inline bool
+graph_src_node_avail(struct graph *graph)
+{
+	struct graph_node *graph_node;
+
+	STAILQ_FOREACH(graph_node, &graph->node_list, next)
+		if ((graph_node->node->flags & RTE_NODE_SOURCE_F) &&
+		    (graph_node->node->lcore_id == RTE_MAX_LCORE ||
+		     graph->lcore_id == graph_node->node->lcore_id))
+			return true;
+
+	return false;
+}
+
+int
+graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph)
+{
+	struct rte_graph *parent_graph = _parent_graph->graph;
+	struct rte_graph *graph = _graph->graph;
+	unsigned int wq_size;
+
+	/* cloned graph doesn't access input node if not affinity rx core */
+	if (!graph_src_node_avail(_graph))
+		graph->head = 0;
+
+	wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
+	wq_size = rte_align32pow2(wq_size + 1);
+
+	graph->wq = rte_ring_create(graph->name, wq_size, graph->socket,
+				    RING_F_SC_DEQ);
+	if (graph->wq == NULL)
+		SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
+
+	graph->mp = rte_mempool_create(graph->name, wq_size,
+				       sizeof(struct graph_sched_wq_node),
+				       0, 0, NULL, NULL, NULL, NULL,
+				       graph->socket, MEMPOOL_F_SP_PUT);
+	if (graph->mp == NULL)
+		SET_ERR_JMP(EIO, fail_mp,
+			    "Failed to allocate graph WQ schedule entry");
+
+	graph->lcore_id = _graph->lcore_id;
+
+	if (parent_graph->rq == NULL) {
+		parent_graph->rq = &parent_graph->rq_head;
+		SLIST_INIT(parent_graph->rq);
+	}
+
+	graph->rq = parent_graph->rq;
+	SLIST_INSERT_HEAD(graph->rq, graph, rq_next);
+
+	return 0;
+
+fail_mp:
+	rte_ring_free(graph->wq);
+	graph->wq = NULL;
+fail:
+	return -rte_errno;
+}
+
+void
+graph_sched_wq_destroy(struct graph *_graph)
+{
+	struct rte_graph *graph = _graph->graph;
+
+	if (graph == NULL)
+		return;
+
+	rte_ring_free(graph->wq);
+	graph->wq = NULL;
+
+	rte_mempool_free(graph->mp);
+	graph->mp = NULL;
+}
+
+static __rte_always_inline bool
+__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph)
+{
+	struct graph_sched_wq_node *wq_node;
+	uint16_t off = 0;
+	uint16_t size;
+
+submit_again:
+	if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0)
+		goto fallback;
+
+	size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
+	wq_node->node_off = node->off;
+	wq_node->nb_objs = size;
+	rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *));
+
+	while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void *)&wq_node,
+					     sizeof(wq_node), 1, NULL) == 0)
+		rte_pause();
+
+	off += size;
+	node->total_sched_objs += size;
+	node->idx -= size;
+	if (node->idx > 0)
+		goto submit_again;
+
+	return true;
+
+fallback:
+	if (off != 0)
+		memmove(&node->objs[0], &node->objs[off],
+			node->idx * sizeof(void *));
+
+	node->total_sched_fail += node->idx;
+
+	return false;
+}
+
+bool __rte_noinline
+__rte_graph_sched_node_enqueue(struct rte_node *node,
+			       struct rte_graph_rq_head *rq)
+{
+	const unsigned int lcore_id = node->lcore_id;
+	struct rte_graph *graph;
+
+	SLIST_FOREACH(graph, rq, rq_next)
+		if (graph->lcore_id == lcore_id)
+			break;
+
+	return graph != NULL ? __graph_sched_node_enqueue(node, graph) : false;
+}
+
+void __rte_noinline
+__rte_graph_sched_wq_process(struct rte_graph *graph)
+{
+	struct graph_sched_wq_node *wq_node;
+	struct rte_mempool *mp = graph->mp;
+	struct rte_ring *wq = graph->wq;
+	uint16_t idx, free_space;
+	struct rte_node *node;
+	unsigned int i, n;
+	uint64_t start;
+	uint16_t rc;
+	void **objs;
+	struct graph_sched_wq_node *wq_nodes[32];
+
+	n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
+					   RTE_DIM(wq_nodes), NULL);
+	if (n == 0)
+		return;
+
+	for (i = 0; i < n; i++) {
+		wq_node = wq_nodes[i];
+		node = RTE_PTR_ADD(graph, wq_node->node_off);
+		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+		idx = node->idx;
+		free_space = node->size - idx;
+
+		if (unlikely(free_space < wq_node->nb_objs))
+			__rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs);
+
+		memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *));
+		memset(wq_node->objs, 0, wq_node->nb_objs * sizeof(void *));
+		node->idx = idx + wq_node->nb_objs;
+		objs = node->objs;
+		rte_prefetch0(objs);
+
+		if (rte_graph_has_stats_feature()) {
+			start = rte_rdtsc();
+			rc = node->process(graph, node, objs, wq_node->nb_objs);
+			node->total_cycles += rte_rdtsc() - start;
+			node->total_calls++;
+			node->total_objs += rc;
+		} else {
+			node->process(graph, node, objs, wq_node->nb_objs);
+		}
+		wq_node->nb_objs = 0;
+		node->idx = 0;
+	}
+
+	rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
+}
diff --git a/lib/graph/meson.build b/lib/graph/meson.build
index c7327549e8..440063b24e 100644
--- a/lib/graph/meson.build
+++ b/lib/graph/meson.build
@@ -13,8 +13,10 @@ sources = files(
         'graph_ops.c',
         'graph_debug.c',
         'graph_stats.c',
+        'graph_sched.c',
         'graph_populate.c',
 )
 headers = files('rte_graph.h', 'rte_graph_worker.h')
 
 deps += ['eal']
+deps += ['mempool', 'ring']
diff --git a/lib/graph/rte_graph_worker.h b/lib/graph/rte_graph_worker.h
index ca0185ec36..faf3f31ddc 100644
--- a/lib/graph/rte_graph_worker.h
+++ b/lib/graph/rte_graph_worker.h
@@ -28,6 +28,13 @@
 extern "C" {
 #endif
 
+/**
+ * @internal
+ *
+ * Singly-linked list head for graph schedule run-queue.
+ */
+SLIST_HEAD(rte_graph_rq_head, rte_graph);
+
 /**
  * @internal
  *
@@ -39,6 +46,14 @@ struct rte_graph {
 	uint32_t cir_mask; /**< Circular buffer wrap around mask. */
 	rte_node_t nb_nodes; /**< Number of nodes in the graph. */
 	rte_graph_off_t *cir_start; /**< Pointer to circular buffer. */
+	/* Graph schedule area --BEGIN-- */
+	struct rte_graph_rq_head *rq __rte_cache_aligned; /* The run-queue */
+	struct rte_graph_rq_head rq_head; /* The head for run-queue list */
+	SLIST_ENTRY(rte_graph) rq_next; /* The next for run-queue list */
+	unsigned int lcore_id; /**< The graph running Lcore. */
+	struct rte_ring *wq; /**< The work-queue for pending streams. */
+	struct rte_mempool *mp; /**< The mempool for scheduling streams. */
+	/* Graph schedule area --END-- */
 	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
 	rte_graph_t id; /**< Graph identifier. */
 	int socket; /**< Socket ID where memory is allocated. */
@@ -63,6 +78,8 @@ struct rte_node {
 	char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
 	char name[RTE_NODE_NAMESIZE]; /**< Name of the node. */
 
+	/* Fast schedule area */
+	unsigned int lcore_id __rte_cache_aligned; /**< Node running Lcore. */
 	/* Fast path area */
 #define RTE_NODE_CTX_SZ 16
 	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
@@ -118,6 +135,34 @@ __rte_experimental
 void __rte_node_stream_alloc_size(struct rte_graph *graph,
 				  struct rte_node *node, uint16_t req_size);
 
+/**
+ * @internal
+ *
+ * Schedule the node to the right graph's work queue.
+ *
+ * @param node
+ *   Pointer to the scheduled node object.
+ * @param rq
+ *   Pointer to the scheduled run-queue for all graphs.
+ *
+ * @return
+ *   True on success, false otherwise.
+ */
+__rte_experimental
+bool __rte_graph_sched_node_enqueue(struct rte_node *node,
+				    struct rte_graph_rq_head *rq);
+
+/**
+ * @internal
+ *
+ * Process all nodes (streams) in the graph's work queue.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ */
+__rte_experimental
+void __rte_graph_sched_wq_process(struct rte_graph *graph);
+
 /**
  * Perform graph walk on the circular buffer and invoke the process function
  * of the nodes and collect the stats.
diff --git a/lib/graph/version.map b/lib/graph/version.map
index 6fc43e4411..99f82eb21a 100644
--- a/lib/graph/version.map
+++ b/lib/graph/version.map
@@ -1,6 +1,9 @@
 EXPERIMENTAL {
 	global:
 
+	__rte_graph_sched_node_enqueue;
+	__rte_graph_sched_wq_process;
+
 	__rte_node_register;
 	__rte_node_stream_alloc;
 	__rte_node_stream_alloc_size;
-- 
2.25.1
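
Note on work-queue sizing: graph_sched_wq_create() sizes both the ring and
the mempool as rte_align32pow2(nb_nodes * 8 + 1). The standalone sketch below
reproduces that arithmetic outside DPDK so the resulting sizes can be checked
by hand. It is only an illustration under stated assumptions: align32pow2()
and sched_wq_size() are hypothetical stand-ins written for this note (the
first mimics the documented behaviour of rte_align32pow2(), the second is not
a function in the patch), and WQ_SIZE_MULTIPLIER mirrors
GRAPH_SCHED_WQ_SIZE_MULTIPLIER.

```c
#include <assert.h>
#include <stdint.h>

/* Mirrors GRAPH_SCHED_WQ_SIZE_MULTIPLIER from the patch. */
#define WQ_SIZE_MULTIPLIER 8

/*
 * Hypothetical stand-in for rte_align32pow2(): round x up to the next
 * power of two. A value that is already a power of two is unchanged.
 */
static uint32_t
align32pow2(uint32_t x)
{
	x--;
	x |= x >> 1;
	x |= x >> 2;
	x |= x >> 4;
	x |= x >> 8;
	x |= x >> 16;
	return x + 1;
}

/*
 * Hypothetical helper (not part of the patch): element count passed to
 * the ring and mempool creation calls for a graph with nb_nodes nodes.
 */
static uint32_t
sched_wq_size(uint32_t nb_nodes)
{
	assert(nb_nodes > 0);
	return align32pow2(nb_nodes * WQ_SIZE_MULTIPLIER + 1);
}
```

For example, a graph with 4 nodes requests 4 * 8 + 1 = 33 entries, which is
rounded up to 64. The "+ 1" matters when nb_nodes * 8 is itself a power of
two: a ring created without RING_F_EXACT_SZ can hold at most count - 1
entries, so rounding 33 up to 64 keeps at least 32 usable slots.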