From mboxrd@z Thu Jan 1 00:00:00 1970
Received: from mga06.intel.com (mga06.intel.com [134.134.136.31]) by dpdk.org (Postfix) with ESMTP id 2B2A5199A9 for ; Mon, 18 Sep 2017 15:46:40 +0200 (CEST)
Received: from fmsmga006.fm.intel.com ([10.253.24.20]) by orsmga104.jf.intel.com with ESMTP; 18 Sep 2017 06:46:39 -0700
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos;i="5.42,413,1500966000"; d="scan'208";a="153086717"
Received: from dpdk15.sh.intel.com ([10.67.111.77]) by fmsmga006.fm.intel.com with ESMTP; 18 Sep 2017 06:46:37 -0700
Date: Mon, 18 Sep 2017 21:49:35 +0800
From: Jiayu Hu
To: Jianfeng Tan
Cc: dev@dpdk.org, bruce.richardson@intel.com, konstantin.ananyev@intel.com, pablo.de.lara.guarch@intel.com, thomas@monjalon.net, yliu@fridaylinux.org, maxime.coquelin@redhat.com, mtetsuyah@gmail.com, ferruh.yigit@intel.com
Message-ID: <20170918134934.GA93504@dpdk15.sh.intel.com>
References: <1503654052-84730-1-git-send-email-jianfeng.tan@intel.com> <1503654052-84730-7-git-send-email-jianfeng.tan@intel.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <1503654052-84730-7-git-send-email-jianfeng.tan@intel.com>
User-Agent: Mutt/1.7.1 (2016-10-04)
Subject: Re: [dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions
X-List-Received-Date: Mon, 18 Sep 2017 13:46:42 -0000

Hi Jianfeng,

On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
> Previouly, there is only one way for primary/secondary to exchange
> messages, that is, primary process writes info into some predefind
> file, and secondary process reads info out. That cannot address
> the requirements:
> a. Secondary wants to send info to primary.
> b. Sending info at any time, instead of just initialization time.
> c. Share FD with the other side.
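
For readers less familiar with requirement (c), FD sharing over a unix socket
is normally done with SCM_RIGHTS ancillary data. Below is a minimal standalone
sketch of that mechanism, not code from this patch; send_one_fd() and
recv_one_fd() are hypothetical names, and it assumes a connected AF_UNIX
stream socket on Linux.

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Control buffer sized for exactly one descriptor, aligned for cmsghdr. */
union fd_ctrl {
	char buf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr align;
};

/* Hypothetical helper: send one FD over a connected unix socket. */
static int send_one_fd(int sock, int fd_to_send)
{
	char dummy = 'x';	/* one byte of regular data travels with the FD */
	struct iovec iov = { .iov_base = &dummy, .iov_len = 1 };
	union fd_ctrl ctrl;
	struct msghdr msgh;
	struct cmsghdr *cmsg;

	memset(&msgh, 0, sizeof(msgh));
	memset(&ctrl, 0, sizeof(ctrl));
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = ctrl.buf;
	msgh.msg_controllen = sizeof(ctrl.buf);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int));

	return sendmsg(sock, &msgh, 0) == 1 ? 0 : -1;
}

/* Hypothetical helper: receive one FD; returns it, or -1 on failure. */
static int recv_one_fd(int sock)
{
	char dummy;
	struct iovec iov = { .iov_base = &dummy, .iov_len = 1 };
	union fd_ctrl ctrl;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	int fd = -1;

	memset(&msgh, 0, sizeof(msgh));
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = ctrl.buf;
	msgh.msg_controllen = sizeof(ctrl.buf);

	if (recvmsg(sock, &msgh, 0) <= 0)
		return -1;
	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))
		return -1;

	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
	     cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if (cmsg->cmsg_level == SOL_SOCKET &&
		    cmsg->cmsg_type == SCM_RIGHTS) {
			memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
			break;
		}
	}
	return fd;
}
```

The receiver gets a new descriptor number that refers to the same open file,
which is why a plain file or shared memory cannot carry an FD between
processes.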

If you could explain here why the three requirements above are needed to
enable vdev in the secondary process, that would be better. For example, a
vdev may be hot-plugged or removed, so the primary and secondary processes
need to exchange data bidirectionally and dynamically.

>
> This patch proposes to create a communication channel (as an unix
> socket connection) for above requirements.

Can you give more explanation of how the channel works? For example, both the
primary and the secondary register actions for specific messages, and a
separate thread is created to listen for and react to incoming messages.

>
> Three new APIs are added:
>
> 1. rte_eal_primary_secondary_add_action is used to register an action,
> if the calling component wants to response the messages from the
> corresponding component in its primary process or secondary processes.
> 2. rte_eal_primary_secondary_del_action is used to unregister the
> action if the calling component does not want to response the messages.
> 3. rte_eal_primary_secondary_sendmsg is used to send a message.
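
To make the kind of description I am asking for concrete, here is a rough
standalone sketch of the flow as I understand it: a component registers an
action under a name, the sender puts that name in the message, and a listener
loop looks the name up and calls the action. It is a simplification and not
the patch's code: the fixed-size wire_msg, the array registry, and every name
in it (register_action, send_request, dispatch_one, listener, on_vdev_attach)
are hypothetical, and it passes no FDs.

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define NAME_LEN    64
#define PARAMS_LEN  128
#define MAX_ACTIONS 8

typedef int (action_fn)(const char *params, int len);

/* Hypothetical fixed-size message: action name plus opaque parameters. */
struct wire_msg {
	char name[NAME_LEN];
	int len;
	char params[PARAMS_LEN];
};

/* Registry; filled before the listener starts, so no locking here. */
static struct {
	char name[NAME_LEN];
	action_fn *fn;
} actions[MAX_ACTIONS];
static int n_actions;

static int register_action(const char *name, action_fn *fn)
{
	if (n_actions == MAX_ACTIONS || strlen(name) >= NAME_LEN)
		return -1;
	strcpy(actions[n_actions].name, name);
	actions[n_actions].fn = fn;
	n_actions++;
	return 0;
}

static int send_request(int fd, const char *name, const void *params, int len)
{
	struct wire_msg m;

	if (len < 0 || len > PARAMS_LEN || strlen(name) >= NAME_LEN)
		return -1;
	memset(&m, 0, sizeof(m));
	strcpy(m.name, name);
	m.len = len;
	memcpy(m.params, params, len);
	return send(fd, &m, sizeof(m), 0) == (ssize_t)sizeof(m) ? 0 : -1;
}

/* Read one message; 1 = dispatched, 0 = ignored, -1 = EOF or error. */
static int dispatch_one(int fd)
{
	struct wire_msg m;
	int i;

	if (recv(fd, &m, sizeof(m), MSG_WAITALL) != (ssize_t)sizeof(m))
		return -1;
	m.name[NAME_LEN - 1] = '\0';	/* never trust the peer's terminator */
	if (m.len < 0 || m.len > PARAMS_LEN)
		return 0;
	for (i = 0; i < n_actions; i++) {
		if (strcmp(actions[i].name, m.name) == 0) {
			actions[i].fn(m.params, m.len);
			return 1;
		}
	}
	return 0;
}

/* Thread body: keep dispatching until the peer closes the socket. */
static void *listener(void *arg)
{
	int fd = *(int *)arg;

	while (dispatch_one(fd) >= 0)
		;
	return NULL;
}

/* Example action: remember the last request it was given. */
static int last_len;
static char last_params[PARAMS_LEN];

static int on_vdev_attach(const char *params, int len)
{
	last_len = len;
	memcpy(last_params, params, len);
	return 0;
}
```

In a real process the loop would run in its own thread, for example via
pthread_create(&tid, NULL, listener, &fd), after all actions are registered.
A paragraph in the commit log describing this flow, and which side starts the
thread, would already answer my question.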
> > Signed-off-by: Jianfeng Tan > --- > lib/librte_eal/bsdapp/eal/rte_eal_version.map | 8 + > lib/librte_eal/common/eal_common_proc.c | 454 ++++++++++++++++++++++++ > lib/librte_eal/common/eal_filesystem.h | 18 + > lib/librte_eal/common/eal_private.h | 10 + > lib/librte_eal/common/include/rte_eal.h | 74 ++++ > lib/librte_eal/linuxapp/eal/eal.c | 6 + > lib/librte_eal/linuxapp/eal/rte_eal_version.map | 8 + > 7 files changed, 578 insertions(+) > > diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map > index aac6fd7..f4ff29f 100644 > --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map > +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map > @@ -237,3 +237,11 @@ EXPERIMENTAL { > rte_service_unregister; > > } DPDK_17.08; > + > +EXPERIMENTAL { > + global: > + > + rte_eal_primary_secondary_add_action; > + rte_eal_primary_secondary_del_action; > + rte_eal_primary_secondary_sendmsg; > +} DPDK_17.11; > diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c > index 60526ca..fa316bf 100644 > --- a/lib/librte_eal/common/eal_common_proc.c > +++ b/lib/librte_eal/common/eal_common_proc.c > @@ -33,8 +33,20 @@ > #include > #include > #include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > + > +#include > #include > +#include > > +#include "eal_private.h" > #include "eal_filesystem.h" > #include "eal_internal_cfg.h" > > @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path) > > return !!ret; > } > + > +struct action_entry { > + TAILQ_ENTRY(action_entry) next; /**< Next attached action entry*/ > + > +#define MAX_ACTION_NAME_LEN 64 > + char action_name[MAX_ACTION_NAME_LEN]; > + rte_eal_primary_secondary_t *action; > +}; > + > +/** Double linked list of actions. 
*/
> +TAILQ_HEAD(action_entry_list, action_entry);
> +
> +static struct action_entry_list action_entry_list =
> +	TAILQ_HEAD_INITIALIZER(action_entry_list);
> +
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> +	int len = strlen(name);
> +	struct action_entry *entry;
> +
> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
> +		if (strncmp(entry->action_name, name, len) == 0)
> +			break;
> +	}
> +
> +	return entry;
> +}
> +
> +int
> +rte_eal_primary_secondary_add_action(const char *action_name,
> +				     rte_eal_primary_secondary_t action)
> +{
> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +	if (entry == NULL)
> +		return -ENOMEM;
> +
> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +	entry->action = action;

In struct action_entry, the type of action is 'rte_eal_primary_secondary_t *',
but the parameter assigned to it here is declared as
'rte_eal_primary_secondary_t' (without the '*'). Could the two declarations be
made consistent?

> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);

What would happen if two actions are registered for the same message name?

> +	return 0;
> +}
> +
> +void
> +rte_eal_primary_secondary_del_action(const char *name)
> +{
> +	struct action_entry *entry = find_action_entry_by_name(name);
> +
> +	TAILQ_REMOVE(&action_entry_list, entry, next);
> +	free(entry);
> +}
> +
> +#define MAX_SECONDARY_PROCS 8

A simple question: why is the maximum number 8?
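
Coming back to the duplicate-name question: the quoted lookup compares only
strlen(name) bytes, so a lookup for "vd" would also match an entry named
"vdev", and del_action passes a NULL entry to TAILQ_REMOVE when the name is
not registered. One possible shape for the registry is sketched below. It is
only a suggestion under my own assumptions: the names (action_t, find_action,
add_action, del_action, noop_action) are hypothetical, and rejecting
duplicates with -EEXIST is one policy among several.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>

#define MAX_ACTION_NAME_LEN 64

typedef int (action_t)(const char *params, int len, int fds[], int fds_num);

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[MAX_ACTION_NAME_LEN];
	action_t *action;
};

TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

/* Exact match on the whole name; returns NULL when nothing matches. */
static struct action_entry *find_action(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strcmp(entry->action_name, name) == 0)
			return entry;
	}
	return NULL;
}

/* Register an action; a second registration of the same name is refused. */
static int add_action(const char *name, action_t *action)
{
	struct action_entry *entry;

	if (name == NULL || action == NULL ||
	    strlen(name) >= MAX_ACTION_NAME_LEN)
		return -EINVAL;
	if (find_action(name) != NULL)
		return -EEXIST;

	entry = malloc(sizeof(*entry));
	if (entry == NULL)
		return -ENOMEM;

	strcpy(entry->action_name, name);	/* length checked above */
	entry->action = action;
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	return 0;
}

/* Unregister an action; an unknown name is reported, not dereferenced. */
static int del_action(const char *name)
{
	struct action_entry *entry = find_action(name);

	if (entry == NULL)
		return -ENOENT;
	TAILQ_REMOVE(&action_entry_list, entry, next);
	free(entry);
	return 0;
}

/* Placeholder action used only to exercise the registry. */
static int noop_action(const char *params, int len, int fds[], int fds_num)
{
	(void)params;
	(void)len;
	(void)fds;
	(void)fds_num;
	return 0;
}
```

Checking the name length before copying also avoids the unterminated string
that strncpy leaves behind when action_name is MAX_ACTION_NAME_LEN bytes or
longer. The list would still need protection if actions can be added or
removed while the channel thread is running.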
> + > +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */ > +static int fd_listen; /* unix listen socket by primary */ > +static int fd_to_pri; /* only used by secondary process */ > +static int fds_to_sec[MAX_SECONDARY_PROCS]; > + > +struct msg_hdr { > + char action_name[MAX_ACTION_NAME_LEN]; > + int fds_num; > + char params[0]; > +} __rte_packed; > + > +static int > +add_sec_proc(int fd) > +{ > + int i; > + > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) > + if (fds_to_sec[i] == -1) > + break; > + > + if (i >= MAX_SECONDARY_PROCS) > + return -1; > + > + fds_to_sec[i] = fd; > + > + return i; > +} > + > +static void > +del_sec_proc(int fd) > +{ > + int i; > + > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) { > + if (fds_to_sec[i] == fd) { > + fds_to_sec[i] = -1; > + break; > + } > + } > +} > + > +static int > +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num) > +{ > + struct iovec iov; > + struct msghdr msgh; > + size_t fdsize = fds_num * sizeof(int); > + char control[CMSG_SPACE(fdsize)]; > + struct cmsghdr *cmsg; > + int ret; > + > + memset(&msgh, 0, sizeof(msgh)); > + iov.iov_base = buf; > + iov.iov_len = buflen; > + > + msgh.msg_iov = &iov; > + msgh.msg_iovlen = 1; > + msgh.msg_control = control; > + msgh.msg_controllen = sizeof(control); > + > + ret = recvmsg(sockfd, &msgh, 0); > + if (ret <= 0) { > + RTE_LOG(ERR, EAL, "recvmsg failed\n"); > + return ret; > + } > + > + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) { > + RTE_LOG(ERR, EAL, "truncted msg\n"); > + return -1; > + } > + > + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; > + cmsg = CMSG_NXTHDR(&msgh, cmsg)) { > + if ((cmsg->cmsg_level == SOL_SOCKET) && > + (cmsg->cmsg_type == SCM_RIGHTS)) { > + memcpy(fds, CMSG_DATA(cmsg), fdsize); > + break; > + } > + } > + > + return ret; > +} > + > +static int > +process_msg(int fd) > +{ > + int len; > + int params_len; > + char buf[1024]; > + int fds[8]; /* accept at most 8 FDs per message */ > + struct msg_hdr *hdr; > + 
struct action_entry *entry; > + > + len = read_msg(fd, buf, 1024, fds, 8); > + if (len < 0) { > + RTE_LOG(ERR, EAL, "failed to read message: %s\n", > + strerror(errno)); > + return -1; > + } > + > + hdr = (struct msg_hdr *) buf; > + > + entry = find_action_entry_by_name(hdr->action_name); > + if (entry == NULL) { > + RTE_LOG(ERR, EAL, "cannot find action by: %s\n", > + hdr->action_name); > + return -1; > + } > + > + params_len = len - sizeof(struct msg_hdr); > + return entry->action(hdr->params, params_len, fds, hdr->fds_num); > +} > + > +static void * > +thread_primary(__attribute__((unused)) void *arg) > +{ > + int fd; > + int i, n; > + struct epoll_event event; > + struct epoll_event *events; > + > + event.events = EPOLLIN | EPOLLRDHUP; > + event.data.fd = fd_listen; > + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) { > + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", > + strerror(errno)); > + exit(EXIT_FAILURE); > + } > + > + events = calloc(20, sizeof event); > + > + while (1) { > + n = epoll_wait(efd_pri_sec, events, 20, -1); > + for (i = 0; i < n; i++) { > + if (events[i].data.fd == fd_listen) { > + if (events[i].events != EPOLLIN) { > + RTE_LOG(ERR, EAL, "what happens?\n"); > + exit(EXIT_FAILURE); > + } > + > + fd = accept(fd_listen, NULL, NULL); > + if (fd < 0) { > + RTE_LOG(ERR, EAL, "primary failed to accept: %s\n", > + strerror(errno)); > + continue; > + } > + > + event.data.fd = fd; > + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) { > + RTE_LOG(ERR, EAL, "failed to add secondary: %s\n", > + strerror(errno)); > + continue; > + } > + if (add_sec_proc(fd) < 0) > + RTE_LOG(ERR, EAL, "too many secondary processes\n"); > + > + continue; > + } > + > + fd = events[i].data.fd; > + > + if ((events[i].events & (EPOLLERR | EPOLLHUP))) { > + RTE_LOG(ERR, EAL, > + "secondary process exit: %d\n", fd); > + epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL); > + del_sec_proc(fd); > + continue; > + } > + > + if ((events[i].events & 
EPOLLIN)) { > + RTE_LOG(INFO, EAL, > + "recv msg from secondary process\n"); > + > + process_msg(fd); > + } > + } > + } > + > + return NULL; > +} > + > +static void * > +thread_secondary(__attribute__((unused)) void *arg) > +{ > + int fd; > + int i, n; > + struct epoll_event event; > + struct epoll_event *events; > + > + event.events = EPOLLIN | EPOLLRDHUP; > + event.data.fd = fd_to_pri; > + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) { > + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno)); > + exit(EXIT_FAILURE); > + } > + > + events = calloc(20, sizeof event); > + > + while (1) { > + n = epoll_wait(efd_pri_sec, events, 20, -1); > + for (i = 0; i < n; i++) { > + > + fd = events[i].data.fd; > + > + if ((events[i].events & (EPOLLERR | EPOLLHUP))) { > + RTE_LOG(ERR, EAL, "primary exits, so do I\n"); > + /* Do we need exit secondary when primary exits? */ > + exit(EXIT_FAILURE); > + } > + > + if ((events[i].events & EPOLLIN)) { > + RTE_LOG(INFO, EAL, > + "recv msg from primary process\n"); > + process_msg(fd); > + } > + } > + } > + > + return NULL; > +} > + > +int > +rte_eal_primary_secondary_channel_init(void) > +{ > + int i, fd, ret; > + const char *path; > + struct sockaddr_un un; > + pthread_t tid; > + void*(*fn)(void *); > + char thread_name[RTE_MAX_THREAD_NAME_LEN]; > + > + efd_pri_sec = epoll_create1(0); > + if (efd_pri_sec < 0) { > + RTE_LOG(ERR, EAL, "epoll_create1 failed\n"); > + return -1; > + } > + > + fd = socket(AF_UNIX, SOCK_STREAM, 0); > + if (fd < 0) { > + RTE_LOG(ERR, EAL, "Failed to create unix socket"); > + return -1; > + } > + > + memset(&un, 0, sizeof(un)); > + un.sun_family = AF_UNIX; > + path = eal_primary_secondary_unix_path(); > + strncpy(un.sun_path, path, sizeof(un.sun_path)); > + un.sun_path[sizeof(un.sun_path) - 1] = '\0'; > + > + if (rte_eal_process_type() == RTE_PROC_PRIMARY) { > + > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) > + fds_to_sec[i] = -1; > + > + if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) { > 
+ RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n"); > + close(fd); > + return -1; > + } > + > + /* The file still exists since last run */ > + unlink(path); > + > + ret = bind(fd, (struct sockaddr *)&un, sizeof(un)); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n", > + path, strerror(errno)); > + close(fd); > + return -1; > + } > + RTE_LOG(INFO, EAL, "primary bind to %s\n", path); > + > + ret = listen(fd, 1024); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno)); > + close(fd); > + return -1; > + } > + > + fn = thread_primary; > + fd_listen = fd; > + } else { > + ret = connect(fd, (struct sockaddr *)&un, sizeof(un)); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to connect primary\n"); > + return -1; > + } > + fn = thread_secondary; > + fd_to_pri = fd; > + } > + > + ret = pthread_create(&tid, NULL, fn, NULL); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to create thead: %s\n", > + strerror(errno)); > + close(fd); > + close(efd_pri_sec); > + return -1; > + } > + > + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, > + "ps_channel"); > + ret = rte_thread_setname(tid, thread_name); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to set thead name\n"); > + close(fd); > + close(efd_pri_sec); > + return -1; > + } > + > + return 0; > +} > + > +static int > +send_msg(int fd, struct msghdr *p_msgh) > +{ > + int ret; > + > + do { > + ret = sendmsg(fd, p_msgh, 0); > + } while (ret < 0 && errno == EINTR); > + > + return ret; > +} > + > +int > +rte_eal_primary_secondary_sendmsg(const char *action_name, > + const void *params, > + int len_params, > + int fds[], > + int fds_num) > +{ > + int i; > + int ret = 0; > + struct msghdr msgh; > + struct iovec iov; > + size_t fd_size = fds_num * sizeof(int); > + char control[CMSG_SPACE(fd_size)]; > + struct cmsghdr *cmsg; > + struct msg_hdr *msg; > + int len_msg; > + > + len_msg = sizeof(struct msg_hdr) + len_params; > + msg = malloc(len_msg); > + if (!msg) { > + RTE_LOG(ERR, 
EAL, "Cannot alloc memory for msg"); > + return -ENOMEM; > + } > + memset(msg, 0, len_msg); > + strcpy(msg->action_name, action_name); > + msg->fds_num = fds_num; > + memcpy(msg->params, params, len_params); > + > + memset(&msgh, 0, sizeof(msgh)); > + memset(control, 0, sizeof(control)); > + > + iov.iov_base = (uint8_t *)msg; > + iov.iov_len = len_msg; > + > + msgh.msg_iov = &iov; > + msgh.msg_iovlen = 1; > + msgh.msg_control = control; > + msgh.msg_controllen = sizeof(control); > + > + cmsg = CMSG_FIRSTHDR(&msgh); > + cmsg->cmsg_len = CMSG_LEN(fd_size); > + cmsg->cmsg_level = SOL_SOCKET; > + cmsg->cmsg_type = SCM_RIGHTS; > + memcpy(CMSG_DATA(cmsg), fds, fd_size); > + > + if (rte_eal_process_type() == RTE_PROC_PRIMARY) { > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) { > + if (fds_to_sec[i] == -1) > + continue; > + > + ret = send_msg(fds_to_sec[i], &msgh); > + if (ret < 0) > + break; > + } > + } else { > + ret = send_msg(fd_to_pri, &msgh); > + } > + > + free(msg); > + > + return ret; > +} > diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h > index 8acbd99..78bb4fb 100644 > --- a/lib/librte_eal/common/eal_filesystem.h > +++ b/lib/librte_eal/common/eal_filesystem.h > @@ -67,6 +67,24 @@ eal_runtime_config_path(void) > return buffer; > } > > +/** Path of primary/secondary communication unix socket file. */ > +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix" > +static inline const char * > +eal_primary_secondary_unix_path(void) > +{ > + static char buffer[PATH_MAX]; /* static so auto-zeroed */ > + const char *directory = default_config_dir; > + const char *home_dir = getenv("HOME"); > + > + if (getuid() != 0 && home_dir != NULL) > + directory = home_dir; > + snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT, > + directory, internal_config.hugefile_prefix); > + > + return buffer; > + > +} > + > /** Path of hugepage info file. 
*/ > #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info" > > diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h > index 597d82e..719b160 100644 > --- a/lib/librte_eal/common/eal_private.h > +++ b/lib/librte_eal/common/eal_private.h > @@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void); > */ > struct rte_bus *rte_bus_find_by_device_name(const char *str); > > +/** > + * Create the unix channel for primary/secondary communication. > + * > + * @return > + * 0 on success; > + * (<0) on failure. > + */ > + > +int rte_eal_primary_secondary_channel_init(void); > + > #endif /* _EAL_PRIVATE_H_ */ > diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h > index 0e7363d..6cfc9d6 100644 > --- a/lib/librte_eal/common/include/rte_eal.h > +++ b/lib/librte_eal/common/include/rte_eal.h > @@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv); > int rte_eal_primary_proc_alive(const char *config_file_path); > > /** > + * Action function typedef used by other components. > + * > + * As we create unix socket channel for primary/secondary communication, use > + * this function typedef to register action for coming messages. > + */ > +typedef int (rte_eal_primary_secondary_t)(const char *params, > + int len, > + int fds[], > + int fds_num); > +/** > + * Register an action function for primary/secondary communication. > + * > + * Call this function to register an action, if the calling component wants > + * to response the messages from the corresponding component in its primary > + * process or secondary processes. > + * > + * @param action_name > + * The action_name argument plays as the nonredundant key to find the action. > + * > + * @param action > + * The action argument is the function pointer to the action function. > + * > + * @return > + * - 0 on success. > + * - (<0) on failure. 
> + */ > +int rte_eal_primary_secondary_add_action(const char *action_name, > + rte_eal_primary_secondary_t action); > +/** > + * Unregister an action function for primary/secondary communication. > + * > + * Call this function to unregister an action if the calling component does > + * not want to response the messages from the corresponding component in its > + * primary process or secondary processes. > + * > + * @param action_name > + * The action_name argument plays as the nonredundant key to find the action. > + * > + */ > +void rte_eal_primary_secondary_del_action(const char *name); > + > +/** > + * Send a message to the primary process or the secondary processes. > + * > + * This function will send a message which will be responsed by the action > + * identified by action_name of the process on the other side. > + * > + * @param action_name > + * The action_name argument is used to identify which action will be used. > + * > + * @param params > + * The params argument contains the customized message. > + * > + * @param len_params > + * The len_params argument is the length of the customized message. > + * > + * @param fds > + * The fds argument is an array of fds sent with sendmsg. > + * > + * @param fds_num > + * The fds_num argument is number of fds to be sent with sendmsg. > + * > + * @return > + * - (>=0) on success. > + * - (<0) on failure. > + */ > +int > +rte_eal_primary_secondary_sendmsg(const char *action_name, > + const void *params, > + int len_params, > + int fds[], > + int fds_num); > + > +/** > * Usage function typedef used by the application usage function. 
> * > * Use this function typedef to define and call rte_set_applcation_usage_hook() > diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c > index 48f12f4..237c0b1 100644 > --- a/lib/librte_eal/linuxapp/eal/eal.c > +++ b/lib/librte_eal/linuxapp/eal/eal.c > @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv) > > eal_check_mem_on_local_socket(); > > + if (rte_eal_primary_secondary_channel_init() < 0) { > + rte_eal_init_alert("Cannot create unix channel."); > + rte_errno = EFAULT; > + return -1; > + } > + > if (eal_plugins_init() < 0) > rte_eal_init_alert("Cannot init plugins\n"); > > diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map > index 3a8f154..c618aec 100644 > --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map > +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map > @@ -242,3 +242,11 @@ EXPERIMENTAL { > rte_service_unregister; > > } DPDK_17.08; > + > +EXPERIMENTAL { > + global: > + > + rte_eal_primary_secondary_add_action; > + rte_eal_primary_secondary_del_action; > + rte_eal_primary_secondary_sendmsg; > +} DPDK_17.11; > -- > 2.7.4