From: Jianfeng Tan
To: dev@dpdk.org
Cc: anatoly.burakov@intel.com, bruce.richardson@intel.com,
 konstantin.ananyev@intel.com, thomas@monjalon.net, Jianfeng Tan
Date: Thu, 11 Jan 2018 04:07:33 +0000
Message-Id: <1515643654-129489-4-git-send-email-jianfeng.tan@intel.com>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1515643654-129489-1-git-send-email-jianfeng.tan@intel.com>
References: <1512067450-59203-1-git-send-email-jianfeng.tan@intel.com>
 <1515643654-129489-1-git-send-email-jianfeng.tan@intel.com>
Subject: [dpdk-dev] [PATCH v2 3/4] eal: add synchronous multi-process communication
List-Id: DPDK patches and discussions

We need a synchronous way to do multi-process communication, i.e., to
block waiting for a reply message after sending a request to the peer
process.

Add two APIs, rte_eal_mp_request() and rte_eal_mp_reply(), for this use
case. rte_eal_mp_request() sends a request message and then blocks
until the reply message arrives. The timeout is hard-coded to 5
seconds. The reply is copied back into the parameters of this API so
that the caller can decide how to interpret it (both params and fds).
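The blocking wait described above can be sketched with plain POSIX
sockets. This is only an illustration, not the code from this patch:
request_with_timeout() is a hypothetical name, the timeout is a
parameter instead of the hard-coded 5 seconds, and fd is assumed to be
a connected datagram socket (for example one end of a socketpair).

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper (not part of the patch): send one datagram on fd,
 * then block for the reply for at most timeout_ms milliseconds.
 *
 * Returns the reply length on success, or -1 on error. On timeout the
 * return value is -1 and errno is EAGAIN or EWOULDBLOCK.
 */
static ssize_t
request_with_timeout(int fd, const void *req, size_t req_len,
		     void *reply, size_t reply_cap, int timeout_ms)
{
	struct timeval tv;

	/* A zero timeval would mean "block forever", so insist on > 0. */
	assert(timeout_ms > 0);

	tv.tv_sec = timeout_ms / 1000;
	tv.tv_usec = (timeout_ms % 1000) * 1000;
	if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0)
		return -1;

	if (send(fd, req, req_len, 0) != (ssize_t)req_len)
		return -1;

	/* Blocks until a datagram arrives or the receive timeout expires. */
	return recv(fd, reply, reply_cap, 0);
}
```

A caller would treat a return of -1 with errno set to EAGAIN or
EWOULDBLOCK as "no reply in time", which is the case the patch maps to
a return value of 0.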
Note that this API fails if the primary process owns more than one
secondary process.

rte_eal_mp_reply() is always called from an mp action handler. To let
the handler know which peer address to reply to, add another parameter
to rte_eal_mp_t.

A mutex in rte_eal_mp_request() guarantees that only one request is in
flight at a time for a given pair of processes.

Suggested-by: Anatoly Burakov
Signed-off-by: Jianfeng Tan
---
 lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
 lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
 lib/librte_eal/rte_eal_version.map      |   2 +
 3 files changed, 206 insertions(+), 13 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 70519cc..f194a52 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -32,6 +32,7 @@
 static int mp_fd = -1;
 static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
 
 struct action_entry {
 	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
@@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
 
 struct mp_msghdr {
 	char action_name[MAX_ACTION_NAME_LEN];
+#define MP_MSG	0 /* Share message with peers, will not block */
+#define MP_REQ	1 /* Request for information, will block for a reply */
+#define MP_REP	2 /* Reply to previously-received request */
+	int type;
 	int fds_num;
 	int len_params;
 	char params[0];
@@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
 }
 
 static int
-read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
+read_msg(int fd, char *buf, int buflen,
+	 int *fds, int fds_num, struct sockaddr_un *s)
 {
 	int ret;
 	struct iovec iov;
@@ -151,6 +157,8 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 
 	iov.iov_base = buf;
 	iov.iov_len  = buflen;
+	msgh.msg_name = s;
+	msgh.msg_namelen = sizeof(*s);
 	msgh.msg_iov = &iov;
 	msgh.msg_iovlen = 1;
 	msgh.msg_control = control;
@@ -181,7 +189,7 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 }
 
 static int
-process_msg(struct mp_msghdr *hdr, int len, int fds[])
+process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
 {
 	int ret;
 	int params_len;
@@ -199,10 +207,10 @@ process_msg(struct mp_msghdr *hdr, int len, int fds[])
 	}
 
 	params_len = len - sizeof(struct mp_msghdr);
-	ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
+	ret = entry->action(hdr->params, params_len,
+			    fds, hdr->fds_num, s->sun_path);
 	pthread_mutex_unlock(&mp_mutex_action);
 	return ret;
-
 }
 
 static void *
@@ -211,11 +219,12 @@ mp_handle(void *arg __rte_unused)
 	int len;
 	int fds[SCM_MAX_FD];
 	char buf[MAX_MSG_LENGTH];
+	struct sockaddr_un sa;
 
 	while (1) {
-		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
+		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
 		if (len > 0)
-			process_msg((struct mp_msghdr *)buf, len, fds);
+			process_msg((struct mp_msghdr *)buf, len, fds, &sa);
 	}
 
 	return NULL;
@@ -255,7 +264,8 @@ static int
 mp_primary_proc(const void *params,
 		int len __rte_unused,
 		int fds[] __rte_unused,
-		int fds_num __rte_unused)
+		int fds_num __rte_unused,
+		const void *peer __rte_unused)
 {
 	const struct proc_request *r = (const struct proc_request *)params;
 
@@ -362,7 +372,8 @@ rte_eal_mp_channel_init(void)
 }
 
 static inline struct mp_msghdr *
-format_msg(const char *act_name, const void *p, int len_params, int fds_num)
+format_msg(const char *act_name, const void *p,
+	   int len_params, int fds_num, int type)
 {
 	int len_msg;
 	struct mp_msghdr *msg;
@@ -384,6 +395,7 @@ format_msg(const char *act_name, const void *p, int len_params, int fds_num)
 	strcpy(msg->action_name, act_name);
 	msg->fds_num = fds_num;
 	msg->len_params = len_params;
+	msg->type = type;
 	memcpy(msg->params, p, len_params);
 	return msg;
 }
@@ -455,7 +467,9 @@ mp_send(const char *action_name,
 	const void *params,
 	int len_params,
 	int fds[],
-	int fds_num)
+	int fds_num,
+	int type,
+	const void *peer)
 {
 	int i;
 	int n = 0;
@@ -468,7 +482,7 @@ mp_send(const char *action_name,
 		return 0;
 	}
 
-	msg = format_msg(action_name, params, len_params, fds_num);
+	msg = format_msg(action_name, params, len_params, fds_num, type);
 	if (msg == NULL)
 		return 0;
 
@@ -477,6 +491,11 @@ mp_send(const char *action_name,
 		return 0;
 	}
 
+	if (peer) {
+		n += send_msg(sockfd, peer, msg, fds);
+		goto ret;
+	}
+
 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
 		/* broadcast to all secondaries */
 		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
@@ -488,6 +507,7 @@ mp_send(const char *action_name,
 	} else
 		n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
 
+ret:
 	free(msg);
 	close(sockfd);
 	return n;
@@ -501,5 +521,107 @@ rte_eal_mp_sendmsg(const char *action_name,
 		   int fds_num)
 {
 	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
-	return mp_send(action_name, params, len_params, fds, fds_num);
+	return mp_send(action_name, params, len_params,
+			fds, fds_num, MP_MSG, NULL);
+}
+
+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}
+
+int
+rte_eal_mp_reply(const char *action_name,
+		 const void *params,
+		 int len_p,
+		 int fds[],
+		 int fds_in,
+		 const void *peer)
+{
+	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
+	if (peer == NULL) {
+		RTE_LOG(ERR, EAL, "peer is not specified\n");
+		return 0;
+	}
+	return mp_send(action_name, params, len_p, fds, fds_in, MP_REP, peer);
 }
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 9884c0b..2690a77 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -192,7 +192,7 @@ int rte_eal_primary_proc_alive(const char *config_file_path);
  * this function typedef to register action for coming messages.
  */
 typedef int (*rte_eal_mp_t)(const void *params, int len,
-			    int fds[], int fds_num);
+			    int fds[], int fds_num, const void *peer);
 
 /**
  * Register an action function for primary/secondary communication.
@@ -245,7 +245,7 @@ void rte_eal_mp_action_unregister(const char *name);
  *   The fds argument is an array of fds sent with sendmsg.
  *
  * @param fds_num
- *   The fds_num argument is number of fds to be sent with sendmsg.
+ *   The number of fds to be sent with sendmsg.
  *
  * @return
  *  - Returns the number of messages being sent successfully.
@@ -255,6 +255,75 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params,
 		   int len_params, int fds[], int fds_num);
 
 /**
+ * Send a request to the peer process and expect a reply.
+ *
+ * This function sends a request message to the peer process, and will
+ * block until a reply message is received from the peer process. Note:
+ * this does not work when the primary process sends requests while it
+ * has multiple (>1) secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message; when the reply
+ *   is received, the replied params will be copied to this pointer.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg; when the reply
+ *   is received, the replied fds will be copied into this array.
+ *
+ * @param fds_in
+ *   The number of fds to be sent.
+ *
+ * @param fds_out
+ *   The number of fds to be received.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on sending request successfully but no valid reply received.
+ *  - (<0) on failure to send the request.
+ */
+int
+rte_eal_mp_request(const char *action_name, void *params,
+		   int len_p, int fds[], int fds_in, int fds_out);
+
+/**
+ * Send a reply to the peer process.
+ *
+ * This function will send a reply message in response to a request message
+ * received previously.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_in
+ *   The number of fds to be sent with sendmsg.
+ *
+ * @param peer
+ *   The peer address to reply to, as passed to the action handler.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on failure.
+ */
+int
+rte_eal_mp_reply(const char *action_name, const void *params,
+		 int len_p, int fds[], int fds_in, const void *peer);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_application_usage_hook()
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index 5dacde5..068ac0b 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -243,5 +243,7 @@ DPDK_18.02 {
 	rte_eal_mp_action_register;
 	rte_eal_mp_action_unregister;
 	rte_eal_mp_sendmsg;
+	rte_eal_mp_request;
+	rte_eal_mp_reply;
 
 } DPDK_17.11;
-- 
2.7.4
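The peer-address handling in this patch (read_msg() filling a struct
sockaddr_un through msg_name, and the reply being sent to that
sun_path) can be tried outside DPDK. The following is a minimal sketch
under stated assumptions, not code from the patch: bind_unix_dgram(),
recv_with_peer() and send_to_peer() are hypothetical helper names, and
the requester is assumed to bind its own socket, since an unbound
AF_UNIX datagram socket has no address to reply to.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical helper: create an AF_UNIX datagram socket bound to path. */
static int
bind_unix_dgram(const char *path)
{
	struct sockaddr_un addr;
	int fd;

	if (strlen(path) >= sizeof(addr.sun_path))
		return -1;

	fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (fd < 0)
		return -1;

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	strcpy(addr.sun_path, path);
	unlink(path);	/* remove a stale socket file, if any */
	if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
		close(fd);
		return -1;
	}
	return fd;
}

/* Receive one datagram and record the sender's address in *peer. */
static ssize_t
recv_with_peer(int fd, void *buf, size_t cap, struct sockaddr_un *peer)
{
	struct iovec iov;
	struct msghdr msgh;

	assert(peer != NULL);
	memset(&msgh, 0, sizeof(msgh));
	memset(peer, 0, sizeof(*peer));	/* keeps sun_path NUL-terminated */

	iov.iov_base = buf;
	iov.iov_len = cap;
	msgh.msg_name = peer;
	msgh.msg_namelen = sizeof(*peer);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;

	return recvmsg(fd, &msgh, 0);
}

/* Send one datagram to the given AF_UNIX address. */
static ssize_t
send_to_peer(int fd, const void *buf, size_t len,
	     const struct sockaddr_un *peer)
{
	return sendto(fd, buf, len, 0,
		      (const struct sockaddr *)peer, sizeof(*peer));
}
```

In this sketch a responder calls recv_with_peer() to learn who asked
and then send_to_peer() with the recorded address, which is roughly
the role the new peer argument plays for an action handler that calls
rte_eal_mp_reply().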