From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga14.intel.com (mga14.intel.com [192.55.52.115]) by dpdk.org (Postfix) with ESMTP id D41597D56 for ; Fri, 25 Aug 2017 11:39:26 +0200 (CEST) Received: from fmsmga006.fm.intel.com ([10.253.24.20]) by fmsmga103.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 25 Aug 2017 02:39:26 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.41,424,1498546800"; d="scan'208";a="144456030" Received: from dpdk06.sh.intel.com ([10.67.111.82]) by fmsmga006.fm.intel.com with ESMTP; 25 Aug 2017 02:39:24 -0700 From: Jianfeng Tan To: dev@dpdk.org Cc: bruce.richardson@intel.com, konstantin.ananyev@intel.com, pablo.de.lara.guarch@intel.com, thomas@monjalon.net, yliu@fridaylinux.org, maxime.coquelin@redhat.com, mtetsuyah@gmail.com, ferruh.yigit@intel.com, Jianfeng Tan Date: Fri, 25 Aug 2017 09:40:46 +0000 Message-Id: <1503654052-84730-7-git-send-email-jianfeng.tan@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1503654052-84730-1-git-send-email-jianfeng.tan@intel.com> References: <1503654052-84730-1-git-send-email-jianfeng.tan@intel.com> Subject: [dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 25 Aug 2017 09:39:27 -0000 Previously, there was only one way for the primary and secondary processes to exchange messages: the primary process writes info into a predefined file, and the secondary process reads it back. That cannot address these requirements: a. The secondary wants to send info to the primary. b. Sending info at any time, instead of only at initialization time. c. Sharing FDs with the other side. This patch proposes to create a communication channel (a unix socket connection) for the above requirements. Three new APIs are added: 1. 
rte_eal_primary_secondary_add_action is used to register an action if the calling component wants to respond to messages from the corresponding component in its primary process or secondary processes. 2. rte_eal_primary_secondary_del_action is used to unregister the action if the calling component no longer wants to respond to those messages. 3. rte_eal_primary_secondary_sendmsg is used to send a message. Signed-off-by: Jianfeng Tan --- lib/librte_eal/bsdapp/eal/rte_eal_version.map | 8 + lib/librte_eal/common/eal_common_proc.c | 454 ++++++++++++++++++++++++ lib/librte_eal/common/eal_filesystem.h | 18 + lib/librte_eal/common/eal_private.h | 10 + lib/librte_eal/common/include/rte_eal.h | 74 ++++ lib/librte_eal/linuxapp/eal/eal.c | 6 + lib/librte_eal/linuxapp/eal/rte_eal_version.map | 8 + 7 files changed, 578 insertions(+) diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map index aac6fd7..f4ff29f 100644 --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map @@ -237,3 +237,11 @@ EXPERIMENTAL { rte_service_unregister; } DPDK_17.08; + +EXPERIMENTAL { + global: + + rte_eal_primary_secondary_add_action; + rte_eal_primary_secondary_del_action; + rte_eal_primary_secondary_sendmsg; +} DPDK_17.11; diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c index 60526ca..fa316bf 100644 --- a/lib/librte_eal/common/eal_common_proc.c +++ b/lib/librte_eal/common/eal_common_proc.c @@ -33,8 +33,20 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include + +#include #include +#include +#include "eal_private.h" #include "eal_filesystem.h" #include "eal_internal_cfg.h" @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path) return !!ret; } + +struct action_entry { + TAILQ_ENTRY(action_entry) next; /**< Next attached action entry*/ + +#define MAX_ACTION_NAME_LEN 64 + char 
action_name[MAX_ACTION_NAME_LEN]; + rte_eal_primary_secondary_t *action; +}; + +/** Doubly linked list of registered actions, searched by process_msg() for the action_name carried in each received message. NOTE(review): nothing locks this list, yet the channel thread reads it while add/del_action may run on other threads. */ +TAILQ_HEAD(action_entry_list, action_entry); + +static struct action_entry_list action_entry_list = + TAILQ_HEAD_INITIALIZER(action_entry_list); +/* Return the first registered entry matching @name, or NULL if none. NOTE(review): strncmp() is bounded by strlen(name), so this is a prefix match: "foo" also matches an entry named "foobar", and an empty name matches the first entry. strcmp() was probably intended. */ +static struct action_entry * +find_action_entry_by_name(const char *name) +{ + int len = strlen(name); + struct action_entry *entry; + + TAILQ_FOREACH(entry, &action_entry_list, next) { + if (strncmp(entry->action_name, name, len) == 0) + break; + } + + return entry; +} +/* Append @action under @action_name to action_entry_list. Returns 0, or -ENOMEM if the entry cannot be allocated. NOTE(review): duplicate names are not rejected, and strncpy() leaves action_name without a terminating NUL when the name is MAX_ACTION_NAME_LEN (64) characters or longer. */ +int +rte_eal_primary_secondary_add_action(const char *action_name, + rte_eal_primary_secondary_t action) +{ + struct action_entry *entry = malloc(sizeof(struct action_entry)); + + if (entry == NULL) + return -ENOMEM; + + strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN); + entry->action = action; + TAILQ_INSERT_TAIL(&action_entry_list, entry, next); + return 0; +} +/* Unlink and free the entry returned by find_action_entry_by_name(@name). */ +void +rte_eal_primary_secondary_del_action(const char *name) +{ + struct action_entry *entry = find_action_entry_by_name(name); + + TAILQ_REMOVE(&action_entry_list, entry, next); /* NOTE(review): entry is NULL when no action matches @name, and TAILQ_REMOVE() then dereferences it; a NULL check is needed. */ + free(entry); +} +/* Upper bound on the secondaries the primary tracks in fds_to_sec[]. */ +#define MAX_SECONDARY_PROCS 8 + +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */ +static int fd_listen; /* unix listen socket by primary */ +static int fd_to_pri; /* only used by secondary process */ +static int fds_to_sec[MAX_SECONDARY_PROCS]; /* primary only: connected secondary sockets, -1 marks a free slot. NOTE(review): written by the channel thread and read by rte_eal_primary_secondary_sendmsg() without a lock. */ +/* Header prepended to every message on the channel; params holds the caller's payload. */ +struct msg_hdr { + char action_name[MAX_ACTION_NAME_LEN]; + int fds_num; /* descriptor count as declared by the sender */ + char params[0]; +} __rte_packed; +/* Store @fd in the first free fds_to_sec slot. Returns the slot index, or -1 when all MAX_SECONDARY_PROCS slots are in use. */ +static int +add_sec_proc(int fd) +{ + int i; + + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) + if (fds_to_sec[i] == -1) + break; + + if (i >= MAX_SECONDARY_PROCS) + return -1; + + fds_to_sec[i] = fd; + + return i; +} +/* Mark the fds_to_sec slot holding @fd as free, if there is one. Does not close @fd. */ +static void +del_sec_proc(int fd) +{ + int i; + + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) { + if (fds_to_sec[i] == fd) { + fds_to_sec[i] = -1; + break; + } + } +} +/* Receive one recvmsg() worth of data into @buf and copy @fds_num descriptors' worth of SCM_RIGHTS data into @fds. Returns the byte count, 0 on EOF, or -1 on error or truncation. */ +static int +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num) +{ + struct iovec iov; + 
struct msghdr msgh; + size_t fdsize = fds_num * sizeof(int); + char control[CMSG_SPACE(fdsize)]; + struct cmsghdr *cmsg; + int ret; + + memset(&msgh, 0, sizeof(msgh)); + iov.iov_base = buf; + iov.iov_len = buflen; + + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + msgh.msg_control = control; + msgh.msg_controllen = sizeof(control); + + ret = recvmsg(sockfd, &msgh, 0); + if (ret <= 0) { /* NOTE(review): ret == 0 means the peer closed; it is logged as a failure but returned as 0, which process_msg() does not treat as an error. */ + RTE_LOG(ERR, EAL, "recvmsg failed\n"); + return ret; + } + + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) { /* NOTE(review): the channel socket is SOCK_STREAM, which does not preserve message boundaries, so MSG_TRUNC is not a reliable truncation signal and one read may hold a partial message or several. */ + RTE_LOG(ERR, EAL, "truncted msg\n"); + return -1; + } + + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; + cmsg = CMSG_NXTHDR(&msgh, cmsg)) { + if ((cmsg->cmsg_level == SOL_SOCKET) && + (cmsg->cmsg_type == SCM_RIGHTS)) { + memcpy(fds, CMSG_DATA(cmsg), fdsize); /* copies fdsize bytes regardless of cmsg_len, so slots past the descriptors actually received hold unspecified values */ + break; + } + } + + return ret; +} +/* Read one message from @fd and invoke the action registered under the action_name in its header. Returns the action's result, or -1 if reading fails or no action matches. */ +static int +process_msg(int fd) +{ + int len; + int params_len; + char buf[1024]; /* NOTE(review): rte_eal_primary_secondary_sendmsg() accepts any len_params, so larger messages cannot arrive in one piece */ + int fds[8]; /* accept at most 8 FDs per message */ + struct msg_hdr *hdr; + struct action_entry *entry; + + len = read_msg(fd, buf, 1024, fds, 8); + if (len < 0) { /* NOTE(review): len == 0 (peer closed) and len < sizeof(struct msg_hdr) are not rejected, so an incomplete or uninitialized header in buf is parsed below */ + RTE_LOG(ERR, EAL, "failed to read message: %s\n", + strerror(errno)); + return -1; + } + + hdr = (struct msg_hdr *) buf; + + entry = find_action_entry_by_name(hdr->action_name); /* NOTE(review): action_name comes from the peer and is not checked for a NUL within MAX_ACTION_NAME_LEN */ + if (entry == NULL) { + RTE_LOG(ERR, EAL, "cannot find action by: %s\n", + hdr->action_name); + return -1; + } + + params_len = len - sizeof(struct msg_hdr); + return entry->action(hdr->params, params_len, fds, hdr->fds_num); /* NOTE(review): hdr->fds_num is the sender's claim and is not checked against the 8-entry fds[] or the descriptors actually received */ +} +/* Channel thread of the primary: accepts secondary connections on fd_listen and dispatches their messages via process_msg(). Loops forever; exits the process on epoll setup failure or an unexpected event on the listen socket. */ +static void * +thread_primary(__attribute__((unused)) void *arg) +{ + int fd; + int i, n; + struct epoll_event event; + struct epoll_event *events; + + event.events = EPOLLIN | EPOLLRDHUP; + event.data.fd = fd_listen; + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) { + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", + strerror(errno)); + exit(EXIT_FAILURE); + } + + events = calloc(20, sizeof event); /* NOTE(review): calloc() failure is not checked */ + + while (1) { + n = epoll_wait(efd_pri_sec, events, 20, -1); + for (i = 0; i < n; i++) { + if 
(events[i].data.fd == fd_listen) { + if (events[i].events != EPOLLIN) { /* NOTE(review): any flag besides exactly EPOLLIN on the listen socket terminates the whole primary process */ + RTE_LOG(ERR, EAL, "what happens?\n"); + exit(EXIT_FAILURE); + } + + fd = accept(fd_listen, NULL, NULL); + if (fd < 0) { + RTE_LOG(ERR, EAL, "primary failed to accept: %s\n", + strerror(errno)); + continue; + } + + event.data.fd = fd; + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) { /* NOTE(review): the accepted fd is neither closed nor tracked on this failure path */ + RTE_LOG(ERR, EAL, "failed to add secondary: %s\n", + strerror(errno)); + continue; + } + if (add_sec_proc(fd) < 0) /* NOTE(review): on failure the connection stays open and polled, but rte_eal_primary_secondary_sendmsg() never sends to it */ + RTE_LOG(ERR, EAL, "too many secondary processes\n"); + + continue; + } + + fd = events[i].data.fd; + + if ((events[i].events & (EPOLLERR | EPOLLHUP))) { + RTE_LOG(ERR, EAL, + "secondary process exit: %d\n", fd); + epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL); + del_sec_proc(fd); /* NOTE(review): fd is never closed, so each departed secondary leaks a descriptor */ + continue; + } + + if ((events[i].events & EPOLLIN)) { + RTE_LOG(INFO, EAL, + "recv msg from secondary process\n"); + + process_msg(fd); + } + } + } + + return NULL; +} +/* Channel thread of a secondary: dispatches messages from the primary on fd_to_pri via process_msg() and exits the process when that connection reports an error or hang-up. */ +static void * +thread_secondary(__attribute__((unused)) void *arg) +{ + int fd; + int i, n; + struct epoll_event event; + struct epoll_event *events; + + event.events = EPOLLIN | EPOLLRDHUP; + event.data.fd = fd_to_pri; + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) { + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno)); + exit(EXIT_FAILURE); + } + + events = calloc(20, sizeof event); /* NOTE(review): calloc() failure is not checked */ + + while (1) { + n = epoll_wait(efd_pri_sec, events, 20, -1); + for (i = 0; i < n; i++) { + + fd = events[i].data.fd; + + if ((events[i].events & (EPOLLERR | EPOLLHUP))) { + RTE_LOG(ERR, EAL, "primary exits, so do I\n"); + /* Open question from the author: should a secondary really exit when the primary exits? 
*/ + exit(EXIT_FAILURE); + } + + if ((events[i].events & EPOLLIN)) { + RTE_LOG(INFO, EAL, + "recv msg from primary process\n"); + process_msg(fd); + } + } + } + + return NULL; +} +/* Create the epoll fd and the unix socket at eal_primary_secondary_unix_path(): the primary binds and listens on it, a secondary connects to it. Then start thread_primary() or thread_secondary(), named ps_channel. Returns 0 on success, -1 on failure. */ +int +rte_eal_primary_secondary_channel_init(void) +{ + int i, fd, ret; + const char *path; + struct sockaddr_un un; + pthread_t tid; + void*(*fn)(void *); + char thread_name[RTE_MAX_THREAD_NAME_LEN]; + + efd_pri_sec = epoll_create1(0); + if (efd_pri_sec < 0) { + RTE_LOG(ERR, EAL, "epoll_create1 failed\n"); + return -1; + } + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) { /* NOTE(review): efd_pri_sec is leaked here and on every later error path except the two after pthread_create() */ + RTE_LOG(ERR, EAL, "Failed to create unix socket"); + return -1; + } + + memset(&un, 0, sizeof(un)); + un.sun_family = AF_UNIX; + path = eal_primary_secondary_unix_path(); + strncpy(un.sun_path, path, sizeof(un.sun_path)); /* NOTE(review): a path longer than sun_path is silently truncated */ + un.sun_path[sizeof(un.sun_path) - 1] = '\0'; + + if (rte_eal_process_type() == RTE_PROC_PRIMARY) { + + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) + fds_to_sec[i] = -1; + + if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) { + RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n"); + close(fd); + return -1; + } + + /* Remove a stale socket file left by a previous run. NOTE(review): this also unlinks the socket of a primary still running with the same path. */ + unlink(path); + + ret = bind(fd, (struct sockaddr *)&un, sizeof(un)); + if (ret < 0) { + RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n", + path, strerror(errno)); + close(fd); + return -1; + } + RTE_LOG(INFO, EAL, "primary bind to %s\n", path); + + ret = listen(fd, 1024); + if (ret < 0) { + RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno)); + close(fd); + return -1; + } + + fn = thread_primary; + fd_listen = fd; + } else { + ret = connect(fd, (struct sockaddr *)&un, sizeof(un)); + if (ret < 0) { /* NOTE(review): fd is not closed on this path */ + RTE_LOG(ERR, EAL, "failed to connect primary\n"); + return -1; + } + fn = thread_secondary; + fd_to_pri = fd; + } + + ret = pthread_create(&tid, NULL, fn, NULL); + if (ret < 0) { /* NOTE(review): pthread_create() returns a positive error number and does not set errno, so this check never fires; it should test ret != 0 and report strerror(ret) */ + RTE_LOG(ERR, EAL, "failed to create thead: %s\n", + strerror(errno)); + close(fd); + close(efd_pri_sec); + return -1; + } + + snprintf(thread_name, 
RTE_MAX_THREAD_NAME_LEN, + "ps_channel"); + ret = rte_thread_setname(tid, thread_name); + if (ret < 0) { /* NOTE(review): the channel thread is already running and using fd and efd_pri_sec, which are closed below */ + RTE_LOG(ERR, EAL, "failed to set thead name\n"); + close(fd); + close(efd_pri_sec); + return -1; + } + + return 0; +} +/* sendmsg() on @fd, retrying only when interrupted by a signal (EINTR). Returns the sendmsg() result; a short write is not retried. */ +static int +send_msg(int fd, struct msghdr *p_msgh) +{ + int ret; + + do { + ret = sendmsg(fd, p_msgh, 0); + } while (ret < 0 && errno == EINTR); + + return ret; +} +/* Send @params (@len_params bytes) behind a msg_hdr naming @action_name, with @fds attached as SCM_RIGHTS data: from the primary to every connected secondary, from a secondary to the primary. Returns the last send_msg() result (0 if the primary has no secondaries), or -ENOMEM. */ +int +rte_eal_primary_secondary_sendmsg(const char *action_name, + const void *params, + int len_params, + int fds[], + int fds_num) +{ + int i; + int ret = 0; + struct msghdr msgh; + struct iovec iov; + size_t fd_size = fds_num * sizeof(int); + char control[CMSG_SPACE(fd_size)]; + struct cmsghdr *cmsg; + struct msg_hdr *msg; + int len_msg; + + len_msg = sizeof(struct msg_hdr) + len_params; + msg = malloc(len_msg); + if (!msg) { + RTE_LOG(ERR, EAL, "Cannot alloc memory for msg"); + return -ENOMEM; + } + memset(msg, 0, len_msg); + strcpy(msg->action_name, action_name); /* NOTE(review): overflows msg->action_name when action_name is MAX_ACTION_NAME_LEN (64) characters or longer */ + msg->fds_num = fds_num; + memcpy(msg->params, params, len_params); + + memset(&msgh, 0, sizeof(msgh)); + memset(control, 0, sizeof(control)); + + iov.iov_base = (uint8_t *)msg; + iov.iov_len = len_msg; + + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + msgh.msg_control = control; + msgh.msg_controllen = sizeof(control); + + cmsg = CMSG_FIRSTHDR(&msgh); + cmsg->cmsg_len = CMSG_LEN(fd_size); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + memcpy(CMSG_DATA(cmsg), fds, fd_size); + + if (rte_eal_process_type() == RTE_PROC_PRIMARY) { + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) { + if (fds_to_sec[i] == -1) + continue; + + ret = send_msg(fds_to_sec[i], &msgh); + if (ret < 0) /* NOTE(review): one failed send skips all remaining secondaries */ + break; + } + } else { + ret = send_msg(fd_to_pri, &msgh); + } + + free(msg); + + return ret; +} diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h index 8acbd99..78bb4fb 100644 --- a/lib/librte_eal/common/eal_filesystem.h +++ b/lib/librte_eal/common/eal_filesystem.h @@ -67,6 +67,24 @@ 
eal_runtime_config_path(void) return buffer; } +/** Path of the primary/secondary communication unix socket, "<dir>/.<hugefile_prefix>_unix", where <dir> is $HOME for a non-root user with HOME set and default_config_dir otherwise. The result lives in a static buffer that each call overwrites. */ +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix" +static inline const char * +eal_primary_secondary_unix_path(void) +{ + static char buffer[PATH_MAX]; /* static so auto-zeroed */ + const char *directory = default_config_dir; + const char *home_dir = getenv("HOME"); + + if (getuid() != 0 && home_dir != NULL) + directory = home_dir; + snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT, + directory, internal_config.hugefile_prefix); + + return buffer; + +} + /** Path of hugepage info file. */ #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info" diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h index 597d82e..719b160 100644 --- a/lib/librte_eal/common/eal_private.h +++ b/lib/librte_eal/common/eal_private.h @@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void); */ struct rte_bus *rte_bus_find_by_device_name(const char *str); +/** + * Create the unix socket channel for primary/secondary communication and start the thread that serves it. + * + * @return + * 0 on success; + * (<0) on failure. + */ + +int rte_eal_primary_secondary_channel_init(void); + #endif /* _EAL_PRIVATE_H_ */ diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h index 0e7363d..6cfc9d6 100644 --- a/lib/librte_eal/common/include/rte_eal.h +++ b/lib/librte_eal/common/include/rte_eal.h @@ -210,6 +210,80 @@ int rte_eal_primary_proc_alive(const char *config_file_path); /** + * Action function typedef used by other components. + * + * Actions of this type are called from the primary/secondary channel thread + * when a message naming them arrives: params/len carry the payload, and fds/fds_num the passed descriptors (fds_num as declared by the sender). + */ +typedef int (rte_eal_primary_secondary_t)(const char *params, + int len, + int fds[], + int fds_num); +/** + * Register an action function for primary/secondary communication.
+ * + * Call this function to register an action if the calling component wants + * to respond to messages from the corresponding component in its primary + * process or secondary processes. + * + * @param action_name + * The name the action is registered under; incoming messages carrying this name are dispatched to it. + * + * @param action + * The action argument is the function pointer to the action function. + * + * @return + * - 0 on success. + * - (<0) on failure. + */ +int rte_eal_primary_secondary_add_action(const char *action_name, + rte_eal_primary_secondary_t action); +/** + * Unregister an action function for primary/secondary communication. + * + * Call this function to unregister an action if the calling component no + * longer wants to respond to messages from the corresponding component in its + * primary process or secondary processes. + * + * @param name + * The name the action was registered under. + * + */ +void rte_eal_primary_secondary_del_action(const char *name); + +/** + * Send a message to the primary process or the secondary processes. + * + * This function sends a message that will be handled by the action + * registered under action_name in the process(es) on the other side. + * + * @param action_name + * The action_name argument is used to identify which action will be used. + * + * @param params + * The params argument contains the customized message. + * + * @param len_params + * The len_params argument is the length of the customized message. + * + * @param fds + * The fds argument is an array of fds sent with sendmsg. + * + * @param fds_num + * The fds_num argument is the number of fds to be sent with sendmsg. + * + * @return + * - (>=0) on success. + * - (<0) on failure. + */ +int +rte_eal_primary_secondary_sendmsg(const char *action_name, + const void *params, + int len_params, + int fds[], + int fds_num); + +/** + * Usage function typedef used by the application usage function.
* * Use this function typedef to define and call rte_set_applcation_usage_hook() diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c index 48f12f4..237c0b1 100644 --- a/lib/librte_eal/linuxapp/eal/eal.c +++ b/lib/librte_eal/linuxapp/eal/eal.c @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv) eal_check_mem_on_local_socket(); + if (rte_eal_primary_secondary_channel_init() < 0) { /* NOTE(review): only the linuxapp EAL calls this, while the bsdapp map also exports the new symbols and eal_common_proc.c relies on Linux epoll; confirm the FreeBSD build */ + rte_eal_init_alert("Cannot create unix channel."); + rte_errno = EFAULT; + return -1; + } + if (eal_plugins_init() < 0) rte_eal_init_alert("Cannot init plugins\n"); diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map index 3a8f154..c618aec 100644 --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map @@ -242,3 +242,11 @@ EXPERIMENTAL { rte_service_unregister; } DPDK_17.08; + +EXPERIMENTAL { /* NOTE(review): the hunk context shows this map already defines an EXPERIMENTAL node, and GNU ld rejects duplicate version tags; the new symbols likely belong in the existing node. The bsdapp map has the same addition. */ + global: + + rte_eal_primary_secondary_add_action; + rte_eal_primary_secondary_del_action; + rte_eal_primary_secondary_sendmsg; +} DPDK_17.11; -- 2.7.4