From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 52333A04AA; Mon, 7 Sep 2020 23:46:43 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id B74941C438; Mon, 7 Sep 2020 23:41:28 +0200 (CEST) Received: from mga01.intel.com (mga01.intel.com [192.55.52.88]) by dpdk.org (Postfix) with ESMTP id 04B8F1C204 for ; Mon, 7 Sep 2020 23:41:10 +0200 (CEST) IronPort-SDR: iL5JQTbOXVDi+3ITRE/by7KgYuFdr7M6rYqQte0Lm7fnAgIy/62YtYJ7BrSjPD/xP6e1VYRUFu 3fobWfd94mWA== X-IronPort-AV: E=McAfee;i="6000,8403,9737"; a="176099211" X-IronPort-AV: E=Sophos;i="5.76,403,1592895600"; d="scan'208";a="176099211" X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga002.fm.intel.com ([10.253.24.26]) by fmsmga101.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 07 Sep 2020 14:41:10 -0700 IronPort-SDR: +q4+UP8kT7g1EnIBThNdbrN6F0MyHoM3GnNanf4hF1Yjg4UHPPo6zY25ZaLsu60c/B0keS/4Jl jKtBd1JmcU+w== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.76,403,1592895600"; d="scan'208";a="336190232" Received: from silpixa00400573.ir.intel.com (HELO silpixa00400573.ger.corp.intel.com) ([10.237.223.107]) by fmsmga002.fm.intel.com with ESMTP; 07 Sep 2020 14:41:09 -0700 From: Cristian Dumitrescu To: dev@dpdk.org Date: Mon, 7 Sep 2020 22:40:28 +0100 Message-Id: <20200907214032.95052-38-cristian.dumitrescu@intel.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20200907214032.95052-1-cristian.dumitrescu@intel.com> References: <20200826151445.51500-2-cristian.dumitrescu@intel.com> <20200907214032.95052-1-cristian.dumitrescu@intel.com> Subject: [dpdk-dev] [PATCH v2 37/41] examples/pipeline: add message passing mechanism X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Add network-based connectivity mechanism for the application to allow for the exchange of configuration messages through the network as opposed to local CLI only. Signed-off-by: Cristian Dumitrescu --- examples/pipeline/Makefile | 1 + examples/pipeline/conn.c | 331 ++++++++++++++++++++++++++++++++++ examples/pipeline/conn.h | 50 +++++ examples/pipeline/main.c | 136 +++++++++++++- examples/pipeline/meson.build | 1 + 5 files changed, 517 insertions(+), 2 deletions(-) create mode 100644 examples/pipeline/conn.c create mode 100644 examples/pipeline/conn.h diff --git a/examples/pipeline/Makefile b/examples/pipeline/Makefile index 3c85e9e40..eb1085f7c 100644 --- a/examples/pipeline/Makefile +++ b/examples/pipeline/Makefile @@ -5,6 +5,7 @@ APP = pipeline # all source are stored in SRCS-y +SRCS-y += conn.c SRCS-y += main.c SRCS-y += obj.c SRCS-y += thread.c diff --git a/examples/pipeline/conn.c b/examples/pipeline/conn.c new file mode 100644 index 000000000..eed87b8ea --- /dev/null +++ b/examples/pipeline/conn.c @@ -0,0 +1,331 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +#include "conn.h" + +#define MSG_CMD_TOO_LONG "Command too long." + +struct conn { + char *welcome; + char *prompt; + char *buf; + char *msg_in; + char *msg_out; + size_t buf_size; + size_t msg_in_len_max; + size_t msg_out_len_max; + size_t msg_in_len; + int fd_server; + int fd_client_group; + conn_msg_handle_t msg_handle; + void *msg_handle_arg; +}; + +struct conn * +conn_init(struct conn_params *p) +{ + struct sockaddr_in server_address; + struct conn *conn; + int fd_server, fd_client_group, status; + + memset(&server_address, 0, sizeof(server_address)); + + /* Check input arguments */ + if ((p == NULL) || + (p->welcome == NULL) || + (p->prompt == NULL) || + (p->addr == NULL) || + (p->buf_size == 0) || + (p->msg_in_len_max == 0) || + (p->msg_out_len_max == 0) || + (p->msg_handle == NULL)) + return NULL; + + status = inet_aton(p->addr, &server_address.sin_addr); + if (status == 0) + return NULL; + + /* Memory allocation */ + conn = calloc(1, sizeof(struct conn)); + if (conn == NULL) + return NULL; + + conn->welcome = calloc(1, CONN_WELCOME_LEN_MAX + 1); + conn->prompt = calloc(1, CONN_PROMPT_LEN_MAX + 1); + conn->buf = calloc(1, p->buf_size); + conn->msg_in = calloc(1, p->msg_in_len_max + 1); + conn->msg_out = calloc(1, p->msg_out_len_max + 1); + + if ((conn->welcome == NULL) || + (conn->prompt == NULL) || + (conn->buf == NULL) || + (conn->msg_in == NULL) || + (conn->msg_out == NULL)) { + conn_free(conn); + return NULL; + } + + /* Server socket */ + server_address.sin_family = AF_INET; + server_address.sin_port = htons(p->port); + + fd_server = socket(AF_INET, + SOCK_STREAM | SOCK_NONBLOCK, + 0); + if (fd_server == -1) { + conn_free(conn); + return NULL; + } + + status = bind(fd_server, + (struct sockaddr *) &server_address, + sizeof(server_address)); + if (status == -1) { + conn_free(conn); + close(fd_server); + return NULL; + } + + status = listen(fd_server, 16); + if (status == -1) { + conn_free(conn); + close(fd_server); + return NULL; + } + + /* Client group */ + fd_client_group = epoll_create(1); + if (fd_client_group == -1) { + conn_free(conn); + close(fd_server); + return NULL; + } + + /* Fill in */ + strncpy(conn->welcome, p->welcome, CONN_WELCOME_LEN_MAX); + strncpy(conn->prompt, p->prompt, CONN_PROMPT_LEN_MAX); + conn->buf_size = p->buf_size; + conn->msg_in_len_max = p->msg_in_len_max; + conn->msg_out_len_max = p->msg_out_len_max; + conn->msg_in_len = 0; + conn->fd_server = fd_server; + conn->fd_client_group = fd_client_group; + conn->msg_handle = p->msg_handle; + conn->msg_handle_arg = p->msg_handle_arg; + + return conn; +} + +void +conn_free(struct conn *conn) +{ + if (conn == NULL) + return; + + if (conn->fd_client_group) + close(conn->fd_client_group); + + if (conn->fd_server) + close(conn->fd_server); + + free(conn->msg_out); + free(conn->msg_in); + free(conn->prompt); + free(conn->welcome); + free(conn); +} + +int +conn_poll_for_conn(struct conn *conn) +{ + struct sockaddr_in client_address; + struct epoll_event event; + socklen_t client_address_length; + int fd_client, status; + + /* Check input arguments */ + if (conn == NULL) + return -1; + + /* Server socket */ + client_address_length = sizeof(client_address); + fd_client = accept4(conn->fd_server, + (struct sockaddr *) &client_address, + &client_address_length, + SOCK_NONBLOCK); + if (fd_client == -1) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + return 0; + + return -1; + } + + /* Client group */ + event.events = EPOLLIN | EPOLLRDHUP | EPOLLHUP; + event.data.fd = fd_client; + + status = epoll_ctl(conn->fd_client_group, + EPOLL_CTL_ADD, + fd_client, + &event); + if (status == -1) { + close(fd_client); + return -1; + } + + /* Client */ + status = write(fd_client, + conn->welcome, + strlen(conn->welcome)); + if (status == -1) { + close(fd_client); + return -1; + } + + status = write(fd_client, + conn->prompt, + strlen(conn->prompt)); + if (status == -1) { + close(fd_client); + return -1; + } + + return 0; +} + +static int +data_event_handle(struct conn *conn, + int fd_client) +{ + ssize_t len, i, status; + + /* Read input message */ + + len = read(fd_client, + conn->buf, + conn->buf_size); + if (len == -1) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + return 0; + + return -1; + } + if (len == 0) + return 0; + + /* Handle input messages */ + for (i = 0; i < len; i++) { + if (conn->buf[i] == '\n') { + size_t n; + + conn->msg_in[conn->msg_in_len] = 0; + conn->msg_out[0] = 0; + + conn->msg_handle(conn->msg_in, + conn->msg_out, + conn->msg_out_len_max, + conn->msg_handle_arg); + + n = strlen(conn->msg_out); + if (n) { + status = write(fd_client, + conn->msg_out, + n); + if (status == -1) + return status; + } + + conn->msg_in_len = 0; + } else if (conn->msg_in_len < conn->msg_in_len_max) { + conn->msg_in[conn->msg_in_len] = conn->buf[i]; + conn->msg_in_len++; + } else { + status = write(fd_client, + MSG_CMD_TOO_LONG, + strlen(MSG_CMD_TOO_LONG)); + if (status == -1) + return status; + + conn->msg_in_len = 0; + } + } + + /* Write prompt */ + status = write(fd_client, + conn->prompt, + strlen(conn->prompt)); + if (status == -1) + return status; + + return 0; +} + +static int +control_event_handle(struct conn *conn, + int fd_client) +{ + int status; + + status = epoll_ctl(conn->fd_client_group, + EPOLL_CTL_DEL, + fd_client, + NULL); + if (status == -1) + return -1; + + status = close(fd_client); + if (status == -1) + return -1; + + return 0; +} + +int +conn_poll_for_msg(struct conn *conn) +{ + struct epoll_event event; + int fd_client, status, status_data = 0, status_control = 0; + + /* Check input arguments */ + if (conn == NULL) + return -1; + + /* Client group */ + status = epoll_wait(conn->fd_client_group, + &event, + 1, + 0); + if (status == -1) + return -1; + if (status == 0) + return 0; + + fd_client = event.data.fd; + + /* Data available */ + if (event.events & EPOLLIN) + status_data = data_event_handle(conn, fd_client); + + /* Control events */ + if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) + status_control = control_event_handle(conn, fd_client); + + if (status_data || status_control) + return -1; + + return 0; +} diff --git a/examples/pipeline/conn.h b/examples/pipeline/conn.h new file mode 100644 index 000000000..871a5efd0 --- /dev/null +++ b/examples/pipeline/conn.h @@ -0,0 +1,50 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2020 Intel Corporation + */ + +#ifndef __INCLUDE_CONN_H__ +#define __INCLUDE_CONN_H__ + +#include + +struct conn; + +#ifndef CONN_WELCOME_LEN_MAX +#define CONN_WELCOME_LEN_MAX 1024 +#endif + +#ifndef CONN_PROMPT_LEN_MAX +#define CONN_PROMPT_LEN_MAX 16 +#endif + +typedef void +(*conn_msg_handle_t)(char *msg_in, + char *msg_out, + size_t msg_out_len_max, + void *arg); + +struct conn_params { + const char *welcome; + const char *prompt; + const char *addr; + uint16_t port; + size_t buf_size; + size_t msg_in_len_max; + size_t msg_out_len_max; + conn_msg_handle_t msg_handle; + void *msg_handle_arg; +}; + +struct conn * +conn_init(struct conn_params *p); + +void +conn_free(struct conn *conn); + +int +conn_poll_for_conn(struct conn *conn); + +int +conn_poll_for_msg(struct conn *conn); + +#endif diff --git a/examples/pipeline/main.c b/examples/pipeline/main.c index 353e62c10..1a63c1237 100644 --- a/examples/pipeline/main.c +++ b/examples/pipeline/main.c @@ -11,15 +11,136 @@ #include #include +#include "conn.h" #include "obj.h" #include "thread.h" +static const char usage[] = + "%s EAL_ARGS -- [-h HOST] [-p PORT] [-s SCRIPT]\n"; + +static struct app_params { + struct conn_params conn; + char *script_name; +} app = { + .conn = { + .welcome = "\nWelcome!\n\n", + .prompt = "pipeline> ", + .addr = "0.0.0.0", + .port = 8086, + .buf_size = 1024 * 1024, + .msg_in_len_max = 1024, + .msg_out_len_max = 1024 * 1024, + .msg_handle = NULL, + .msg_handle_arg = NULL, /* set later. */ + }, + .script_name = NULL, +}; + +static int +parse_args(int argc, char **argv) +{ + char *app_name = argv[0]; + struct option lgopts[] = { + { NULL, 0, 0, 0 } + }; + int opt, option_index; + int h_present, p_present, s_present, n_args, i; + + /* Skip EAL input args */ + n_args = argc; + for (i = 0; i < n_args; i++) + if (strcmp(argv[i], "--") == 0) { + argc -= i; + argv += i; + break; + } + + if (i == n_args) + return 0; + + /* Parse args */ + h_present = 0; + p_present = 0; + s_present = 0; + + while ((opt = getopt_long(argc, argv, "h:p:s:", lgopts, &option_index)) + != EOF) + switch (opt) { + case 'h': + if (h_present) { + printf("Error: Multiple -h arguments\n"); + return -1; + } + h_present = 1; + + if (!strlen(optarg)) { + printf("Error: Argument for -h not provided\n"); + return -1; + } + + app.conn.addr = strdup(optarg); + if (app.conn.addr == NULL) { + printf("Error: Not enough memory\n"); + return -1; + } + break; + + case 'p': + if (p_present) { + printf("Error: Multiple -p arguments\n"); + return -1; + } + p_present = 1; + + if (!strlen(optarg)) { + printf("Error: Argument for -p not provided\n"); + return -1; + } + + app.conn.port = (uint16_t) atoi(optarg); + break; + + case 's': + if (s_present) { + printf("Error: Multiple -s arguments\n"); + return -1; + } + s_present = 1; + + if (!strlen(optarg)) { + printf("Error: Argument for -s not provided\n"); + return -1; + } + + app.script_name = strdup(optarg); + if (app.script_name == NULL) { + printf("Error: Not enough memory\n"); + return -1; + } + break; + + default: + printf(usage, app_name); + return -1; + } + + optind = 1; /* reset getopt lib */ + + return 0; +} + int main(int argc, char **argv) { + struct conn *conn; struct obj *obj; int status; + /* Parse application arguments */ + status = parse_args(argc, argv); + if (status < 0) + return status; + /* EAL */ status = rte_eal_init(argc, argv); if (status < 0) { @@ -46,7 +167,18 @@ main(int argc, char **argv) NULL, SKIP_MASTER); + /* Connectivity */ + app.conn.msg_handle_arg = obj; + conn = conn_init(&app.conn); + if (!conn) { + printf("Error: Connectivity initialization failed (%d)\n", + status); + return status; + }; /* Dispatch loop */ - for ( ; ; ); -} + for ( ; ; ) { + conn_poll_for_conn(conn); + conn_poll_for_msg(conn); + } +} diff --git a/examples/pipeline/meson.build b/examples/pipeline/meson.build index ade485f97..a92e84677 100644 --- a/examples/pipeline/meson.build +++ b/examples/pipeline/meson.build @@ -10,6 +10,7 @@ build = cc.has_header('sys/epoll.h') deps += ['pipeline', 'bus_pci'] allow_experimental_apis = true sources = files( + 'conn.c', 'main.c', 'obj.c', 'thread.c', -- 2.17.1