* [dpdk-dev] [PATCH v2 1/1] ip_pipeline: added dynamic pipeline reconfiguration @ 2015-09-30 16:25 Maciej Gajdzica 2015-09-30 16:34 ` Dumitrescu, Cristian 2015-10-19 14:00 ` [dpdk-dev] [PATCH v3 " Piotr Azarewicz 0 siblings, 2 replies; 6+ messages in thread From: Maciej Gajdzica @ 2015-09-30 16:25 UTC (permalink / raw) To: dev Up till now pipeline was bound to thread selected in the initial config. This patch allows binding pipeline to other threads at runtime using CLI commands. Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com> --- examples/ip_pipeline/Makefile | 1 + examples/ip_pipeline/app.h | 5 + examples/ip_pipeline/config_parse.c | 2 +- examples/ip_pipeline/init.c | 61 ++++ examples/ip_pipeline/pipeline.h | 6 + examples/ip_pipeline/pipeline/pipeline_common_fe.h | 3 + examples/ip_pipeline/thread.c | 134 +++++++- examples/ip_pipeline/thread.h | 101 ++++++ examples/ip_pipeline/thread_fe.c | 323 ++++++++++++++++++++ 9 files changed, 634 insertions(+), 2 deletions(-) create mode 100644 examples/ip_pipeline/thread.h create mode 100644 examples/ip_pipeline/thread_fe.c diff --git a/examples/ip_pipeline/Makefile b/examples/ip_pipeline/Makefile index f3ff1ec..c8e80b5 100644 --- a/examples/ip_pipeline/Makefile +++ b/examples/ip_pipeline/Makefile @@ -54,6 +54,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_parse_tm.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread.c +SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c diff --git a/examples/ip_pipeline/app.h b/examples/ip_pipeline/app.h index 521e3a0..19ddd31 100644 --- a/examples/ip_pipeline/app.h +++ b/examples/ip_pipeline/app.h @@ -220,9 +220,11 @@ struct app_pipeline_data { void *be; void *fe; uint64_t timer_period; + uint32_t enabled; }; struct app_thread_pipeline_data { + uint32_t pipeline_id; void 
*be; pipeline_be_op_run f_run; pipeline_be_op_timer f_timer; @@ -242,6 +244,9 @@ struct app_thread_data { uint32_t n_custom; uint64_t deadline; + + struct rte_ring *msgq_in; + struct rte_ring *msgq_out; }; struct app_eal_params { diff --git a/examples/ip_pipeline/config_parse.c b/examples/ip_pipeline/config_parse.c index c9b78f9..d2aaadf 100644 --- a/examples/ip_pipeline/config_parse.c +++ b/examples/ip_pipeline/config_parse.c @@ -362,7 +362,7 @@ parser_read_uint32(uint32_t *value, const char *p) return 0; } -static int +int parse_pipeline_core(uint32_t *socket, uint32_t *core, uint32_t *ht, diff --git a/examples/ip_pipeline/init.c b/examples/ip_pipeline/init.c index 3f9c68d..1126288 100644 --- a/examples/ip_pipeline/init.c +++ b/examples/ip_pipeline/init.c @@ -50,6 +50,7 @@ #include "pipeline_firewall.h" #include "pipeline_flow_classification.h" #include "pipeline_routing.h" +#include "thread.h" #define APP_NAME_SIZE 32 @@ -1225,6 +1226,48 @@ app_init_pipelines(struct app_params *app) } } +static inline struct rte_ring * +app_thread_msgq_in_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? "h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; +} + +static inline struct rte_ring * +app_thread_msgq_out_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? 
"h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; + +} + static void app_init_threads(struct app_params *app) { @@ -1253,6 +1296,20 @@ app_init_threads(struct app_params *app) t = &app->thread_data[lcore_id]; + t->msgq_in = app_thread_msgq_in_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_in == NULL) + rte_panic("Init error: Cannot find MSGQ_IN for thread %" PRId32, lcore_id); + + t->msgq_out = app_thread_msgq_out_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_out == NULL) + rte_panic("Init error: Cannot find MSGQ_OUT for thread %" PRId32, lcore_id); + ptype = app_pipeline_type_find(app, params->type); if (ptype == NULL) rte_panic("Init error: Unknown pipeline " @@ -1262,12 +1319,15 @@ app_init_threads(struct app_params *app) &t->regular[t->n_regular] : &t->custom[t->n_custom]; + p->pipeline_id = p_id; p->be = data->be; p->f_run = ptype->be_ops->f_run; p->f_timer = ptype->be_ops->f_timer; p->timer_period = data->timer_period; p->deadline = time + data->timer_period; + data->enabled = 1; + if (ptype->be_ops->f_run == NULL) t->n_regular++; else @@ -1288,6 +1348,7 @@ int app_init(struct app_params *app) app_init_msgq(app); app_pipeline_common_cmd_push(app); + app_pipeline_thread_cmd_push(app); app_pipeline_type_register(app, &pipeline_master); app_pipeline_type_register(app, &pipeline_passthrough); app_pipeline_type_register(app, &pipeline_flow_classification); diff --git a/examples/ip_pipeline/pipeline.h b/examples/ip_pipeline/pipeline.h index b9a56ea..dab9c36 100644 --- a/examples/ip_pipeline/pipeline.h +++ b/examples/ip_pipeline/pipeline.h @@ -84,4 +84,10 @@ pipeline_type_cmds_count(struct pipeline_type *ptype) return n_cmds; } +int +parse_pipeline_core(uint32_t *socket, + uint32_t *core, + uint32_t *ht, + const char *entry); + #endif diff --git 
a/examples/ip_pipeline/pipeline/pipeline_common_fe.h b/examples/ip_pipeline/pipeline/pipeline_common_fe.h index 693848d..e84aa3a 100644 --- a/examples/ip_pipeline/pipeline/pipeline_common_fe.h +++ b/examples/ip_pipeline/pipeline/pipeline_common_fe.h @@ -68,6 +68,9 @@ app_pipeline_data_fe(struct app_params *app, uint32_t id) if (pipeline_data == NULL) return NULL; + if (pipeline_data->enabled == 0) + return NULL; + return pipeline_data->fe; } diff --git a/examples/ip_pipeline/thread.c b/examples/ip_pipeline/thread.c index b2a8656..7366981 100644 --- a/examples/ip_pipeline/thread.c +++ b/examples/ip_pipeline/thread.c @@ -37,8 +37,138 @@ #include "pipeline_common_be.h" #include "app.h" +#include "thread.h" -int app_thread(void *arg) +static inline void * +thread_msg_recv(struct rte_ring *r) +{ + void *msg; + int status = rte_ring_sc_dequeue(r, &msg); + + if (status != 0) + return NULL; + + return msg; +} + +static inline void +thread_msg_send(struct rte_ring *r, + void *msg) +{ + int status; + + do { + status = rte_ring_sp_enqueue(r, msg); + } while (status == -ENOBUFS); +} + +static int +thread_pipeline_enable(struct app_thread_data *t, + struct thread_pipeline_enable_msg_req *req) +{ + struct app_thread_pipeline_data *p; + + p = (req->f_run == NULL) ? 
+ &t->regular[t->n_regular] : + &t->custom[t->n_custom]; + + if (t->n_regular >= APP_MAX_THREAD_PIPELINES) + return -1; + + p->pipeline_id = req->pipeline_id; + p->be = req->be; + p->f_run = req->f_run; + p->f_timer = req->f_timer; + p->timer_period = req->timer_period; + p->deadline = 0; + + if (req->f_run == NULL) + t->n_regular++; + else + t->n_custom++; + + return 0; +} + +static int +thread_pipeline_disable(struct app_thread_data *t, + struct thread_pipeline_disable_msg_req *req) +{ + uint32_t i; + + printf("%p\n", req); + printf("%d %d\n", req->type, req->pipeline_id); + + for (i = 0; i < t->n_regular; i++) { + if (t->regular[i].pipeline_id == req->pipeline_id) + break; + } + + if (i < t->n_regular) { + if (i < t->n_regular - 1) + memcpy(&t->regular[i], + &t->regular[i+1], + (t->n_regular - i) * sizeof(struct app_thread_pipeline_data)); + + t->n_regular--; + + return 0; + } + + for (i = 0; i < t->n_custom; i++) { + if (t->custom[i].pipeline_id == req->pipeline_id) + break; + } + + /* return if pipeline not found */ + if (i >= t->n_custom) + return -1; + + if (i < t->n_custom - 1) + memcpy(&t->custom[i], + &t->custom[i+1], + (t->n_custom - i) * sizeof(struct app_thread_pipeline_data)); + + t->n_custom--; + + return 0; +} + +static int +thread_msg_req_handle(struct app_thread_data *t) +{ + void *msg_ptr; + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + + msg_ptr = thread_msg_recv(t->msgq_in); + req = msg_ptr; + rsp = msg_ptr; + + if (req != NULL) + switch (req->type) { + case THREAD_MSG_REQ_PIPELINE_ENABLE: { + rsp->status = thread_pipeline_enable(t, + (struct thread_pipeline_enable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + + case THREAD_MSG_REQ_PIPELINE_DISABLE: { + rsp->status = thread_pipeline_disable(t, + (struct thread_pipeline_disable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + default: + break; + } + + return 0; +} + +int +app_thread(void *arg) { struct app_params *app = (struct app_params 
*) arg; uint32_t core_id = rte_lcore_id(), i, j; @@ -103,6 +233,8 @@ int app_thread(void *arg) } t->deadline = t_deadline; + + thread_msg_req_handle(t); } } diff --git a/examples/ip_pipeline/thread.h b/examples/ip_pipeline/thread.h new file mode 100644 index 0000000..878c9e1 --- /dev/null +++ b/examples/ip_pipeline/thread.h @@ -0,0 +1,101 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#ifndef THREAD_H_ +#define THREAD_H_ + +#include "app.h" +#include "pipeline_be.h" + +enum thread_msg_req_type { + THREAD_MSG_REQ_PIPELINE_ENABLE = 0, + THREAD_MSG_REQ_PIPELINE_DISABLE, + THREAD_MSG_REQS +}; + +struct thread_msg_req { + enum thread_msg_req_type type; +}; + +struct thread_msg_rsp { + int status; +}; + +/* + * PIPELINE ENABLE + */ +struct thread_pipeline_enable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; + void *be; + pipeline_be_op_run f_run; + pipeline_be_op_timer f_timer; + uint64_t timer_period; +}; + +struct thread_pipeline_enable_msg_rsp { + int status; +}; + +/* + * PIPELINE DISABLE + */ +struct thread_pipeline_disable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; +}; + +struct thread_pipeline_disable_msg_rsp { + int status; +}; + +int +app_pipeline_thread_cmd_push(struct app_params *app); + +int +app_pipeline_enable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +int +app_pipeline_disable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +#endif /* THREAD_H_ */ diff --git a/examples/ip_pipeline/thread_fe.c b/examples/ip_pipeline/thread_fe.c new file mode 100644 index 0000000..1e97bbb --- /dev/null +++ b/examples/ip_pipeline/thread_fe.c @@ -0,0 +1,323 @@ +#include <rte_common.h> +#include <rte_ring.h> +#include <rte_malloc.h> +#include <cmdline_rdline.h> +#include <cmdline_parse.h> +#include <cmdline_parse_num.h> +#include <cmdline_parse_string.h> +#include <cmdline_parse_ipaddr.h> +#include <cmdline_parse_etheraddr.h> +#include <cmdline_socket.h> +#include <cmdline.h> + +#include "thread.h" +#include "pipeline.h" +#include "pipeline_common_fe.h" +#include "app.h" + +static inline void * +thread_msg_send_recv(struct app_params *app, + uint32_t thread_id, + void *msg, + uint32_t timeout_ms) +{ + struct rte_ring *r_req = 
app->thread_data[thread_id].msgq_in; + struct rte_ring *r_rsp = app->thread_data[thread_id].msgq_out; + uint64_t hz = rte_get_tsc_hz(); + void *msg_recv; + uint64_t deadline; + int status; + + /* send */ + do { + status = rte_ring_sp_enqueue(r_req, (void *) msg); + } while (status == -ENOBUFS); + + /* recv */ + deadline = (timeout_ms) ? + (rte_rdtsc() + ((hz * timeout_ms) / 1000)) : + UINT64_MAX; + + do { + if (rte_rdtsc() > deadline) + return NULL; + + status = rte_ring_sc_dequeue(r_rsp, &msg_recv); + } while (status != 0); + + return msg_recv; +} + +int +app_pipeline_enable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_enable_msg_req *req; + struct thread_pipeline_enable_msg_rsp *rsp; + + int thread_id; + + struct app_pipeline_data *p; + struct app_pipeline_params *p_params; + struct pipeline_type *p_type; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if (thread_id < 0) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + p = &app->pipeline_data[pipeline_id]; + p_params = &app->pipeline_params[pipeline_id]; + p_type = app_pipeline_type_find(app, p_params->type); + + if (p->enabled == 1) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_ENABLE; + req->pipeline_id = pipeline_id; + req->be = p->be; + req->f_run = p_type->be_ops->f_run; + req->f_timer = p_type->be_ops->f_timer; + req->timer_period = p->timer_period; + + rsp = thread_msg_send_recv(app, thread_id, req, MSG_TIMEOUT_DEFAULT); + if (rsp == NULL) + return -1; + + if (rsp->status < 0) + return -1; + + p->enabled = 1; + return 0; +} + + +int +app_pipeline_disable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_disable_msg_req *req; + struct thread_pipeline_disable_msg_rsp 
*rsp; + + int thread_id; + + struct app_pipeline_data *p; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if (thread_id < 0) + return -1; + + p = &app->pipeline_data[pipeline_id]; + + if (p->enabled == 0) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_DISABLE; + req->pipeline_id = pipeline_id; + + rsp = thread_msg_send_recv(app, thread_id, req, MSG_TIMEOUT_DEFAULT); + + if (rsp == NULL) + return -1; + + if (rsp->status < 0) + return -1; + + p->enabled = 0; + return 0; +} + +struct cmd_pipeline_enable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t enable_string; +}; + +static void +cmd_pipeline_enable_parsed( + void *parsed_result, + __attribute__((unused)) struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_enable_result *params = parsed_result; + struct app_params *app = data; + + int status; + + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id) < 0) { + printf("Command failed\n"); + return; + } + + status = app_pipeline_enable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_enable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_enable_t_id = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id, NULL); + +cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_string, + "pipeline"); + +cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct 
cmd_pipeline_enable_result, pipeline_id, UINT32); + +cmdline_parse_token_string_t cmd_pipeline_enable_enable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, enable_string, "enable"); + +cmdline_parse_inst_t cmd_pipeline_enable = { + .f = cmd_pipeline_enable_parsed, + .data = NULL, + .help_str = "Enable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_enable_t_string, + (void *)&cmd_pipeline_enable_t_id, + (void *)&cmd_pipeline_enable_pipeline_string, + (void *)&cmd_pipeline_enable_pipeline_id, + (void *)&cmd_pipeline_enable_enable_string, + NULL, + }, +}; + +struct cmd_pipeline_disable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t disable_string; +}; + +static void +cmd_pipeline_disable_parsed( + void *parsed_result, + __attribute__((unused)) struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_enable_result *params = parsed_result; + struct app_params *app = data; + + int status; + + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id) < 0) { + printf("Command failed\n"); + return; + } + + status = app_pipeline_disable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_disable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_disable_t_id = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id, NULL); + +cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_string, + "pipeline"); + +cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_id, UINT32); + 
+cmdline_parse_token_string_t cmd_pipeline_disable_enable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, disable_string, "disable"); + +cmdline_parse_inst_t cmd_pipeline_disable = { + .f = cmd_pipeline_disable_parsed, + .data = NULL, + .help_str = "Disable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_disable_t_string, + (void *)&cmd_pipeline_disable_t_id, + (void *)&cmd_pipeline_disable_pipeline_string, + (void *)&cmd_pipeline_disable_pipeline_id, + (void *)&cmd_pipeline_disable_enable_string, + NULL, + }, +}; + +static cmdline_parse_ctx_t thread_cmds[] = { + (cmdline_parse_inst_t *) &cmd_pipeline_enable, + (cmdline_parse_inst_t *) &cmd_pipeline_disable, + NULL, +}; + +int +app_pipeline_thread_cmd_push(struct app_params *app) +{ + uint32_t n_cmds, i; + + /* Check for available slots in the application commands array */ + n_cmds = RTE_DIM(thread_cmds) - 1; + if (n_cmds > APP_MAX_CMDS - app->n_cmds) + return -ENOMEM; + + /* Push pipeline commands into the application */ + memcpy(&app->cmds[app->n_cmds], + thread_cmds, + n_cmds * sizeof(cmdline_parse_ctx_t *)); + + for (i = 0; i < n_cmds; i++) + app->cmds[app->n_cmds + i]->data = app; + + app->n_cmds += n_cmds; + app->cmds[app->n_cmds] = NULL; + + return 0; +} -- 1.7.9.5 ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [dpdk-dev] [PATCH v2 1/1] ip_pipeline: added dynamic pipeline reconfiguration 2015-09-30 16:25 [dpdk-dev] [PATCH v2 1/1] ip_pipeline: added dynamic pipeline reconfiguration Maciej Gajdzica @ 2015-09-30 16:34 ` Dumitrescu, Cristian 2015-10-19 14:00 ` [dpdk-dev] [PATCH v3 " Piotr Azarewicz 1 sibling, 0 replies; 6+ messages in thread From: Dumitrescu, Cristian @ 2015-09-30 16:34 UTC (permalink / raw) To: Gajdzica, MaciejX T, dev > -----Original Message----- > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Maciej Gajdzica > Sent: Wednesday, September 30, 2015 5:26 PM > To: dev@dpdk.org > Subject: [dpdk-dev] [PATCH v2 1/1] ip_pipeline: added dynamic pipeline > reconfiguration > > Up till now pipeline was bound to thread selected in the initial config. > This patch allows binding pipeline to other threads at runtime using CLI > commands. > > Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com> > --- Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com> ^ permalink raw reply [flat|nested] 6+ messages in thread
* [dpdk-dev] [PATCH v3 1/1] ip_pipeline: added dynamic pipeline reconfiguration 2015-09-30 16:25 [dpdk-dev] [PATCH v2 1/1] ip_pipeline: added dynamic pipeline reconfiguration Maciej Gajdzica 2015-09-30 16:34 ` Dumitrescu, Cristian @ 2015-10-19 14:00 ` Piotr Azarewicz 2015-10-29 9:51 ` [dpdk-dev] [PATCH v4 " Piotr Azarewicz 1 sibling, 1 reply; 6+ messages in thread From: Piotr Azarewicz @ 2015-10-19 14:00 UTC (permalink / raw) To: dev Until now, a pipeline was bound to the thread selected in the initial config. This patch allows binding a pipeline to other threads at runtime using CLI commands. v2 changes: - deleted debug printfs v3 changes: - added a timer for thread message requests - fixed a bug that prevented the new functionality from working - fixed a memory leak - general cleanup Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com> Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com> Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com> --- examples/ip_pipeline/Makefile | 1 + examples/ip_pipeline/app.h | 12 + examples/ip_pipeline/config_parse.c | 2 +- examples/ip_pipeline/init.c | 24 ++ examples/ip_pipeline/pipeline.h | 6 + examples/ip_pipeline/pipeline/pipeline_common_fe.h | 3 + examples/ip_pipeline/thread.c | 157 ++++++++- examples/ip_pipeline/thread.h | 84 +++++ examples/ip_pipeline/thread_fe.c | 344 ++++++++++++++++++++ examples/ip_pipeline/thread_fe.h | 95 ++++++ 10 files changed, 720 insertions(+), 8 deletions(-) create mode 100644 examples/ip_pipeline/thread.h create mode 100644 examples/ip_pipeline/thread_fe.c create mode 100644 examples/ip_pipeline/thread_fe.h diff --git a/examples/ip_pipeline/Makefile b/examples/ip_pipeline/Makefile index f3ff1ec..c8e80b5 100644 --- a/examples/ip_pipeline/Makefile +++ b/examples/ip_pipeline/Makefile @@ -54,6 +54,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_parse_tm.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread.c 
+SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c diff --git a/examples/ip_pipeline/app.h b/examples/ip_pipeline/app.h index 521e3a0..3a57956 100644 --- a/examples/ip_pipeline/app.h +++ b/examples/ip_pipeline/app.h @@ -220,9 +220,11 @@ struct app_pipeline_data { void *be; void *fe; uint64_t timer_period; + uint32_t enabled; }; struct app_thread_pipeline_data { + uint32_t pipeline_id; void *be; pipeline_be_op_run f_run; pipeline_be_op_timer f_timer; @@ -234,6 +236,10 @@ struct app_thread_pipeline_data { #define APP_MAX_THREAD_PIPELINES 16 #endif +#ifndef APP_THREAD_TIMER_PERIOD +#define APP_THREAD_TIMER_PERIOD 1 +#endif + struct app_thread_data { struct app_thread_pipeline_data regular[APP_MAX_THREAD_PIPELINES]; struct app_thread_pipeline_data custom[APP_MAX_THREAD_PIPELINES]; @@ -241,7 +247,13 @@ struct app_thread_data { uint32_t n_regular; uint32_t n_custom; + uint64_t timer_period; + uint64_t thread_req_deadline; + uint64_t deadline; + + struct rte_ring *msgq_in; + struct rte_ring *msgq_out; }; struct app_eal_params { diff --git a/examples/ip_pipeline/config_parse.c b/examples/ip_pipeline/config_parse.c index c9b78f9..d2aaadf 100644 --- a/examples/ip_pipeline/config_parse.c +++ b/examples/ip_pipeline/config_parse.c @@ -362,7 +362,7 @@ parser_read_uint32(uint32_t *value, const char *p) return 0; } -static int +int parse_pipeline_core(uint32_t *socket, uint32_t *core, uint32_t *ht, diff --git a/examples/ip_pipeline/init.c b/examples/ip_pipeline/init.c index 3f9c68d..8c63879 100644 --- a/examples/ip_pipeline/init.c +++ b/examples/ip_pipeline/init.c @@ -50,6 +50,7 @@ #include "pipeline_firewall.h" #include "pipeline_flow_classification.h" #include "pipeline_routing.h" +#include "thread_fe.h" #define APP_NAME_SIZE 32 @@ -1253,6 +1254,25 @@ app_init_threads(struct app_params *app) t = &app->thread_data[lcore_id]; + t->timer_period = (rte_get_tsc_hz() * 
APP_THREAD_TIMER_PERIOD) / 1000; + t->thread_req_deadline = time + t->timer_period; + + t->msgq_in = app_thread_msgq_in_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_in == NULL) + rte_panic("Init error: Cannot find MSGQ_IN for thread %" PRId32, + lcore_id); + + t->msgq_out = app_thread_msgq_out_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_out == NULL) + rte_panic("Init error: Cannot find MSGQ_OUT for thread %" PRId32, + lcore_id); + ptype = app_pipeline_type_find(app, params->type); if (ptype == NULL) rte_panic("Init error: Unknown pipeline " @@ -1262,12 +1282,15 @@ app_init_threads(struct app_params *app) &t->regular[t->n_regular] : &t->custom[t->n_custom]; + p->pipeline_id = p_id; p->be = data->be; p->f_run = ptype->be_ops->f_run; p->f_timer = ptype->be_ops->f_timer; p->timer_period = data->timer_period; p->deadline = time + data->timer_period; + data->enabled = 1; + if (ptype->be_ops->f_run == NULL) t->n_regular++; else @@ -1288,6 +1311,7 @@ int app_init(struct app_params *app) app_init_msgq(app); app_pipeline_common_cmd_push(app); + app_pipeline_thread_cmd_push(app); app_pipeline_type_register(app, &pipeline_master); app_pipeline_type_register(app, &pipeline_passthrough); app_pipeline_type_register(app, &pipeline_flow_classification); diff --git a/examples/ip_pipeline/pipeline.h b/examples/ip_pipeline/pipeline.h index b9a56ea..dab9c36 100644 --- a/examples/ip_pipeline/pipeline.h +++ b/examples/ip_pipeline/pipeline.h @@ -84,4 +84,10 @@ pipeline_type_cmds_count(struct pipeline_type *ptype) return n_cmds; } +int +parse_pipeline_core(uint32_t *socket, + uint32_t *core, + uint32_t *ht, + const char *entry); + #endif diff --git a/examples/ip_pipeline/pipeline/pipeline_common_fe.h b/examples/ip_pipeline/pipeline/pipeline_common_fe.h index 693848d..e84aa3a 100644 --- a/examples/ip_pipeline/pipeline/pipeline_common_fe.h +++ b/examples/ip_pipeline/pipeline/pipeline_common_fe.h @@ -68,6 
+68,9 @@ app_pipeline_data_fe(struct app_params *app, uint32_t id) if (pipeline_data == NULL) return NULL; + if (pipeline_data->enabled == 0) + return NULL; + return pipeline_data->fe; } diff --git a/examples/ip_pipeline/thread.c b/examples/ip_pipeline/thread.c index b2a8656..53be20c 100644 --- a/examples/ip_pipeline/thread.c +++ b/examples/ip_pipeline/thread.c @@ -37,18 +37,147 @@ #include "pipeline_common_be.h" #include "app.h" +#include "thread.h" -int app_thread(void *arg) +static inline void * +thread_msg_recv(struct rte_ring *r) +{ + void *msg; + int status = rte_ring_sc_dequeue(r, &msg); + + if (status != 0) + return NULL; + + return msg; +} + +static inline void +thread_msg_send(struct rte_ring *r, + void *msg) +{ + int status; + + do { + status = rte_ring_sp_enqueue(r, msg); + } while (status == -ENOBUFS); +} + +static int +thread_pipeline_enable(struct app_thread_data *t, + struct thread_pipeline_enable_msg_req *req) +{ + struct app_thread_pipeline_data *p; + + if (req->f_run == NULL) { + if (t->n_regular >= APP_MAX_THREAD_PIPELINES) + return -1; + } else { + if (t->n_custom >= APP_MAX_THREAD_PIPELINES) + return -1; + } + + p = (req->f_run == NULL) ? 
+ &t->regular[t->n_regular] : + &t->custom[t->n_custom]; + + p->pipeline_id = req->pipeline_id; + p->be = req->be; + p->f_run = req->f_run; + p->f_timer = req->f_timer; + p->timer_period = req->timer_period; + p->deadline = 0; + + if (req->f_run == NULL) + t->n_regular++; + else + t->n_custom++; + + return 0; +} + +static int +thread_pipeline_disable(struct app_thread_data *t, + struct thread_pipeline_disable_msg_req *req) +{ + uint32_t i; + + /* search regular pipelines of current thread */ + for (i = 0; i < t->n_regular; i++) { + if (t->regular[i].pipeline_id != req->pipeline_id) + continue; + + if (i < t->n_regular - 1) + memcpy(&t->regular[i], + &t->regular[i+1], + (t->n_regular - 1 - i) * sizeof(struct app_thread_pipeline_data)); + + t->n_regular--; + + return 0; + } + + /* search custom pipelines of current thread */ + for (i = 0; i < t->n_custom; i++) { + if (t->custom[i].pipeline_id != req->pipeline_id) + continue; + + if (i < t->n_custom - 1) + memcpy(&t->custom[i], + &t->custom[i+1], + (t->n_custom - 1 - i) * sizeof(struct app_thread_pipeline_data)); + + t->n_custom--; + + return 0; + } + + /* return if pipeline not found */ + return -1; +} + +static int +thread_msg_req_handle(struct app_thread_data *t) +{ + void *msg_ptr; + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + + msg_ptr = thread_msg_recv(t->msgq_in); + req = msg_ptr; + rsp = msg_ptr; + + if (req != NULL) + switch (req->type) { + case THREAD_MSG_REQ_PIPELINE_ENABLE: { + rsp->status = thread_pipeline_enable(t, + (struct thread_pipeline_enable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + + case THREAD_MSG_REQ_PIPELINE_DISABLE: { + rsp->status = thread_pipeline_disable(t, + (struct thread_pipeline_disable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + default: + break; + } + + return 0; +} + +int +app_thread(void *arg) { struct app_params *app = (struct app_params *) arg; uint32_t core_id = rte_lcore_id(), i, j; struct app_thread_data *t = 
&app->thread_data[core_id]; - uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular)); - uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom)); for (i = 0; ; i++) { /* Run regular pipelines */ - for (j = 0; j < n_regular; j++) { + for (j = 0; j < t->n_regular; j++) { struct app_thread_pipeline_data *data = &t->regular[j]; struct pipeline *p = data->be; @@ -56,7 +185,7 @@ int app_thread(void *arg) } /* Run custom pipelines */ - for (j = 0; j < n_custom; j++) { + for (j = 0; j < t->n_custom; j++) { struct app_thread_pipeline_data *data = &t->custom[j]; data->f_run(data->be); @@ -71,7 +200,7 @@ int app_thread(void *arg) continue; /* Timer for regular pipelines */ - for (j = 0; j < n_regular; j++) { + for (j = 0; j < t->n_regular; j++) { struct app_thread_pipeline_data *data = &t->regular[j]; uint64_t p_deadline = data->deadline; @@ -87,7 +216,7 @@ int app_thread(void *arg) } /* Timer for custom pipelines */ - for (j = 0; j < n_custom; j++) { + for (j = 0; j < t->n_custom; j++) { struct app_thread_pipeline_data *data = &t->custom[j]; uint64_t p_deadline = data->deadline; @@ -102,6 +231,20 @@ int app_thread(void *arg) t_deadline = p_deadline; } + /* Timer for thread message request */ + { + uint64_t deadline = t->thread_req_deadline; + + if (deadline <= time) { + thread_msg_req_handle(t); + deadline = time + t->timer_period; + t->thread_req_deadline = deadline; + } + + if (deadline < t_deadline) + t_deadline = deadline; + } + t->deadline = t_deadline; } } diff --git a/examples/ip_pipeline/thread.h b/examples/ip_pipeline/thread.h new file mode 100644 index 0000000..dc877c0 --- /dev/null +++ b/examples/ip_pipeline/thread.h @@ -0,0 +1,84 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#ifndef THREAD_H_ +#define THREAD_H_ + +#include "app.h" +#include "pipeline_be.h" + +enum thread_msg_req_type { + THREAD_MSG_REQ_PIPELINE_ENABLE = 0, + THREAD_MSG_REQ_PIPELINE_DISABLE, + THREAD_MSG_REQS +}; + +struct thread_msg_req { + enum thread_msg_req_type type; +}; + +struct thread_msg_rsp { + int status; +}; + +/* + * PIPELINE ENABLE + */ +struct thread_pipeline_enable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; + void *be; + pipeline_be_op_run f_run; + pipeline_be_op_timer f_timer; + uint64_t timer_period; +}; + +struct thread_pipeline_enable_msg_rsp { + int status; +}; + +/* + * PIPELINE DISABLE + */ +struct thread_pipeline_disable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; +}; + +struct thread_pipeline_disable_msg_rsp { + int status; +}; + +#endif /* THREAD_H_ */ diff --git a/examples/ip_pipeline/thread_fe.c b/examples/ip_pipeline/thread_fe.c new file mode 100644 index 0000000..b71153a --- /dev/null +++ b/examples/ip_pipeline/thread_fe.c @@ -0,0 +1,344 @@ +#include <rte_common.h> +#include <rte_ring.h> +#include <rte_malloc.h> +#include <cmdline_rdline.h> +#include <cmdline_parse.h> +#include <cmdline_parse_num.h> +#include <cmdline_parse_string.h> +#include <cmdline_parse_ipaddr.h> +#include <cmdline_parse_etheraddr.h> +#include <cmdline_socket.h> +#include <cmdline.h> + +#include "thread.h" +#include "thread_fe.h" +#include "pipeline.h" +#include "pipeline_common_fe.h" +#include "app.h" + +static inline void * +thread_msg_send_recv(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id, + void *msg, + uint32_t timeout_ms) +{ + struct rte_ring *r_req = app_thread_msgq_in_get(app, + socket_id, core_id, ht_id); + struct rte_ring *r_rsp = app_thread_msgq_out_get(app, + socket_id, core_id, ht_id); + uint64_t hz = rte_get_tsc_hz(); + void *msg_recv; + uint64_t deadline; + int status; + + /* send */ + do { + status = rte_ring_sp_enqueue(r_req, (void *) msg); + } while (status == 
-ENOBUFS); + + /* recv */ + deadline = (timeout_ms) ? + (rte_rdtsc() + ((hz * timeout_ms) / 1000)) : + UINT64_MAX; + + do { + if (rte_rdtsc() > deadline) + return NULL; + + status = rte_ring_sc_dequeue(r_rsp, &msg_recv); + } while (status != 0); + + return msg_recv; +} + +int +app_pipeline_enable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_enable_msg_req *req; + struct thread_pipeline_enable_msg_rsp *rsp; + int thread_id; + struct app_pipeline_data *p; + struct app_pipeline_params *p_params; + struct pipeline_type *p_type; + int status; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if (thread_id < 0) + return -1; + + p = &app->pipeline_data[pipeline_id]; + p_params = &app->pipeline_params[pipeline_id]; + p_type = app_pipeline_type_find(app, p_params->type); + + if (p->enabled == 1) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_ENABLE; + req->pipeline_id = pipeline_id; + req->be = p->be; + req->f_run = p_type->be_ops->f_run; + req->f_timer = p_type->be_ops->f_timer; + req->timer_period = p->timer_period; + + rsp = thread_msg_send_recv(app, + socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT); + if (rsp == NULL) + return -1; + + status = rsp->status; + app_msg_free(app, rsp); + + if (status != 0) + return -1; + + p->enabled = 1; + return 0; +} + +int +app_pipeline_disable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_disable_msg_req *req; + struct thread_pipeline_disable_msg_rsp *rsp; + int thread_id; + struct app_pipeline_data *p; + int status; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + if (strcmp(app->pipeline_params[pipeline_id].type, "MASTER") == 0) + 
return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if (thread_id < 0) + return -1; + + p = &app->pipeline_data[pipeline_id]; + + if (p->enabled == 0) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_DISABLE; + req->pipeline_id = pipeline_id; + + rsp = thread_msg_send_recv(app, + socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT); + + if (rsp == NULL) + return -1; + + status = rsp->status; + app_msg_free(app, rsp); + + if (status != 0) + return -1; + + p->enabled = 0; + return 0; +} + +/* + * pipeline enable + */ + +struct cmd_pipeline_enable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id_string; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t enable_string; +}; + +static void +cmd_pipeline_enable_parsed( + void *parsed_result, + __rte_unused struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_enable_result *params = parsed_result; + struct app_params *app = data; + int status; + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id_string) != 0) { + printf("Command failed\n"); + return; + } + + status = app_pipeline_enable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_enable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_enable_t_id_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id_string, + NULL); + +cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_string, + "pipeline"); + +cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct 
cmd_pipeline_enable_result, pipeline_id, + UINT32); + +cmdline_parse_token_string_t cmd_pipeline_enable_enable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, enable_string, + "enable"); + +cmdline_parse_inst_t cmd_pipeline_enable = { + .f = cmd_pipeline_enable_parsed, + .data = NULL, + .help_str = "Enable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_enable_t_string, + (void *)&cmd_pipeline_enable_t_id_string, + (void *)&cmd_pipeline_enable_pipeline_string, + (void *)&cmd_pipeline_enable_pipeline_id, + (void *)&cmd_pipeline_enable_enable_string, + NULL, + }, +}; + +/* + * pipeline disable + */ + +struct cmd_pipeline_disable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id_string; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t disable_string; +}; + +static void +cmd_pipeline_disable_parsed( + void *parsed_result, + __rte_unused struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_disable_result *params = parsed_result; + struct app_params *app = data; + int status; + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id_string) != 0) { + printf("Command failed\n"); + return; + } + + status = app_pipeline_disable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_disable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_disable_t_id_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id_string, + NULL); + +cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, + pipeline_string, "pipeline"); + +cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct 
cmd_pipeline_disable_result, pipeline_id, + UINT32); + +cmdline_parse_token_string_t cmd_pipeline_disable_disable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, disable_string, + "disable"); + +cmdline_parse_inst_t cmd_pipeline_disable = { + .f = cmd_pipeline_disable_parsed, + .data = NULL, + .help_str = "Disable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_disable_t_string, + (void *)&cmd_pipeline_disable_t_id_string, + (void *)&cmd_pipeline_disable_pipeline_string, + (void *)&cmd_pipeline_disable_pipeline_id, + (void *)&cmd_pipeline_disable_disable_string, + NULL, + }, +}; + +static cmdline_parse_ctx_t thread_cmds[] = { + (cmdline_parse_inst_t *) &cmd_pipeline_enable, + (cmdline_parse_inst_t *) &cmd_pipeline_disable, + NULL, +}; + +int +app_pipeline_thread_cmd_push(struct app_params *app) +{ + uint32_t n_cmds, i; + + /* Check for available slots in the application commands array */ + n_cmds = RTE_DIM(thread_cmds) - 1; + if (n_cmds > APP_MAX_CMDS - app->n_cmds) + return -ENOMEM; + + /* Push thread commands into the application */ + memcpy(&app->cmds[app->n_cmds], + thread_cmds, + n_cmds * sizeof(cmdline_parse_ctx_t *)); + + for (i = 0; i < n_cmds; i++) + app->cmds[app->n_cmds + i]->data = app; + + app->n_cmds += n_cmds; + app->cmds[app->n_cmds] = NULL; + + return 0; +} diff --git a/examples/ip_pipeline/thread_fe.h b/examples/ip_pipeline/thread_fe.h new file mode 100644 index 0000000..52352c1 --- /dev/null +++ b/examples/ip_pipeline/thread_fe.h @@ -0,0 +1,95 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef THREAD_FE_H_ +#define THREAD_FE_H_ + +static inline struct rte_ring * +app_thread_msgq_in_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? 
"h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; +} + +static inline struct rte_ring * +app_thread_msgq_out_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? "h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; + +} + +int +app_pipeline_thread_cmd_push(struct app_params *app); + +int +app_pipeline_enable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +int +app_pipeline_disable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +#endif /* THREAD_FE_H_ */ -- 1.7.9.5 ^ permalink raw reply [flat|nested] 6+ messages in thread
* [dpdk-dev] [PATCH v4 1/1] ip_pipeline: added dynamic pipeline reconfiguration 2015-10-19 14:00 ` [dpdk-dev] [PATCH v3 " Piotr Azarewicz @ 2015-10-29 9:51 ` Piotr Azarewicz 2015-10-29 15:36 ` [dpdk-dev] [PATCH v5 " Piotr Azarewicz 0 siblings, 1 reply; 6+ messages in thread From: Piotr Azarewicz @ 2015-10-29 9:51 UTC (permalink / raw) To: dev Up till now, a pipeline was bound to the thread selected in the initial config. This patch allows binding a pipeline to other threads at runtime using CLI commands. v2 changes: - deleted debug printfs v3 changes: - add timer for thread message requests - fix bug that prevented the new functionality from working - fix memory leak - clean up v4 changes: - fix compilation with gcc 5.1 - add proper check that the thread exists Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com> Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com> Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com> --- examples/ip_pipeline/Makefile | 1 + examples/ip_pipeline/app.h | 12 + examples/ip_pipeline/config_parse.c | 2 +- examples/ip_pipeline/init.c | 24 ++ examples/ip_pipeline/pipeline.h | 6 + examples/ip_pipeline/pipeline/pipeline_common_fe.h | 3 + examples/ip_pipeline/thread.c | 153 ++++++++- examples/ip_pipeline/thread.h | 84 +++++ examples/ip_pipeline/thread_fe.c | 350 ++++++++++++++++++++ examples/ip_pipeline/thread_fe.h | 95 ++++++ 10 files changed, 728 insertions(+), 2 deletions(-) create mode 100644 examples/ip_pipeline/thread.h create mode 100644 examples/ip_pipeline/thread_fe.c create mode 100644 examples/ip_pipeline/thread_fe.h diff --git a/examples/ip_pipeline/Makefile b/examples/ip_pipeline/Makefile index f3ff1ec..c8e80b5 100644 --- a/examples/ip_pipeline/Makefile +++ b/examples/ip_pipeline/Makefile @@ -54,6 +54,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_parse_tm.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread.c
+SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c diff --git a/examples/ip_pipeline/app.h b/examples/ip_pipeline/app.h index 521e3a0..3a57956 100644 --- a/examples/ip_pipeline/app.h +++ b/examples/ip_pipeline/app.h @@ -220,9 +220,11 @@ struct app_pipeline_data { void *be; void *fe; uint64_t timer_period; + uint32_t enabled; }; struct app_thread_pipeline_data { + uint32_t pipeline_id; void *be; pipeline_be_op_run f_run; pipeline_be_op_timer f_timer; @@ -234,6 +236,10 @@ struct app_thread_pipeline_data { #define APP_MAX_THREAD_PIPELINES 16 #endif +#ifndef APP_THREAD_TIMER_PERIOD +#define APP_THREAD_TIMER_PERIOD 1 +#endif + struct app_thread_data { struct app_thread_pipeline_data regular[APP_MAX_THREAD_PIPELINES]; struct app_thread_pipeline_data custom[APP_MAX_THREAD_PIPELINES]; @@ -241,7 +247,13 @@ struct app_thread_data { uint32_t n_regular; uint32_t n_custom; + uint64_t timer_period; + uint64_t thread_req_deadline; + uint64_t deadline; + + struct rte_ring *msgq_in; + struct rte_ring *msgq_out; }; struct app_eal_params { diff --git a/examples/ip_pipeline/config_parse.c b/examples/ip_pipeline/config_parse.c index c9b78f9..d2aaadf 100644 --- a/examples/ip_pipeline/config_parse.c +++ b/examples/ip_pipeline/config_parse.c @@ -362,7 +362,7 @@ parser_read_uint32(uint32_t *value, const char *p) return 0; } -static int +int parse_pipeline_core(uint32_t *socket, uint32_t *core, uint32_t *ht, diff --git a/examples/ip_pipeline/init.c b/examples/ip_pipeline/init.c index 3f9c68d..8c63879 100644 --- a/examples/ip_pipeline/init.c +++ b/examples/ip_pipeline/init.c @@ -50,6 +50,7 @@ #include "pipeline_firewall.h" #include "pipeline_flow_classification.h" #include "pipeline_routing.h" +#include "thread_fe.h" #define APP_NAME_SIZE 32 @@ -1253,6 +1254,25 @@ app_init_threads(struct app_params *app) t = &app->thread_data[lcore_id]; + t->timer_period = (rte_get_tsc_hz() * 
APP_THREAD_TIMER_PERIOD) / 1000; + t->thread_req_deadline = time + t->timer_period; + + t->msgq_in = app_thread_msgq_in_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_in == NULL) + rte_panic("Init error: Cannot find MSGQ_IN for thread %" PRId32, + lcore_id); + + t->msgq_out = app_thread_msgq_out_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_out == NULL) + rte_panic("Init error: Cannot find MSGQ_OUT for thread %" PRId32, + lcore_id); + ptype = app_pipeline_type_find(app, params->type); if (ptype == NULL) rte_panic("Init error: Unknown pipeline " @@ -1262,12 +1282,15 @@ app_init_threads(struct app_params *app) &t->regular[t->n_regular] : &t->custom[t->n_custom]; + p->pipeline_id = p_id; p->be = data->be; p->f_run = ptype->be_ops->f_run; p->f_timer = ptype->be_ops->f_timer; p->timer_period = data->timer_period; p->deadline = time + data->timer_period; + data->enabled = 1; + if (ptype->be_ops->f_run == NULL) t->n_regular++; else @@ -1288,6 +1311,7 @@ int app_init(struct app_params *app) app_init_msgq(app); app_pipeline_common_cmd_push(app); + app_pipeline_thread_cmd_push(app); app_pipeline_type_register(app, &pipeline_master); app_pipeline_type_register(app, &pipeline_passthrough); app_pipeline_type_register(app, &pipeline_flow_classification); diff --git a/examples/ip_pipeline/pipeline.h b/examples/ip_pipeline/pipeline.h index b9a56ea..dab9c36 100644 --- a/examples/ip_pipeline/pipeline.h +++ b/examples/ip_pipeline/pipeline.h @@ -84,4 +84,10 @@ pipeline_type_cmds_count(struct pipeline_type *ptype) return n_cmds; } +int +parse_pipeline_core(uint32_t *socket, + uint32_t *core, + uint32_t *ht, + const char *entry); + #endif diff --git a/examples/ip_pipeline/pipeline/pipeline_common_fe.h b/examples/ip_pipeline/pipeline/pipeline_common_fe.h index 693848d..e84aa3a 100644 --- a/examples/ip_pipeline/pipeline/pipeline_common_fe.h +++ b/examples/ip_pipeline/pipeline/pipeline_common_fe.h @@ -68,6 
+68,9 @@ app_pipeline_data_fe(struct app_params *app, uint32_t id) if (pipeline_data == NULL) return NULL; + if (pipeline_data->enabled == 0) + return NULL; + return pipeline_data->fe; } diff --git a/examples/ip_pipeline/thread.c b/examples/ip_pipeline/thread.c index b2a8656..a0f3b7c 100644 --- a/examples/ip_pipeline/thread.c +++ b/examples/ip_pipeline/thread.c @@ -37,8 +37,143 @@ #include "pipeline_common_be.h" #include "app.h" +#include "thread.h" -int app_thread(void *arg) +static inline void * +thread_msg_recv(struct rte_ring *r) +{ + void *msg; + int status = rte_ring_sc_dequeue(r, &msg); + + if (status != 0) + return NULL; + + return msg; +} + +static inline void +thread_msg_send(struct rte_ring *r, + void *msg) +{ + int status; + + do { + status = rte_ring_sp_enqueue(r, msg); + } while (status == -ENOBUFS); +} + +static int +thread_pipeline_enable(struct app_thread_data *t, + struct thread_pipeline_enable_msg_req *req) +{ + struct app_thread_pipeline_data *p; + + if (req->f_run == NULL) { + if (t->n_regular >= APP_MAX_THREAD_PIPELINES) + return -1; + } else { + if (t->n_custom >= APP_MAX_THREAD_PIPELINES) + return -1; + } + + p = (req->f_run == NULL) ? 
+ &t->regular[t->n_regular] : + &t->custom[t->n_custom]; + + p->pipeline_id = req->pipeline_id; + p->be = req->be; + p->f_run = req->f_run; + p->f_timer = req->f_timer; + p->timer_period = req->timer_period; + p->deadline = 0; + + if (req->f_run == NULL) + t->n_regular++; + else + t->n_custom++; + + return 0; +} + +static int +thread_pipeline_disable(struct app_thread_data *t, + struct thread_pipeline_disable_msg_req *req) +{ + uint32_t i; + uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular)); + uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom)); + + /* search regular pipelines of current thread */ + for (i = 0; i < n_regular; i++) { + if (t->regular[i].pipeline_id != req->pipeline_id) + continue; + + n_regular--; + if (i < n_regular) + memcpy(&t->regular[i], + &t->regular[i+1], + (n_regular - i) * sizeof(struct app_thread_pipeline_data)); + + t->n_regular = n_regular; + + return 0; + } + + /* search custom pipelines of current thread */ + for (i = 0; i < n_custom; i++) { + if (t->custom[i].pipeline_id != req->pipeline_id) + continue; + + n_custom--; + if (i < n_custom) + memcpy(&t->custom[i], + &t->custom[i+1], + (n_custom - i) * sizeof(struct app_thread_pipeline_data)); + + t->n_custom = n_custom; + + return 0; + } + + /* return if pipeline not found */ + return -1; +} + +static int +thread_msg_req_handle(struct app_thread_data *t) +{ + void *msg_ptr; + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + + msg_ptr = thread_msg_recv(t->msgq_in); + req = msg_ptr; + rsp = msg_ptr; + + if (req != NULL) + switch (req->type) { + case THREAD_MSG_REQ_PIPELINE_ENABLE: { + rsp->status = thread_pipeline_enable(t, + (struct thread_pipeline_enable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + + case THREAD_MSG_REQ_PIPELINE_DISABLE: { + rsp->status = thread_pipeline_disable(t, + (struct thread_pipeline_disable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + default: + break; + } + + return 0; +} + +int 
+app_thread(void *arg) { struct app_params *app = (struct app_params *) arg; uint32_t core_id = rte_lcore_id(), i, j; @@ -102,6 +237,22 @@ int app_thread(void *arg) t_deadline = p_deadline; } + /* Timer for thread message request */ + { + uint64_t deadline = t->thread_req_deadline; + + if (deadline <= time) { + thread_msg_req_handle(t); + n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular)); + n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom)); + deadline = time + t->timer_period; + t->thread_req_deadline = deadline; + } + + if (deadline < t_deadline) + t_deadline = deadline; + } + t->deadline = t_deadline; } } diff --git a/examples/ip_pipeline/thread.h b/examples/ip_pipeline/thread.h new file mode 100644 index 0000000..dc877c0 --- /dev/null +++ b/examples/ip_pipeline/thread.h @@ -0,0 +1,84 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef THREAD_H_ +#define THREAD_H_ + +#include "app.h" +#include "pipeline_be.h" + +enum thread_msg_req_type { + THREAD_MSG_REQ_PIPELINE_ENABLE = 0, + THREAD_MSG_REQ_PIPELINE_DISABLE, + THREAD_MSG_REQS +}; + +struct thread_msg_req { + enum thread_msg_req_type type; +}; + +struct thread_msg_rsp { + int status; +}; + +/* + * PIPELINE ENABLE + */ +struct thread_pipeline_enable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; + void *be; + pipeline_be_op_run f_run; + pipeline_be_op_timer f_timer; + uint64_t timer_period; +}; + +struct thread_pipeline_enable_msg_rsp { + int status; +}; + +/* + * PIPELINE DISABLE + */ +struct thread_pipeline_disable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; +}; + +struct thread_pipeline_disable_msg_rsp { + int status; +}; + +#endif /* THREAD_H_ */ diff --git a/examples/ip_pipeline/thread_fe.c b/examples/ip_pipeline/thread_fe.c new file mode 100644 index 0000000..06ff05a --- /dev/null +++ b/examples/ip_pipeline/thread_fe.c @@ -0,0 +1,350 @@ +#include <rte_common.h> +#include <rte_ring.h> +#include <rte_malloc.h> +#include <cmdline_rdline.h> +#include <cmdline_parse.h> +#include <cmdline_parse_num.h> +#include <cmdline_parse_string.h> +#include <cmdline_parse_ipaddr.h> +#include <cmdline_parse_etheraddr.h> +#include <cmdline_socket.h> +#include <cmdline.h> + +#include "thread.h" +#include "thread_fe.h" +#include "pipeline.h" +#include 
"pipeline_common_fe.h" +#include "app.h" + +static inline void * +thread_msg_send_recv(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id, + void *msg, + uint32_t timeout_ms) +{ + struct rte_ring *r_req = app_thread_msgq_in_get(app, + socket_id, core_id, ht_id); + struct rte_ring *r_rsp = app_thread_msgq_out_get(app, + socket_id, core_id, ht_id); + uint64_t hz = rte_get_tsc_hz(); + void *msg_recv; + uint64_t deadline; + int status; + + /* send */ + do { + status = rte_ring_sp_enqueue(r_req, (void *) msg); + } while (status == -ENOBUFS); + + /* recv */ + deadline = (timeout_ms) ? + (rte_rdtsc() + ((hz * timeout_ms) / 1000)) : + UINT64_MAX; + + do { + if (rte_rdtsc() > deadline) + return NULL; + + status = rte_ring_sc_dequeue(r_rsp, &msg_recv); + } while (status != 0); + + return msg_recv; +} + +int +app_pipeline_enable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_enable_msg_req *req; + struct thread_pipeline_enable_msg_rsp *rsp; + int thread_id; + struct app_pipeline_data *p; + struct app_pipeline_params *p_params; + struct pipeline_type *p_type; + int status; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if (thread_id < 0) + return -1; + + if (!(app->core_mask & (1LLU << thread_id))) + return -1; + + p = &app->pipeline_data[pipeline_id]; + p_params = &app->pipeline_params[pipeline_id]; + p_type = app_pipeline_type_find(app, p_params->type); + + if (p->enabled == 1) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_ENABLE; + req->pipeline_id = pipeline_id; + req->be = p->be; + req->f_run = p_type->be_ops->f_run; + req->f_timer = p_type->be_ops->f_timer; + req->timer_period = p->timer_period; + + rsp = thread_msg_send_recv(app, + socket_id, core_id, hyper_th_id, 
req, MSG_TIMEOUT_DEFAULT); + if (rsp == NULL) + return -1; + + status = rsp->status; + app_msg_free(app, rsp); + + if (status != 0) + return -1; + + p->enabled = 1; + return 0; +} + +int +app_pipeline_disable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_disable_msg_req *req; + struct thread_pipeline_disable_msg_rsp *rsp; + int thread_id; + struct app_pipeline_data *p; + int status; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + if (strcmp(app->pipeline_params[pipeline_id].type, "MASTER") == 0) + return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if (thread_id < 0) + return -1; + + if (!(app->core_mask & (1LLU << thread_id))) + return -1; + + p = &app->pipeline_data[pipeline_id]; + + if (p->enabled == 0) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_DISABLE; + req->pipeline_id = pipeline_id; + + rsp = thread_msg_send_recv(app, + socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT); + + if (rsp == NULL) + return -1; + + status = rsp->status; + app_msg_free(app, rsp); + + if (status != 0) + return -1; + + p->enabled = 0; + return 0; +} + +/* + * pipeline enable + */ + +struct cmd_pipeline_enable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id_string; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t enable_string; +}; + +static void +cmd_pipeline_enable_parsed( + void *parsed_result, + __rte_unused struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_enable_result *params = parsed_result; + struct app_params *app = data; + int status; + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id_string) != 0) { + printf("Command failed\n"); + return; + } + + status = 
app_pipeline_enable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_enable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_enable_t_id_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id_string, + NULL); + +cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_string, + "pipeline"); + +cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_id, + UINT32); + +cmdline_parse_token_string_t cmd_pipeline_enable_enable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, enable_string, + "enable"); + +cmdline_parse_inst_t cmd_pipeline_enable = { + .f = cmd_pipeline_enable_parsed, + .data = NULL, + .help_str = "Enable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_enable_t_string, + (void *)&cmd_pipeline_enable_t_id_string, + (void *)&cmd_pipeline_enable_pipeline_string, + (void *)&cmd_pipeline_enable_pipeline_id, + (void *)&cmd_pipeline_enable_enable_string, + NULL, + }, +}; + +/* + * pipeline disable + */ + +struct cmd_pipeline_disable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id_string; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t disable_string; +}; + +static void +cmd_pipeline_disable_parsed( + void *parsed_result, + __rte_unused struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_disable_result *params = parsed_result; + struct app_params *app = data; + int status; + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id_string) != 0) { + printf("Command failed\n"); + return; + } + + status = 
app_pipeline_disable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_disable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_disable_t_id_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id_string, + NULL); + +cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, + pipeline_string, "pipeline"); + +cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_id, + UINT32); + +cmdline_parse_token_string_t cmd_pipeline_disable_disable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, disable_string, + "disable"); + +cmdline_parse_inst_t cmd_pipeline_disable = { + .f = cmd_pipeline_disable_parsed, + .data = NULL, + .help_str = "Disable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_disable_t_string, + (void *)&cmd_pipeline_disable_t_id_string, + (void *)&cmd_pipeline_disable_pipeline_string, + (void *)&cmd_pipeline_disable_pipeline_id, + (void *)&cmd_pipeline_disable_disable_string, + NULL, + }, +}; + +static cmdline_parse_ctx_t thread_cmds[] = { + (cmdline_parse_inst_t *) &cmd_pipeline_enable, + (cmdline_parse_inst_t *) &cmd_pipeline_disable, + NULL, +}; + +int +app_pipeline_thread_cmd_push(struct app_params *app) +{ + uint32_t n_cmds, i; + + /* Check for available slots in the application commands array */ + n_cmds = RTE_DIM(thread_cmds) - 1; + if (n_cmds > APP_MAX_CMDS - app->n_cmds) + return -ENOMEM; + + /* Push thread commands into the application */ + memcpy(&app->cmds[app->n_cmds], + thread_cmds, + n_cmds * sizeof(cmdline_parse_ctx_t *)); + + for (i = 0; i < n_cmds; i++) + app->cmds[app->n_cmds + i]->data = app; + + app->n_cmds += n_cmds; + 
app->cmds[app->n_cmds] = NULL; + + return 0; +} diff --git a/examples/ip_pipeline/thread_fe.h b/examples/ip_pipeline/thread_fe.h new file mode 100644 index 0000000..52352c1 --- /dev/null +++ b/examples/ip_pipeline/thread_fe.h @@ -0,0 +1,95 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#ifndef THREAD_FE_H_ +#define THREAD_FE_H_ + +static inline struct rte_ring * +app_thread_msgq_in_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? "h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; +} + +static inline struct rte_ring * +app_thread_msgq_out_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? "h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; + +} + +int +app_pipeline_thread_cmd_push(struct app_params *app); + +int +app_pipeline_enable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +int +app_pipeline_disable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +#endif /* THREAD_FE_H_ */ -- 1.7.9.5 ^ permalink raw reply [flat|nested] 6+ messages in thread
* [dpdk-dev] [PATCH v5 1/1] ip_pipeline: added dynamic pipeline reconfiguration 2015-10-29 9:51 ` [dpdk-dev] [PATCH v4 " Piotr Azarewicz @ 2015-10-29 15:36 ` Piotr Azarewicz 2015-12-07 1:03 ` Thomas Monjalon 0 siblings, 1 reply; 6+ messages in thread From: Piotr Azarewicz @ 2015-10-29 15:36 UTC (permalink / raw) To: dev Up till now pipeline was bound to thread selected in the initial config. This patch allows binding pipeline to other threads at runtime using CLI commands. v2 changes: - deleted debug printfs v3 changes: - add timer for thread message request - fix bug that the new functionality can't work - fix leaking memory - cleaning up v4 changes: - fix compilation with gcc 5.1 - add proper checking if thread exist v5 changes: - better approach about v4 changes Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com> Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com> Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com> --- examples/ip_pipeline/Makefile | 1 + examples/ip_pipeline/app.h | 12 + examples/ip_pipeline/config_parse.c | 2 +- examples/ip_pipeline/init.c | 24 ++ examples/ip_pipeline/pipeline.h | 6 + examples/ip_pipeline/pipeline/pipeline_common_fe.h | 3 + examples/ip_pipeline/thread.c | 156 ++++++++- examples/ip_pipeline/thread.h | 84 +++++ examples/ip_pipeline/thread_fe.c | 349 ++++++++++++++++++++ examples/ip_pipeline/thread_fe.h | 95 ++++++ 10 files changed, 728 insertions(+), 4 deletions(-) create mode 100644 examples/ip_pipeline/thread.h create mode 100644 examples/ip_pipeline/thread_fe.c create mode 100644 examples/ip_pipeline/thread_fe.h diff --git a/examples/ip_pipeline/Makefile b/examples/ip_pipeline/Makefile index f3ff1ec..c8e80b5 100644 --- a/examples/ip_pipeline/Makefile +++ b/examples/ip_pipeline/Makefile @@ -54,6 +54,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_parse_tm.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += 
thread.c +SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c diff --git a/examples/ip_pipeline/app.h b/examples/ip_pipeline/app.h index 521e3a0..3a57956 100644 --- a/examples/ip_pipeline/app.h +++ b/examples/ip_pipeline/app.h @@ -220,9 +220,11 @@ struct app_pipeline_data { void *be; void *fe; uint64_t timer_period; + uint32_t enabled; }; struct app_thread_pipeline_data { + uint32_t pipeline_id; void *be; pipeline_be_op_run f_run; pipeline_be_op_timer f_timer; @@ -234,6 +236,10 @@ struct app_thread_pipeline_data { #define APP_MAX_THREAD_PIPELINES 16 #endif +#ifndef APP_THREAD_TIMER_PERIOD +#define APP_THREAD_TIMER_PERIOD 1 +#endif + struct app_thread_data { struct app_thread_pipeline_data regular[APP_MAX_THREAD_PIPELINES]; struct app_thread_pipeline_data custom[APP_MAX_THREAD_PIPELINES]; @@ -241,7 +247,13 @@ struct app_thread_data { uint32_t n_regular; uint32_t n_custom; + uint64_t timer_period; + uint64_t thread_req_deadline; + uint64_t deadline; + + struct rte_ring *msgq_in; + struct rte_ring *msgq_out; }; struct app_eal_params { diff --git a/examples/ip_pipeline/config_parse.c b/examples/ip_pipeline/config_parse.c index c9b78f9..d2aaadf 100644 --- a/examples/ip_pipeline/config_parse.c +++ b/examples/ip_pipeline/config_parse.c @@ -362,7 +362,7 @@ parser_read_uint32(uint32_t *value, const char *p) return 0; } -static int +int parse_pipeline_core(uint32_t *socket, uint32_t *core, uint32_t *ht, diff --git a/examples/ip_pipeline/init.c b/examples/ip_pipeline/init.c index 3f9c68d..8c63879 100644 --- a/examples/ip_pipeline/init.c +++ b/examples/ip_pipeline/init.c @@ -50,6 +50,7 @@ #include "pipeline_firewall.h" #include "pipeline_flow_classification.h" #include "pipeline_routing.h" +#include "thread_fe.h" #define APP_NAME_SIZE 32 @@ -1253,6 +1254,25 @@ app_init_threads(struct app_params *app) t = &app->thread_data[lcore_id]; + t->timer_period = 
(rte_get_tsc_hz() * APP_THREAD_TIMER_PERIOD) / 1000; + t->thread_req_deadline = time + t->timer_period; + + t->msgq_in = app_thread_msgq_in_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_in == NULL) + rte_panic("Init error: Cannot find MSGQ_IN for thread %" PRId32, + lcore_id); + + t->msgq_out = app_thread_msgq_out_get(app, + params->socket_id, + params->core_id, + params->hyper_th_id); + if (t->msgq_out == NULL) + rte_panic("Init error: Cannot find MSGQ_OUT for thread %" PRId32, + lcore_id); + ptype = app_pipeline_type_find(app, params->type); if (ptype == NULL) rte_panic("Init error: Unknown pipeline " @@ -1262,12 +1282,15 @@ app_init_threads(struct app_params *app) &t->regular[t->n_regular] : &t->custom[t->n_custom]; + p->pipeline_id = p_id; p->be = data->be; p->f_run = ptype->be_ops->f_run; p->f_timer = ptype->be_ops->f_timer; p->timer_period = data->timer_period; p->deadline = time + data->timer_period; + data->enabled = 1; + if (ptype->be_ops->f_run == NULL) t->n_regular++; else @@ -1288,6 +1311,7 @@ int app_init(struct app_params *app) app_init_msgq(app); app_pipeline_common_cmd_push(app); + app_pipeline_thread_cmd_push(app); app_pipeline_type_register(app, &pipeline_master); app_pipeline_type_register(app, &pipeline_passthrough); app_pipeline_type_register(app, &pipeline_flow_classification); diff --git a/examples/ip_pipeline/pipeline.h b/examples/ip_pipeline/pipeline.h index b9a56ea..dab9c36 100644 --- a/examples/ip_pipeline/pipeline.h +++ b/examples/ip_pipeline/pipeline.h @@ -84,4 +84,10 @@ pipeline_type_cmds_count(struct pipeline_type *ptype) return n_cmds; } +int +parse_pipeline_core(uint32_t *socket, + uint32_t *core, + uint32_t *ht, + const char *entry); + #endif diff --git a/examples/ip_pipeline/pipeline/pipeline_common_fe.h b/examples/ip_pipeline/pipeline/pipeline_common_fe.h index 693848d..e84aa3a 100644 --- a/examples/ip_pipeline/pipeline/pipeline_common_fe.h +++ 
b/examples/ip_pipeline/pipeline/pipeline_common_fe.h @@ -68,6 +68,9 @@ app_pipeline_data_fe(struct app_params *app, uint32_t id) if (pipeline_data == NULL) return NULL; + if (pipeline_data->enabled == 0) + return NULL; + return pipeline_data->fe; } diff --git a/examples/ip_pipeline/thread.c b/examples/ip_pipeline/thread.c index b2a8656..78f1bd8 100644 --- a/examples/ip_pipeline/thread.c +++ b/examples/ip_pipeline/thread.c @@ -37,16 +37,152 @@ #include "pipeline_common_be.h" #include "app.h" +#include "thread.h" -int app_thread(void *arg) +static inline void * +thread_msg_recv(struct rte_ring *r) +{ + void *msg; + int status = rte_ring_sc_dequeue(r, &msg); + + if (status != 0) + return NULL; + + return msg; +} + +static inline void +thread_msg_send(struct rte_ring *r, + void *msg) +{ + int status; + + do { + status = rte_ring_sp_enqueue(r, msg); + } while (status == -ENOBUFS); +} + +static int +thread_pipeline_enable(struct app_thread_data *t, + struct thread_pipeline_enable_msg_req *req) +{ + struct app_thread_pipeline_data *p; + + if (req->f_run == NULL) { + if (t->n_regular >= APP_MAX_THREAD_PIPELINES) + return -1; + } else { + if (t->n_custom >= APP_MAX_THREAD_PIPELINES) + return -1; + } + + p = (req->f_run == NULL) ? 
+ &t->regular[t->n_regular] : + &t->custom[t->n_custom]; + + p->pipeline_id = req->pipeline_id; + p->be = req->be; + p->f_run = req->f_run; + p->f_timer = req->f_timer; + p->timer_period = req->timer_period; + p->deadline = 0; + + if (req->f_run == NULL) + t->n_regular++; + else + t->n_custom++; + + return 0; +} + +static int +thread_pipeline_disable(struct app_thread_data *t, + struct thread_pipeline_disable_msg_req *req) +{ + uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular)); + uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom)); + uint32_t i; + + /* search regular pipelines of current thread */ + for (i = 0; i < n_regular; i++) { + if (t->regular[i].pipeline_id != req->pipeline_id) + continue; + + if (i < n_regular - 1) + memcpy(&t->regular[i], + &t->regular[i+1], + (n_regular - 1 - i) * sizeof(struct app_thread_pipeline_data)); + + n_regular--; + t->n_regular = n_regular; + + return 0; + } + + /* search custom pipelines of current thread */ + for (i = 0; i < n_custom; i++) { + if (t->custom[i].pipeline_id != req->pipeline_id) + continue; + + if (i < n_custom - 1) + memcpy(&t->custom[i], + &t->custom[i+1], + (n_custom - 1 - i) * sizeof(struct app_thread_pipeline_data)); + + n_custom--; + t->n_custom = n_custom; + + return 0; + } + + /* return if pipeline not found */ + return -1; +} + +static int +thread_msg_req_handle(struct app_thread_data *t) +{ + void *msg_ptr; + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + + msg_ptr = thread_msg_recv(t->msgq_in); + req = msg_ptr; + rsp = msg_ptr; + + if (req != NULL) + switch (req->type) { + case THREAD_MSG_REQ_PIPELINE_ENABLE: { + rsp->status = thread_pipeline_enable(t, + (struct thread_pipeline_enable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + + case THREAD_MSG_REQ_PIPELINE_DISABLE: { + rsp->status = thread_pipeline_disable(t, + (struct thread_pipeline_disable_msg_req *) req); + thread_msg_send(t->msgq_out, rsp); + break; + } + default: + break; + } + + 
return 0; +} + +int +app_thread(void *arg) { struct app_params *app = (struct app_params *) arg; uint32_t core_id = rte_lcore_id(), i, j; struct app_thread_data *t = &app->thread_data[core_id]; - uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular)); - uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom)); for (i = 0; ; i++) { + uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular)); + uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom)); + /* Run regular pipelines */ for (j = 0; j < n_regular; j++) { struct app_thread_pipeline_data *data = &t->regular[j]; @@ -102,6 +238,20 @@ int app_thread(void *arg) t_deadline = p_deadline; } + /* Timer for thread message request */ + { + uint64_t deadline = t->thread_req_deadline; + + if (deadline <= time) { + thread_msg_req_handle(t); + deadline = time + t->timer_period; + t->thread_req_deadline = deadline; + } + + if (deadline < t_deadline) + t_deadline = deadline; + } + t->deadline = t_deadline; } } diff --git a/examples/ip_pipeline/thread.h b/examples/ip_pipeline/thread.h new file mode 100644 index 0000000..dc877c0 --- /dev/null +++ b/examples/ip_pipeline/thread.h @@ -0,0 +1,84 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. 
+ * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#ifndef THREAD_H_ +#define THREAD_H_ + +#include "app.h" +#include "pipeline_be.h" + +enum thread_msg_req_type { + THREAD_MSG_REQ_PIPELINE_ENABLE = 0, + THREAD_MSG_REQ_PIPELINE_DISABLE, + THREAD_MSG_REQS +}; + +struct thread_msg_req { + enum thread_msg_req_type type; +}; + +struct thread_msg_rsp { + int status; +}; + +/* + * PIPELINE ENABLE + */ +struct thread_pipeline_enable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; + void *be; + pipeline_be_op_run f_run; + pipeline_be_op_timer f_timer; + uint64_t timer_period; +}; + +struct thread_pipeline_enable_msg_rsp { + int status; +}; + +/* + * PIPELINE DISABLE + */ +struct thread_pipeline_disable_msg_req { + enum thread_msg_req_type type; + + uint32_t pipeline_id; +}; + +struct thread_pipeline_disable_msg_rsp { + int status; +}; + +#endif /* THREAD_H_ */ diff --git a/examples/ip_pipeline/thread_fe.c b/examples/ip_pipeline/thread_fe.c new file mode 100644 index 0000000..7a3bbf8 --- /dev/null +++ b/examples/ip_pipeline/thread_fe.c @@ -0,0 +1,349 @@ +#include <rte_common.h> +#include <rte_ring.h> +#include <rte_malloc.h> +#include <cmdline_rdline.h> +#include <cmdline_parse.h> +#include <cmdline_parse_num.h> +#include <cmdline_parse_string.h> +#include <cmdline_parse_ipaddr.h> +#include <cmdline_parse_etheraddr.h> +#include <cmdline_socket.h> +#include <cmdline.h> + +#include "thread.h" +#include "thread_fe.h" +#include "pipeline.h" +#include "pipeline_common_fe.h" +#include "app.h" + +static inline void * +thread_msg_send_recv(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id, + void *msg, + uint32_t timeout_ms) +{ + struct rte_ring *r_req = app_thread_msgq_in_get(app, + socket_id, core_id, ht_id); + struct rte_ring *r_rsp = app_thread_msgq_out_get(app, + socket_id, core_id, ht_id); + uint64_t hz = rte_get_tsc_hz(); + void *msg_recv; + uint64_t deadline; + int status; + + /* send */ + do { + status = rte_ring_sp_enqueue(r_req, (void *) msg); + } while (status == 
-ENOBUFS); + + /* recv */ + deadline = (timeout_ms) ? + (rte_rdtsc() + ((hz * timeout_ms) / 1000)) : + UINT64_MAX; + + do { + if (rte_rdtsc() > deadline) + return NULL; + + status = rte_ring_sc_dequeue(r_rsp, &msg_recv); + } while (status != 0); + + return msg_recv; +} + +int +app_pipeline_enable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_enable_msg_req *req; + struct thread_pipeline_enable_msg_rsp *rsp; + int thread_id; + struct app_pipeline_data *p; + struct app_pipeline_params *p_params; + struct pipeline_type *p_type; + int status; + + if (app == NULL) + return -1; + + thread_id = cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if ((thread_id < 0) || + ((app->core_mask & (1LLU << thread_id)) == 0)) + return -1; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + p = &app->pipeline_data[pipeline_id]; + p_params = &app->pipeline_params[pipeline_id]; + p_type = app_pipeline_type_find(app, p_params->type); + + if (p->enabled == 1) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_ENABLE; + req->pipeline_id = pipeline_id; + req->be = p->be; + req->f_run = p_type->be_ops->f_run; + req->f_timer = p_type->be_ops->f_timer; + req->timer_period = p->timer_period; + + rsp = thread_msg_send_recv(app, + socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT); + if (rsp == NULL) + return -1; + + status = rsp->status; + app_msg_free(app, rsp); + + if (status != 0) + return -1; + + p->enabled = 1; + return 0; +} + +int +app_pipeline_disable(struct app_params *app, + uint32_t socket_id, + uint32_t core_id, + uint32_t hyper_th_id, + uint32_t pipeline_id) +{ + struct thread_pipeline_disable_msg_req *req; + struct thread_pipeline_disable_msg_rsp *rsp; + int thread_id; + struct app_pipeline_data *p; + int status; + + if (app == NULL) + return -1; + + thread_id = 
cpu_core_map_get_lcore_id(app->core_map, + socket_id, + core_id, + hyper_th_id); + + if ((thread_id < 0) || + ((app->core_mask & (1LLU << thread_id)) == 0)) + return -1; + + if (app_pipeline_data(app, pipeline_id) == NULL) + return -1; + + p = &app->pipeline_data[pipeline_id]; + + if (p->enabled == 0) + return -1; + + req = app_msg_alloc(app); + if (req == NULL) + return -1; + + req->type = THREAD_MSG_REQ_PIPELINE_DISABLE; + req->pipeline_id = pipeline_id; + + rsp = thread_msg_send_recv(app, + socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT); + + if (rsp == NULL) + return -1; + + status = rsp->status; + app_msg_free(app, rsp); + + if (status != 0) + return -1; + + p->enabled = 0; + return 0; +} + +/* + * pipeline enable + */ + +struct cmd_pipeline_enable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id_string; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t enable_string; +}; + +static void +cmd_pipeline_enable_parsed( + void *parsed_result, + __rte_unused struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_enable_result *params = parsed_result; + struct app_params *app = data; + int status; + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id_string) != 0) { + printf("Command failed\n"); + return; + } + + status = app_pipeline_enable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_enable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_enable_t_id_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id_string, + NULL); + +cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_string, + "pipeline"); + 
+cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_id, + UINT32); + +cmdline_parse_token_string_t cmd_pipeline_enable_enable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, enable_string, + "enable"); + +cmdline_parse_inst_t cmd_pipeline_enable = { + .f = cmd_pipeline_enable_parsed, + .data = NULL, + .help_str = "Enable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_enable_t_string, + (void *)&cmd_pipeline_enable_t_id_string, + (void *)&cmd_pipeline_enable_pipeline_string, + (void *)&cmd_pipeline_enable_pipeline_id, + (void *)&cmd_pipeline_enable_enable_string, + NULL, + }, +}; + +/* + * pipeline disable + */ + +struct cmd_pipeline_disable_result { + cmdline_fixed_string_t t_string; + cmdline_fixed_string_t t_id_string; + cmdline_fixed_string_t pipeline_string; + uint32_t pipeline_id; + cmdline_fixed_string_t disable_string; +}; + +static void +cmd_pipeline_disable_parsed( + void *parsed_result, + __rte_unused struct cmdline *cl, + void *data) +{ + struct cmd_pipeline_disable_result *params = parsed_result; + struct app_params *app = data; + int status; + uint32_t core_id, socket_id, hyper_th_id; + + if (parse_pipeline_core(&socket_id, + &core_id, + &hyper_th_id, + params->t_id_string) != 0) { + printf("Command failed\n"); + return; + } + + status = app_pipeline_disable(app, + socket_id, + core_id, + hyper_th_id, + params->pipeline_id); + + if (status != 0) + printf("Command failed\n"); +} + +cmdline_parse_token_string_t cmd_pipeline_disable_t_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_string, "t"); + +cmdline_parse_token_string_t cmd_pipeline_disable_t_id_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id_string, + NULL); + +cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, + pipeline_string, "pipeline"); + 
+cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id = + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_id, + UINT32); + +cmdline_parse_token_string_t cmd_pipeline_disable_disable_string = + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, disable_string, + "disable"); + +cmdline_parse_inst_t cmd_pipeline_disable = { + .f = cmd_pipeline_disable_parsed, + .data = NULL, + .help_str = "Disable pipeline on specified core", + .tokens = { + (void *)&cmd_pipeline_disable_t_string, + (void *)&cmd_pipeline_disable_t_id_string, + (void *)&cmd_pipeline_disable_pipeline_string, + (void *)&cmd_pipeline_disable_pipeline_id, + (void *)&cmd_pipeline_disable_disable_string, + NULL, + }, +}; + +static cmdline_parse_ctx_t thread_cmds[] = { + (cmdline_parse_inst_t *) &cmd_pipeline_enable, + (cmdline_parse_inst_t *) &cmd_pipeline_disable, + NULL, +}; + +int +app_pipeline_thread_cmd_push(struct app_params *app) +{ + uint32_t n_cmds, i; + + /* Check for available slots in the application commands array */ + n_cmds = RTE_DIM(thread_cmds) - 1; + if (n_cmds > APP_MAX_CMDS - app->n_cmds) + return -ENOMEM; + + /* Push thread commands into the application */ + memcpy(&app->cmds[app->n_cmds], + thread_cmds, + n_cmds * sizeof(cmdline_parse_ctx_t *)); + + for (i = 0; i < n_cmds; i++) + app->cmds[app->n_cmds + i]->data = app; + + app->n_cmds += n_cmds; + app->cmds[app->n_cmds] = NULL; + + return 0; +} diff --git a/examples/ip_pipeline/thread_fe.h b/examples/ip_pipeline/thread_fe.h new file mode 100644 index 0000000..52352c1 --- /dev/null +++ b/examples/ip_pipeline/thread_fe.h @@ -0,0 +1,95 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef THREAD_FE_H_ +#define THREAD_FE_H_ + +static inline struct rte_ring * +app_thread_msgq_in_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? 
"h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; +} + +static inline struct rte_ring * +app_thread_msgq_out_get(struct app_params *app, + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) +{ + char msgq_name[32]; + ssize_t param_idx; + + snprintf(msgq_name, sizeof(msgq_name), + "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s", + socket_id, + core_id, + (ht_id) ? "h" : ""); + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name); + + if (param_idx < 0) + return NULL; + + return app->msgq[param_idx]; + +} + +int +app_pipeline_thread_cmd_push(struct app_params *app); + +int +app_pipeline_enable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +int +app_pipeline_disable(struct app_params *app, + uint32_t core_id, + uint32_t socket_id, + uint32_t hyper_th_id, + uint32_t pipeline_id); + +#endif /* THREAD_FE_H_ */ -- 1.7.9.5 ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [dpdk-dev] [PATCH v5 1/1] ip_pipeline: added dynamic pipeline reconfiguration 2015-10-29 15:36 ` [dpdk-dev] [PATCH v5 " Piotr Azarewicz @ 2015-12-07 1:03 ` Thomas Monjalon 0 siblings, 0 replies; 6+ messages in thread From: Thomas Monjalon @ 2015-12-07 1:03 UTC (permalink / raw) To: Piotr Azarewicz; +Cc: dev, Maciej Gajdzica 2015-10-29 16:36, Piotr Azarewicz: > Up till now pipeline was bound to thread selected in the initial config. > This patch allows binding pipeline to other threads at runtime using CLI > commands. > > Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com> > Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com> > > Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com> Applied, thanks ^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2015-12-07 1:05 UTC | newest] Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2015-09-30 16:25 [dpdk-dev] [PATCH v2 1/1] ip_pipeline: added dynamic pipeline reconfiguration Maciej Gajdzica 2015-09-30 16:34 ` Dumitrescu, Cristian 2015-10-19 14:00 ` [dpdk-dev] [PATCH v3 " Piotr Azarewicz 2015-10-29 9:51 ` [dpdk-dev] [PATCH v4 " Piotr Azarewicz 2015-10-29 15:36 ` [dpdk-dev] [PATCH v5 " Piotr Azarewicz 2015-12-07 1:03 ` Thomas Monjalon
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox, as well as the URLs for its NNTP newsgroup(s).