From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id A5AA0A04B1; Wed, 30 Sep 2020 08:47:32 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 4D0531DB60; Wed, 30 Sep 2020 08:36:08 +0200 (CEST) Received: from mga09.intel.com (mga09.intel.com [134.134.136.24]) by dpdk.org (Postfix) with ESMTP id DC4C41DA33 for ; Wed, 30 Sep 2020 08:35:09 +0200 (CEST) IronPort-SDR: m+jXYExJwnBgVfSG5lpg7v6MhvGQ/io0fA2Hd8q4Lu/fIagL0tM7wH2gGiqWnqBN5q2nbqPjg0 mudMzSSxvVTw== X-IronPort-AV: E=McAfee;i="6000,8403,9759"; a="163238802" X-IronPort-AV: E=Sophos;i="5.77,321,1596524400"; d="scan'208";a="163238802" X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga005.jf.intel.com ([10.7.209.41]) by orsmga102.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 29 Sep 2020 23:35:07 -0700 IronPort-SDR: deP7QgL0Rj/MHXZaVzY2x3WfACUGq1cFfDXDs5TDYLeneOxkIUxZEYS4H5i0gJuWXb6Gf4zVkc /iTHuTakPorw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.77,321,1596524400"; d="scan'208";a="495884221" Received: from silpixa00400573.ir.intel.com (HELO silpixa00400573.ger.corp.intel.com) ([10.237.223.107]) by orsmga005.jf.intel.com with ESMTP; 29 Sep 2020 23:35:07 -0700 From: Cristian Dumitrescu To: dev@dpdk.org Cc: thomas@monjalon.net, david.marchand@redhat.com Date: Wed, 30 Sep 2020 07:34:10 +0100 Message-Id: <20200930063416.68428-37-cristian.dumitrescu@intel.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20200930063416.68428-1-cristian.dumitrescu@intel.com> References: <20200923180645.55852-2-cristian.dumitrescu@intel.com> <20200930063416.68428-1-cristian.dumitrescu@intel.com> Subject: [dpdk-dev] [PATCH v6 36/42] examples/pipeline: add new example application X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Add new example application to showcase the API of the newly introduced SWX pipeline type. Signed-off-by: Cristian Dumitrescu --- MAINTAINERS | 1 + examples/meson.build | 1 + examples/pipeline/Makefile | 50 ++++ examples/pipeline/main.c | 50 ++++ examples/pipeline/meson.build | 16 + examples/pipeline/obj.c | 470 +++++++++++++++++++++++++++++ examples/pipeline/obj.h | 131 ++++++++ examples/pipeline/thread.c | 549 ++++++++++++++++++++++++++++++++++ examples/pipeline/thread.h | 28 ++ 9 files changed, 1296 insertions(+) create mode 100644 examples/pipeline/Makefile create mode 100644 examples/pipeline/main.c create mode 100644 examples/pipeline/meson.build create mode 100644 examples/pipeline/obj.c create mode 100644 examples/pipeline/obj.h create mode 100644 examples/pipeline/thread.c create mode 100644 examples/pipeline/thread.h diff --git a/MAINTAINERS b/MAINTAINERS index 49a6dfa7a..df3033cdb 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -1331,6 +1331,7 @@ F: app/test/test_table* F: app/test-pipeline/ F: doc/guides/sample_app_ug/test_pipeline.rst F: examples/ip_pipeline/ +F: examples/pipeline/ F: doc/guides/sample_app_ug/ip_pipeline.rst diff --git a/examples/meson.build b/examples/meson.build index eb13e8210..245d98575 100644 --- a/examples/meson.build +++ b/examples/meson.build @@ -33,6 +33,7 @@ all_examples = [ 'ntb', 'packet_ordering', 'performance-thread/l3fwd-thread', 'performance-thread/pthread_shim', + 'pipeline', 'ptpclient', 'qos_meter', 'qos_sched', 'rxtx_callbacks', diff --git a/examples/pipeline/Makefile b/examples/pipeline/Makefile new file mode 100644 index 000000000..da2f4850b --- /dev/null +++ b/examples/pipeline/Makefile @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2020 Intel Corporation + +# binary name +APP = pipeline + +# all source are stored in SRCS-y +SRCS-y += main.c +SRCS-y += obj.c +SRCS-y += thread.c + +# Build using pkg-config variables if possible +ifneq ($(shell pkg-config --exists libdpdk && echo 0),0) +$(error "no installation of DPDK found") +endif + +all: shared +.PHONY: shared static +shared: build/$(APP)-shared + ln -sf $(APP)-shared build/$(APP) +static: build/$(APP)-static + ln -sf $(APP)-static build/$(APP) + +PKGCONF ?= pkg-config + +PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null) +CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk) +LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk) +LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk) + +CFLAGS += -I. -DALLOW_EXPERIMENTAL_API -D_GNU_SOURCE + +OBJS := $(patsubst %.c,build/%.o,$(SRCS-y)) + +build/%.o: %.c Makefile $(PC_FILE) | build + $(CC) $(CFLAGS) -c $< -o $@ + +build/$(APP)-shared: $(OBJS) + $(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED) + +build/$(APP)-static: $(OBJS) + $(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC) + +build: + @mkdir -p $@ + +.PHONY: clean +clean: + rm -f build/$(APP)* build/*.o + test -d build && rmdir -p build || true diff --git a/examples/pipeline/main.c b/examples/pipeline/main.c new file mode 100644 index 000000000..dec78fba5 --- /dev/null +++ b/examples/pipeline/main.c @@ -0,0 +1,50 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#include +#include +#include +#include +#include + +#include +#include + +#include "obj.h" +#include "thread.h" + +int +main(int argc, char **argv) +{ + struct obj *obj; + int status; + + /* EAL */ + status = rte_eal_init(argc, argv); + if (status < 0) { + printf("Error: EAL initialization failed (%d)\n", status); + return status; + }; + + /* Obj */ + obj = obj_init(); + if (!obj) { + printf("Error: Obj initialization failed (%d)\n", status); + return status; + } + + /* Thread */ + status = thread_init(); + if (status) { + printf("Error: Thread initialization failed (%d)\n", status); + return status; + } + + rte_eal_mp_remote_launch( + thread_main, + NULL, + SKIP_MASTER); + + return 0; +} diff --git a/examples/pipeline/meson.build b/examples/pipeline/meson.build new file mode 100644 index 000000000..ade485f97 --- /dev/null +++ b/examples/pipeline/meson.build @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2017-2020 Intel Corporation + +# meson file, for building this example as part of a main DPDK build. +# +# To build this example as a standalone application with an already-installed +# DPDK instance, use 'make' + +build = cc.has_header('sys/epoll.h') +deps += ['pipeline', 'bus_pci'] +allow_experimental_apis = true +sources = files( + 'main.c', + 'obj.c', + 'thread.c', +) diff --git a/examples/pipeline/obj.c b/examples/pipeline/obj.c new file mode 100644 index 000000000..688870f97 --- /dev/null +++ b/examples/pipeline/obj.c @@ -0,0 +1,470 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "obj.h" + +/* + * mempool + */ +TAILQ_HEAD(mempool_list, mempool); + +/* + * link + */ +TAILQ_HEAD(link_list, link); + +/* + * pipeline + */ +TAILQ_HEAD(pipeline_list, pipeline); + +/* + * obj + */ +struct obj { + struct mempool_list mempool_list; + struct link_list link_list; + struct pipeline_list pipeline_list; +}; + +/* + * mempool + */ +#define BUFFER_SIZE_MIN (sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) + +struct mempool * +mempool_create(struct obj *obj, const char *name, struct mempool_params *params) +{ + struct mempool *mempool; + struct rte_mempool *m; + + /* Check input params */ + if ((name == NULL) || + mempool_find(obj, name) || + (params == NULL) || + (params->buffer_size < BUFFER_SIZE_MIN) || + (params->pool_size == 0)) + return NULL; + + /* Resource create */ + m = rte_pktmbuf_pool_create( + name, + params->pool_size, + params->cache_size, + 0, + params->buffer_size - sizeof(struct rte_mbuf), + params->cpu_id); + + if (m == NULL) + return NULL; + + /* Node allocation */ + mempool = calloc(1, sizeof(struct mempool)); + if (mempool == NULL) { + rte_mempool_free(m); + return NULL; + } + + /* Node fill in */ + strlcpy(mempool->name, name, sizeof(mempool->name)); + mempool->m = m; + mempool->buffer_size = params->buffer_size; + + /* Node add to list */ + TAILQ_INSERT_TAIL(&obj->mempool_list, mempool, node); + + return mempool; +} + +struct mempool * +mempool_find(struct obj *obj, const char *name) +{ + struct mempool *mempool; + + if (!obj || !name) + return NULL; + + TAILQ_FOREACH(mempool, &obj->mempool_list, node) + if (strcmp(mempool->name, name) == 0) + return mempool; + + return NULL; +} + +/* + * link + */ +static struct rte_eth_conf port_conf_default = { + .link_speeds = 0, + .rxmode = { + .mq_mode = ETH_MQ_RX_NONE, + .max_rx_pkt_len = 9000, /* Jumbo frame max packet len */ + .split_hdr_size = 0, /* Header split buffer size */ + }, + .rx_adv_conf = { + .rss_conf = { + .rss_key = NULL, + .rss_key_len = 40, + .rss_hf = 0, + }, + }, + .txmode = { + .mq_mode = ETH_MQ_TX_NONE, + }, + .lpbk_mode = 0, +}; + +#define RETA_CONF_SIZE (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE) + +static int +rss_setup(uint16_t port_id, + uint16_t reta_size, + struct link_params_rss *rss) +{ + struct rte_eth_rss_reta_entry64 reta_conf[RETA_CONF_SIZE]; + uint32_t i; + int status; + + /* RETA setting */ + memset(reta_conf, 0, sizeof(reta_conf)); + + for (i = 0; i < reta_size; i++) + reta_conf[i / RTE_RETA_GROUP_SIZE].mask = UINT64_MAX; + + for (i = 0; i < reta_size; i++) { + uint32_t reta_id = i / RTE_RETA_GROUP_SIZE; + uint32_t reta_pos = i % RTE_RETA_GROUP_SIZE; + uint32_t rss_qs_pos = i % rss->n_queues; + + reta_conf[reta_id].reta[reta_pos] = + (uint16_t) rss->queue_id[rss_qs_pos]; + } + + /* RETA update */ + status = rte_eth_dev_rss_reta_update(port_id, + reta_conf, + reta_size); + + return status; +} + +struct link * +link_create(struct obj *obj, const char *name, struct link_params *params) +{ + struct rte_eth_dev_info port_info; + struct rte_eth_conf port_conf; + struct link *link; + struct link_params_rss *rss; + struct mempool *mempool; + uint32_t cpu_id, i; + int status; + uint16_t port_id; + + /* Check input params */ + if ((name == NULL) || + link_find(obj, name) || + (params == NULL) || + (params->rx.n_queues == 0) || + (params->rx.queue_size == 0) || + (params->tx.n_queues == 0) || + (params->tx.queue_size == 0)) + return NULL; + + port_id = params->port_id; + if (params->dev_name) { + status = rte_eth_dev_get_port_by_name(params->dev_name, + &port_id); + + if (status) + return NULL; + } else + if (!rte_eth_dev_is_valid_port(port_id)) + return NULL; + + if (rte_eth_dev_info_get(port_id, &port_info) != 0) + return NULL; + + mempool = mempool_find(obj, params->rx.mempool_name); + if (mempool == NULL) + return NULL; + + rss = params->rx.rss; + if (rss) { + if ((port_info.reta_size == 0) || + (port_info.reta_size > ETH_RSS_RETA_SIZE_512)) + return NULL; + + if ((rss->n_queues == 0) || + (rss->n_queues >= LINK_RXQ_RSS_MAX)) + return NULL; + + for (i = 0; i < rss->n_queues; i++) + if (rss->queue_id[i] >= port_info.max_rx_queues) + return NULL; + } + + /** + * Resource create + */ + /* Port */ + memcpy(&port_conf, &port_conf_default, sizeof(port_conf)); + if (rss) { + port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS; + port_conf.rx_adv_conf.rss_conf.rss_hf = + (ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP) & + port_info.flow_type_rss_offloads; + } + + cpu_id = (uint32_t) rte_eth_dev_socket_id(port_id); + if (cpu_id == (uint32_t) SOCKET_ID_ANY) + cpu_id = 0; + + status = rte_eth_dev_configure( + port_id, + params->rx.n_queues, + params->tx.n_queues, + &port_conf); + + if (status < 0) + return NULL; + + if (params->promiscuous) { + status = rte_eth_promiscuous_enable(port_id); + if (status != 0) + return NULL; + } + + /* Port RX */ + for (i = 0; i < params->rx.n_queues; i++) { + status = rte_eth_rx_queue_setup( + port_id, + i, + params->rx.queue_size, + cpu_id, + NULL, + mempool->m); + + if (status < 0) + return NULL; + } + + /* Port TX */ + for (i = 0; i < params->tx.n_queues; i++) { + status = rte_eth_tx_queue_setup( + port_id, + i, + params->tx.queue_size, + cpu_id, + NULL); + + if (status < 0) + return NULL; + } + + /* Port start */ + status = rte_eth_dev_start(port_id); + if (status < 0) + return NULL; + + if (rss) { + status = rss_setup(port_id, port_info.reta_size, rss); + + if (status) { + rte_eth_dev_stop(port_id); + return NULL; + } + } + + /* Port link up */ + status = rte_eth_dev_set_link_up(port_id); + if ((status < 0) && (status != -ENOTSUP)) { + rte_eth_dev_stop(port_id); + return NULL; + } + + /* Node allocation */ + link = calloc(1, sizeof(struct link)); + if (link == NULL) { + rte_eth_dev_stop(port_id); + return NULL; + } + + /* Node fill in */ + strlcpy(link->name, name, sizeof(link->name)); + link->port_id = port_id; + rte_eth_dev_get_name_by_port(port_id, link->dev_name); + link->n_rxq = params->rx.n_queues; + link->n_txq = params->tx.n_queues; + + /* Node add to list */ + TAILQ_INSERT_TAIL(&obj->link_list, link, node); + + return link; +} + +int +link_is_up(struct obj *obj, const char *name) +{ + struct rte_eth_link link_params; + struct link *link; + + /* Check input params */ + if (!obj || !name) + return 0; + + link = link_find(obj, name); + if (link == NULL) + return 0; + + /* Resource */ + if (rte_eth_link_get(link->port_id, &link_params) < 0) + return 0; + + return (link_params.link_status == ETH_LINK_DOWN) ? 0 : 1; +} + +struct link * +link_find(struct obj *obj, const char *name) +{ + struct link *link; + + if (!obj || !name) + return NULL; + + TAILQ_FOREACH(link, &obj->link_list, node) + if (strcmp(link->name, name) == 0) + return link; + + return NULL; +} + +struct link * +link_next(struct obj *obj, struct link *link) +{ + return (link == NULL) ? + TAILQ_FIRST(&obj->link_list) : TAILQ_NEXT(link, node); +} + +/* + * pipeline + */ +#ifndef PIPELINE_MSGQ_SIZE +#define PIPELINE_MSGQ_SIZE 64 +#endif + +struct pipeline * +pipeline_create(struct obj *obj, const char *name, int numa_node) +{ + struct pipeline *pipeline; + struct rte_swx_pipeline *p = NULL; + int status; + + /* Check input params */ + if ((name == NULL) || + pipeline_find(obj, name)) + return NULL; + + /* Resource create */ + status = rte_swx_pipeline_config(&p, numa_node); + if (status) + goto error; + + status = rte_swx_pipeline_port_in_type_register(p, + "ethdev", + &rte_swx_port_ethdev_reader_ops); + if (status) + goto error; + + status = rte_swx_pipeline_port_out_type_register(p, + "ethdev", + &rte_swx_port_ethdev_writer_ops); + if (status) + goto error; + +#ifdef RTE_PORT_PCAP + status = rte_swx_pipeline_port_in_type_register(p, + "source", + &rte_swx_port_source_ops); + if (status) + goto error; +#endif + + status = rte_swx_pipeline_port_out_type_register(p, + "sink", + &rte_swx_port_sink_ops); + if (status) + goto error; + + status = rte_swx_pipeline_table_type_register(p, + "exact", + RTE_SWX_TABLE_MATCH_EXACT, + &rte_swx_table_exact_match_ops); + if (status) + goto error; + + /* Node allocation */ + pipeline = calloc(1, sizeof(struct pipeline)); + if (pipeline == NULL) + goto error; + + /* Node fill in */ + strlcpy(pipeline->name, name, sizeof(pipeline->name)); + pipeline->p = p; + pipeline->timer_period_ms = 10; + + /* Node add to list */ + TAILQ_INSERT_TAIL(&obj->pipeline_list, pipeline, node); + + return pipeline; + +error: + rte_swx_pipeline_free(p); + return NULL; +} + +struct pipeline * +pipeline_find(struct obj *obj, const char *name) +{ + struct pipeline *pipeline; + + if (!obj || !name) + return NULL; + + TAILQ_FOREACH(pipeline, &obj->pipeline_list, node) + if (strcmp(name, pipeline->name) == 0) + return pipeline; + + return NULL; +} + +/* + * obj + */ +struct obj * +obj_init(void) +{ + struct obj *obj; + + obj = calloc(1, sizeof(struct obj)); + if (!obj) + return NULL; + + TAILQ_INIT(&obj->mempool_list); + TAILQ_INIT(&obj->link_list); + TAILQ_INIT(&obj->pipeline_list); + + return obj; +} diff --git a/examples/pipeline/obj.h b/examples/pipeline/obj.h new file mode 100644 index 000000000..2f48b790f --- /dev/null +++ b/examples/pipeline/obj.h @@ -0,0 +1,131 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#ifndef _INCLUDE_OBJ_H_ +#define _INCLUDE_OBJ_H_ + +#include +#include + +#include +#include +#include + +#ifndef NAME_SIZE +#define NAME_SIZE 64 +#endif + +/* + * obj + */ +struct obj; + +struct obj * +obj_init(void); + +/* + * mempool + */ +struct mempool_params { + uint32_t buffer_size; + uint32_t pool_size; + uint32_t cache_size; + uint32_t cpu_id; +}; + +struct mempool { + TAILQ_ENTRY(mempool) node; + char name[NAME_SIZE]; + struct rte_mempool *m; + uint32_t buffer_size; +}; + +struct mempool * +mempool_create(struct obj *obj, + const char *name, + struct mempool_params *params); + +struct mempool * +mempool_find(struct obj *obj, + const char *name); + +/* + * link + */ +#ifndef LINK_RXQ_RSS_MAX +#define LINK_RXQ_RSS_MAX 16 +#endif + +struct link_params_rss { + uint32_t queue_id[LINK_RXQ_RSS_MAX]; + uint32_t n_queues; +}; + +struct link_params { + const char *dev_name; + uint16_t port_id; /**< Valid only when *dev_name* is NULL. */ + + struct { + uint32_t n_queues; + uint32_t queue_size; + const char *mempool_name; + struct link_params_rss *rss; + } rx; + + struct { + uint32_t n_queues; + uint32_t queue_size; + } tx; + + int promiscuous; +}; + +struct link { + TAILQ_ENTRY(link) node; + char name[NAME_SIZE]; + char dev_name[NAME_SIZE]; + uint16_t port_id; + uint32_t n_rxq; + uint32_t n_txq; +}; + +struct link * +link_create(struct obj *obj, + const char *name, + struct link_params *params); + +int +link_is_up(struct obj *obj, const char *name); + +struct link * +link_find(struct obj *obj, const char *name); + +struct link * +link_next(struct obj *obj, struct link *link); + +/* + * pipeline + */ +struct pipeline { + TAILQ_ENTRY(pipeline) node; + char name[NAME_SIZE]; + + struct rte_swx_pipeline *p; + struct rte_swx_ctl_pipeline *ctl; + + uint32_t timer_period_ms; + int enabled; + uint32_t thread_id; + uint32_t cpu_id; +}; + +struct pipeline * +pipeline_create(struct obj *obj, + const char *name, + int numa_node); + +struct pipeline * +pipeline_find(struct obj *obj, const char *name); + +#endif /* _INCLUDE_OBJ_H_ */ diff --git a/examples/pipeline/thread.c b/examples/pipeline/thread.c new file mode 100644 index 000000000..1be9828f0 --- /dev/null +++ b/examples/pipeline/thread.c @@ -0,0 +1,549 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "obj.h" +#include "thread.h" + +#ifndef THREAD_PIPELINES_MAX +#define THREAD_PIPELINES_MAX 256 +#endif + +#ifndef THREAD_MSGQ_SIZE +#define THREAD_MSGQ_SIZE 64 +#endif + +#ifndef THREAD_TIMER_PERIOD_MS +#define THREAD_TIMER_PERIOD_MS 100 +#endif + +/** + * Control thread: data plane thread context + */ +struct thread { + struct rte_ring *msgq_req; + struct rte_ring *msgq_rsp; + + uint32_t enabled; +}; + +static struct thread thread[RTE_MAX_LCORE]; + +/** + * Data plane threads: context + */ +struct pipeline_data { + struct rte_swx_pipeline *p; + uint64_t timer_period; /* Measured in CPU cycles. */ + uint64_t time_next; +}; + +struct thread_data { + struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX]; + uint32_t n_pipelines; + + struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX]; + struct rte_ring *msgq_req; + struct rte_ring *msgq_rsp; + uint64_t timer_period; /* Measured in CPU cycles. */ + uint64_t time_next; + uint64_t time_next_min; +} __rte_cache_aligned; + +static struct thread_data thread_data[RTE_MAX_LCORE]; + +/** + * Control thread: data plane thread init + */ +static void +thread_free(void) +{ + uint32_t i; + + for (i = 0; i < RTE_MAX_LCORE; i++) { + struct thread *t = &thread[i]; + + if (!rte_lcore_is_enabled(i)) + continue; + + /* MSGQs */ + if (t->msgq_req) + rte_ring_free(t->msgq_req); + + if (t->msgq_rsp) + rte_ring_free(t->msgq_rsp); + } +} + +int +thread_init(void) +{ + uint32_t i; + + RTE_LCORE_FOREACH_SLAVE(i) { + char name[NAME_MAX]; + struct rte_ring *msgq_req, *msgq_rsp; + struct thread *t = &thread[i]; + struct thread_data *t_data = &thread_data[i]; + uint32_t cpu_id = rte_lcore_to_socket_id(i); + + /* MSGQs */ + snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i); + + msgq_req = rte_ring_create(name, + THREAD_MSGQ_SIZE, + cpu_id, + RING_F_SP_ENQ | RING_F_SC_DEQ); + + if (msgq_req == NULL) { + thread_free(); + return -1; + } + + snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i); + + msgq_rsp = rte_ring_create(name, + THREAD_MSGQ_SIZE, + cpu_id, + RING_F_SP_ENQ | RING_F_SC_DEQ); + + if (msgq_rsp == NULL) { + thread_free(); + return -1; + } + + /* Control thread records */ + t->msgq_req = msgq_req; + t->msgq_rsp = msgq_rsp; + t->enabled = 1; + + /* Data plane thread records */ + t_data->n_pipelines = 0; + t_data->msgq_req = msgq_req; + t_data->msgq_rsp = msgq_rsp; + t_data->timer_period = + (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000; + t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period; + t_data->time_next_min = t_data->time_next; + } + + return 0; +} + +static inline int +thread_is_running(uint32_t thread_id) +{ + enum rte_lcore_state_t thread_state; + + thread_state = rte_eal_get_lcore_state(thread_id); + return (thread_state == RUNNING) ? 1 : 0; +} + +/** + * Control thread & data plane threads: message passing + */ +enum thread_req_type { + THREAD_REQ_PIPELINE_ENABLE = 0, + THREAD_REQ_PIPELINE_DISABLE, + THREAD_REQ_MAX +}; + +struct thread_msg_req { + enum thread_req_type type; + + union { + struct { + struct rte_swx_pipeline *p; + uint32_t timer_period_ms; + } pipeline_enable; + + struct { + struct rte_swx_pipeline *p; + } pipeline_disable; + }; +}; + +struct thread_msg_rsp { + int status; +}; + +/** + * Control thread + */ +static struct thread_msg_req * +thread_msg_alloc(void) +{ + size_t size = RTE_MAX(sizeof(struct thread_msg_req), + sizeof(struct thread_msg_rsp)); + + return calloc(1, size); +} + +static void +thread_msg_free(struct thread_msg_rsp *rsp) +{ + free(rsp); +} + +static struct thread_msg_rsp * +thread_msg_send_recv(uint32_t thread_id, + struct thread_msg_req *req) +{ + struct thread *t = &thread[thread_id]; + struct rte_ring *msgq_req = t->msgq_req; + struct rte_ring *msgq_rsp = t->msgq_rsp; + struct thread_msg_rsp *rsp; + int status; + + /* send */ + do { + status = rte_ring_sp_enqueue(msgq_req, req); + } while (status == -ENOBUFS); + + /* recv */ + do { + status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp); + } while (status != 0); + + return rsp; +} + +int +thread_pipeline_enable(uint32_t thread_id, + struct obj *obj, + const char *pipeline_name) +{ + struct pipeline *p = pipeline_find(obj, pipeline_name); + struct thread *t; + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + int status; + + /* Check input params */ + if ((thread_id >= RTE_MAX_LCORE) || + (p == NULL)) + return -1; + + t = &thread[thread_id]; + if (t->enabled == 0) + return -1; + + if (!thread_is_running(thread_id)) { + struct thread_data *td = &thread_data[thread_id]; + struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines]; + + if (td->n_pipelines >= THREAD_PIPELINES_MAX) + return -1; + + /* Data plane thread */ + td->p[td->n_pipelines] = p->p; + + tdp->p = p->p; + tdp->timer_period = + (rte_get_tsc_hz() * p->timer_period_ms) / 1000; + tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period; + + td->n_pipelines++; + + /* Pipeline */ + p->thread_id = thread_id; + p->enabled = 1; + + return 0; + } + + /* Allocate request */ + req = thread_msg_alloc(); + if (req == NULL) + return -1; + + /* Write request */ + req->type = THREAD_REQ_PIPELINE_ENABLE; + req->pipeline_enable.p = p->p; + req->pipeline_enable.timer_period_ms = p->timer_period_ms; + + /* Send request and wait for response */ + rsp = thread_msg_send_recv(thread_id, req); + + /* Read response */ + status = rsp->status; + + /* Free response */ + thread_msg_free(rsp); + + /* Request completion */ + if (status) + return status; + + p->thread_id = thread_id; + p->enabled = 1; + + return 0; +} + +int +thread_pipeline_disable(uint32_t thread_id, + struct obj *obj, + const char *pipeline_name) +{ + struct pipeline *p = pipeline_find(obj, pipeline_name); + struct thread *t; + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + int status; + + /* Check input params */ + if ((thread_id >= RTE_MAX_LCORE) || + (p == NULL)) + return -1; + + t = &thread[thread_id]; + if (t->enabled == 0) + return -1; + + if (p->enabled == 0) + return 0; + + if (p->thread_id != thread_id) + return -1; + + if (!thread_is_running(thread_id)) { + struct thread_data *td = &thread_data[thread_id]; + uint32_t i; + + for (i = 0; i < td->n_pipelines; i++) { + struct pipeline_data *tdp = &td->pipeline_data[i]; + + if (tdp->p != p->p) + continue; + + /* Data plane thread */ + if (i < td->n_pipelines - 1) { + struct rte_swx_pipeline *pipeline_last = + td->p[td->n_pipelines - 1]; + struct pipeline_data *tdp_last = + &td->pipeline_data[td->n_pipelines - 1]; + + td->p[i] = pipeline_last; + memcpy(tdp, tdp_last, sizeof(*tdp)); + } + + td->n_pipelines--; + + /* Pipeline */ + p->enabled = 0; + + break; + } + + return 0; + } + + /* Allocate request */ + req = thread_msg_alloc(); + if (req == NULL) + return -1; + + /* Write request */ + req->type = THREAD_REQ_PIPELINE_DISABLE; + req->pipeline_disable.p = p->p; + + /* Send request and wait for response */ + rsp = thread_msg_send_recv(thread_id, req); + + /* Read response */ + status = rsp->status; + + /* Free response */ + thread_msg_free(rsp); + + /* Request completion */ + if (status) + return status; + + p->enabled = 0; + + return 0; +} + +/** + * Data plane threads: message handling + */ +static inline struct thread_msg_req * +thread_msg_recv(struct rte_ring *msgq_req) +{ + struct thread_msg_req *req; + + int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); + + if (status != 0) + return NULL; + + return req; +} + +static inline void +thread_msg_send(struct rte_ring *msgq_rsp, + struct thread_msg_rsp *rsp) +{ + int status; + + do { + status = rte_ring_sp_enqueue(msgq_rsp, rsp); + } while (status == -ENOBUFS); +} + +static struct thread_msg_rsp * +thread_msg_handle_pipeline_enable(struct thread_data *t, + struct thread_msg_req *req) +{ + struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; + struct pipeline_data *p = &t->pipeline_data[t->n_pipelines]; + + /* Request */ + if (t->n_pipelines >= THREAD_PIPELINES_MAX) { + rsp->status = -1; + return rsp; + } + + t->p[t->n_pipelines] = req->pipeline_enable.p; + + p->p = req->pipeline_enable.p; + p->timer_period = (rte_get_tsc_hz() * + req->pipeline_enable.timer_period_ms) / 1000; + p->time_next = rte_get_tsc_cycles() + p->timer_period; + + t->n_pipelines++; + + /* Response */ + rsp->status = 0; + return rsp; +} + +static struct thread_msg_rsp * +thread_msg_handle_pipeline_disable(struct thread_data *t, + struct thread_msg_req *req) +{ + struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; + uint32_t n_pipelines = t->n_pipelines; + struct rte_swx_pipeline *pipeline = req->pipeline_disable.p; + uint32_t i; + + /* find pipeline */ + for (i = 0; i < n_pipelines; i++) { + struct pipeline_data *p = &t->pipeline_data[i]; + + if (p->p != pipeline) + continue; + + if (i < n_pipelines - 1) { + struct rte_swx_pipeline *pipeline_last = + t->p[n_pipelines - 1]; + struct pipeline_data *p_last = + &t->pipeline_data[n_pipelines - 1]; + + t->p[i] = pipeline_last; + memcpy(p, p_last, sizeof(*p)); + } + + t->n_pipelines--; + + rsp->status = 0; + return rsp; + } + + /* should not get here */ + rsp->status = 0; + return rsp; +} + +static void +thread_msg_handle(struct thread_data *t) +{ + for ( ; ; ) { + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + + req = thread_msg_recv(t->msgq_req); + if (req == NULL) + break; + + switch (req->type) { + case THREAD_REQ_PIPELINE_ENABLE: + rsp = thread_msg_handle_pipeline_enable(t, req); + break; + + case THREAD_REQ_PIPELINE_DISABLE: + rsp = thread_msg_handle_pipeline_disable(t, req); + break; + + default: + rsp = (struct thread_msg_rsp *) req; + rsp->status = -1; + } + + thread_msg_send(t->msgq_rsp, rsp); + } +} + +/** + * Data plane threads: main + */ +int +thread_main(void *arg __rte_unused) +{ + struct thread_data *t; + uint32_t thread_id, i; + + thread_id = rte_lcore_id(); + t = &thread_data[thread_id]; + + /* Dispatch loop */ + for (i = 0; ; i++) { + uint32_t j; + + /* Data Plane */ + for (j = 0; j < t->n_pipelines; j++) + rte_swx_pipeline_run(t->p[j], 1000000); + + /* Control Plane */ + if ((i & 0xF) == 0) { + uint64_t time = rte_get_tsc_cycles(); + uint64_t time_next_min = UINT64_MAX; + + if (time < t->time_next_min) + continue; + + /* Thread message queues */ + { + uint64_t time_next = t->time_next; + + if (time_next <= time) { + thread_msg_handle(t); + time_next = time + t->timer_period; + t->time_next = time_next; + } + + if (time_next < time_next_min) + time_next_min = time_next; + } + + t->time_next_min = time_next_min; + } + } + + return 0; +} diff --git a/examples/pipeline/thread.h b/examples/pipeline/thread.h new file mode 100644 index 000000000..829d82cbd --- /dev/null +++ b/examples/pipeline/thread.h @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#ifndef _INCLUDE_THREAD_H_ +#define _INCLUDE_THREAD_H_ + +#include + +#include "obj.h" + +int +thread_pipeline_enable(uint32_t thread_id, + struct obj *obj, + const char *pipeline_name); + +int +thread_pipeline_disable(uint32_t thread_id, + struct obj *obj, + const char *pipeline_name); + +int +thread_init(void); + +int +thread_main(void *arg); + +#endif /* _INCLUDE_THREAD_H_ */ -- 2.17.1