From: Jianfeng Tan <jianfeng.tan@intel.com>
To: dev@dpdk.org
Cc: anatoly.burakov@intel.com, bruce.richardson@intel.com,
konstantin.ananyev@intel.com, thomas@monjalon.net,
Jianfeng Tan <jianfeng.tan@intel.com>
Subject: [dpdk-dev] [PATCH v2 3/4] eal: add synchronous multi-process communication
Date: Thu, 11 Jan 2018 04:07:33 +0000 [thread overview]
Message-ID: <1515643654-129489-4-git-send-email-jianfeng.tan@intel.com> (raw)
In-Reply-To: <1515643654-129489-1-git-send-email-jianfeng.tan@intel.com>
We need a synchronous mode for multi-process communication, i.e.,
blocking while waiting for a reply message after sending a request
to the peer process.
We add two APIs, rte_eal_mp_request() and rte_eal_mp_reply(), for
this use case. rte_eal_mp_request() sends out a request message and
then waits for a reply message. The timeout is hard-coded to 5
seconds. The reply is copied into the parameters of this API so that
the caller can decide how to interpret that information (including
params and fds). Note: if a primary process owns multiple secondary
processes, this API will fail.
The API rte_eal_mp_reply() is always called by an mp action handler.
Here we add another parameter to rte_eal_mp_t so that the action
handler knows which peer address to reply to.
We use a mutex in rte_eal_mp_request() to guarantee that only one
request from this process is in flight at a time.
Suggested-by: Anatoly Burakov <anatoly.burakov@intel.com>
Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
---
lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
lib/librte_eal/common/include/rte_eal.h | 73 +++++++++++++++-
lib/librte_eal/rte_eal_version.map | 2 +
3 files changed, 206 insertions(+), 13 deletions(-)
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 70519cc..f194a52 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -32,6 +32,7 @@
static int mp_fd = -1;
static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
struct action_entry {
TAILQ_ENTRY(action_entry) next; /**< Next attached action entry */
@@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
struct mp_msghdr {
char action_name[MAX_ACTION_NAME_LEN];
+#define MP_MSG 0 /* Share message with peers, will not block */
+#define MP_REQ 1 /* Request for information, Will block for a reply */
+#define MP_REP 2 /* Reply to previously-received request */
+ int type;
int fds_num;
int len_params;
char params[0];
@@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
}
static int
-read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
+read_msg(int fd, char *buf, int buflen,
+ int *fds, int fds_num, struct sockaddr_un *s)
{
int ret;
struct iovec iov;
@@ -151,6 +157,8 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
iov.iov_base = buf;
iov.iov_len = buflen;
+ msgh.msg_name = s;
+ msgh.msg_namelen = sizeof(*s);
msgh.msg_iov = &iov;
msgh.msg_iovlen = 1;
msgh.msg_control = control;
@@ -181,7 +189,7 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
}
static int
-process_msg(struct mp_msghdr *hdr, int len, int fds[])
+process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
{
int ret;
int params_len;
@@ -199,10 +207,10 @@ process_msg(struct mp_msghdr *hdr, int len, int fds[])
}
params_len = len - sizeof(struct mp_msghdr);
- ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
+ ret = entry->action(hdr->params, params_len,
+ fds, hdr->fds_num, s->sun_path);
pthread_mutex_unlock(&mp_mutex_action);
return ret;
-
}
static void *
@@ -211,11 +219,12 @@ mp_handle(void *arg __rte_unused)
int len;
int fds[SCM_MAX_FD];
char buf[MAX_MSG_LENGTH];
+ struct sockaddr_un sa;
while (1) {
- len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
+ len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
if (len > 0)
- process_msg((struct mp_msghdr *)buf, len, fds);
+ process_msg((struct mp_msghdr *)buf, len, fds, &sa);
}
return NULL;
@@ -255,7 +264,8 @@ static int
mp_primary_proc(const void *params,
int len __rte_unused,
int fds[] __rte_unused,
- int fds_num __rte_unused)
+ int fds_num __rte_unused,
+ const void *peer __rte_unused)
{
const struct proc_request *r = (const struct proc_request *)params;
@@ -362,7 +372,8 @@ rte_eal_mp_channel_init(void)
}
static inline struct mp_msghdr *
-format_msg(const char *act_name, const void *p, int len_params, int fds_num)
+format_msg(const char *act_name, const void *p,
+ int len_params, int fds_num, int type)
{
int len_msg;
struct mp_msghdr *msg;
@@ -384,6 +395,7 @@ format_msg(const char *act_name, const void *p, int len_params, int fds_num)
strcpy(msg->action_name, act_name);
msg->fds_num = fds_num;
msg->len_params = len_params;
+ msg->type = type;
memcpy(msg->params, p, len_params);
return msg;
}
@@ -455,7 +467,9 @@ mp_send(const char *action_name,
const void *params,
int len_params,
int fds[],
- int fds_num)
+ int fds_num,
+ int type,
+ const void *peer)
{
int i;
int n = 0;
@@ -468,7 +482,7 @@ mp_send(const char *action_name,
return 0;
}
- msg = format_msg(action_name, params, len_params, fds_num);
+ msg = format_msg(action_name, params, len_params, fds_num, type);
if (msg == NULL)
return 0;
@@ -477,6 +491,11 @@ mp_send(const char *action_name,
return 0;
}
+ if (peer) {
+ n += send_msg(sockfd, peer, msg, fds);
+ goto ret;
+ }
+
if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
/* broadcast to all secondaries */
for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
@@ -488,6 +507,7 @@ mp_send(const char *action_name,
} else
n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
+ret:
free(msg);
close(sockfd);
return n;
@@ -501,5 +521,107 @@ rte_eal_mp_sendmsg(const char *action_name,
int fds_num)
{
RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
- return mp_send(action_name, params, len_params, fds, fds_num);
+ return mp_send(action_name, params, len_params,
+ fds, fds_num, MP_MSG, NULL);
+}
+
+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j = 0;
+	int sockfd;
+	int nprocs;
+	int ret = -1;	/* failure until a request has actually been sent */
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = E2BIG;	/* errno-style codes are positive */
+		return -1;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return -1;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return -1;
+	}
+
+	tv.tv_sec = 5; /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;	/* count registered (non-NULL) secondaries */
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs != 1) {	/* zero or several peers: no target */
+			RTE_LOG(ERR, EAL,
+				"exactly one secondary process required\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret <= 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
+	hdr = (struct mp_msghdr *)buf;
+	if (ret >= (int)sizeof(*hdr) + len_p && hdr->type == MP_REP &&
+			hdr->len_params == len_p) {
+		memcpy(params, hdr->params, len_p);
+		ret = 1;	/* documented success value, not a byte count */
+	} else {
+		RTE_LOG(ERR, EAL, "invalid reply\n");
+		ret = 0;	/* request sent; timed out or got a bad reply */
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}
+
+/* Send an MP_REP message back to the process that issued a request. */
+int
+rte_eal_mp_reply(const char *action_name, const void *params, int len_p,
+		 int fds[], int fds_in, const void *peer)
+{
+	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
+
+	/* peer is the requester's socket path handed to the action handler */
+	if (peer != NULL)
+		return mp_send(action_name, params, len_p,
+			       fds, fds_in, MP_REP, peer);
+
+	RTE_LOG(ERR, EAL, "peer is not specified\n");
+	return 0;
}
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 9884c0b..2690a77 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -192,7 +192,7 @@ int rte_eal_primary_proc_alive(const char *config_file_path);
* this function typedef to register action for coming messages.
*/
typedef int (*rte_eal_mp_t)(const void *params, int len,
- int fds[], int fds_num);
+ int fds[], int fds_num, const void *peer);
/**
* Register an action function for primary/secondary communication.
@@ -245,7 +245,7 @@ void rte_eal_mp_action_unregister(const char *name);
* The fds argument is an array of fds sent with sendmsg.
*
* @param fds_num
- * The fds_num argument is number of fds to be sent with sendmsg.
+ * The number of fds to be sent with sendmsg.
*
* @return
* - Returns the number of messages being sent successfully.
@@ -255,6 +255,75 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params,
int len_params, int fds[], int fds_num);
/**
+ * Send a request to the peer process and expect a reply.
+ *
+ * This function sends a request message to the peer process and blocks
+ * until a reply message is received or a 5-second timeout expires. Note:
+ * this does not work for the primary process sending requests to its
+ * multiple (>1) secondary processes.
+ *
+ * @param action_name
+ * The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ * The params argument contains the customized message; as the reply is
+ * received, the replied params will be copied to this pointer.
+ *
+ * @param len_p
+ * The length of the customized message.
+ *
+ * @param fds
+ * The fds argument is an array of fds sent with sendmsg; as the reply
+ * is received, the replied fds will be copied into this array.
+ *
+ * @param fds_in
+ * The number of fds to be sent.
+ *
+ * @param fds_out
+ * The number of fds to be received.
+ *
+ * @return
+ * - (1) on success;
+ * - (0) if the request was sent but no valid reply was received;
+ * - (<0) on failure to send the request.
+ */
+int
+rte_eal_mp_request(const char *action_name, void *params,
+ int len_p, int fds[], int fds_in, int fds_out);
+
+/**
+ * Send a reply to the peer process.
+ *
+ * This function will send a reply message in response to a request message
+ * received previously.
+ *
+ * @param action_name
+ * The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ * The params argument contains the customized message.
+ *
+ * @param len_p
+ * The length of the customized message.
+ *
+ * @param fds
+ * The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_in
+ * The number of fds to be sent with sendmsg.
+ *
+ * @param peer
+ * The address of the requesting peer, as passed to the action handler.
+ *
+ * @return
+ * - (1) on success;
+ * - (0) on failure.
+ */
+int
+rte_eal_mp_reply(const char *action_name, const void *params,
+ int len_p, int fds[], int fds_in, const void *peer);
+
+/**
* Usage function typedef used by the application usage function.
*
* Use this function typedef to define and call rte_set_application_usage_hook()
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index 5dacde5..068ac0b 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -243,5 +243,7 @@ DPDK_18.02 {
rte_eal_mp_action_register;
rte_eal_mp_action_unregister;
rte_eal_mp_sendmsg;
+ rte_eal_mp_request;
+ rte_eal_mp_reply;
} DPDK_17.11;
--
2.7.4
next prev parent reply other threads:[~2018-01-11 4:05 UTC|newest]
Thread overview: 88+ messages / expand[flat|nested] mbox.gz Atom feed top
2017-11-30 18:44 [dpdk-dev] [PATCH 0/3] generic channel for " Jianfeng Tan
2017-11-30 18:44 ` [dpdk-dev] [PATCH 1/3] eal: add " Jianfeng Tan
2017-12-11 11:04 ` Burakov, Anatoly
2017-12-11 16:43 ` Ananyev, Konstantin
2017-11-30 18:44 ` [dpdk-dev] [PATCH 2/3] eal: add synchronous " Jianfeng Tan
2017-12-11 11:39 ` Burakov, Anatoly
2017-12-11 16:49 ` Ananyev, Konstantin
2017-11-30 18:44 ` [dpdk-dev] [PATCH 3/3] vfio: use the generic multi-process channel Jianfeng Tan
2017-12-11 12:01 ` Burakov, Anatoly
2017-12-11 9:59 ` [dpdk-dev] [PATCH 0/3] generic channel for multi-process communication Burakov, Anatoly
2017-12-12 7:34 ` Tan, Jianfeng
2017-12-12 16:18 ` Burakov, Anatoly
2018-01-11 4:07 ` [dpdk-dev] [PATCH v2 0/4] " Jianfeng Tan
2018-01-11 4:07 ` [dpdk-dev] [PATCH v2 1/4] eal: add " Jianfeng Tan
2018-01-13 12:57 ` Burakov, Anatoly
2018-01-15 19:52 ` Ananyev, Konstantin
2018-01-11 4:07 ` [dpdk-dev] [PATCH v2 2/4] eal: add and del secondary processes in the primary Jianfeng Tan
2018-01-13 13:11 ` Burakov, Anatoly
2018-01-15 21:45 ` Ananyev, Konstantin
2018-01-11 4:07 ` Jianfeng Tan [this message]
2018-01-13 13:41 ` [dpdk-dev] [PATCH v2 3/4] eal: add synchronous multi-process communication Burakov, Anatoly
2018-01-16 0:00 ` Ananyev, Konstantin
2018-01-16 8:10 ` Tan, Jianfeng
2018-01-16 11:12 ` Ananyev, Konstantin
2018-01-16 16:47 ` Tan, Jianfeng
2018-01-17 10:50 ` Ananyev, Konstantin
2018-01-17 13:09 ` Tan, Jianfeng
2018-01-17 13:15 ` Tan, Jianfeng
2018-01-17 17:20 ` Ananyev, Konstantin
2018-01-11 4:07 ` [dpdk-dev] [PATCH v2 4/4] vfio: use the generic multi-process channel Jianfeng Tan
2018-01-13 14:03 ` Burakov, Anatoly
2018-03-04 14:57 ` [dpdk-dev] [PATCH v5] vfio: change to use " Jianfeng Tan
2018-03-14 13:27 ` Burakov, Anatoly
2018-03-19 6:53 ` Tan, Jianfeng
2018-03-20 10:33 ` Burakov, Anatoly
2018-03-20 10:56 ` Burakov, Anatoly
2018-03-20 8:50 ` [dpdk-dev] [PATCH v6] " Jianfeng Tan
2018-04-05 14:26 ` Tan, Jianfeng
2018-04-05 14:39 ` Burakov, Anatoly
2018-04-12 23:27 ` Thomas Monjalon
2018-04-12 15:26 ` Burakov, Anatoly
2018-04-15 15:06 ` [dpdk-dev] [PATCH v7] " Jianfeng Tan
2018-04-15 15:10 ` Tan, Jianfeng
2018-04-17 23:04 ` Thomas Monjalon
2018-01-25 4:16 ` [dpdk-dev] [PATCH v3 0/3] generic channel for multi-process communication Jianfeng Tan
2018-01-25 4:16 ` [dpdk-dev] [PATCH v3 1/3] eal: add " Jianfeng Tan
2018-01-25 10:41 ` Thomas Monjalon
2018-01-25 11:27 ` Burakov, Anatoly
2018-01-25 11:34 ` Thomas Monjalon
2018-01-25 12:21 ` Ananyev, Konstantin
2018-01-25 4:16 ` [dpdk-dev] [PATCH v3 2/3] eal: add synchronous " Jianfeng Tan
2018-01-25 12:00 ` Burakov, Anatoly
2018-01-25 12:19 ` Ananyev, Konstantin
2018-01-25 12:25 ` Burakov, Anatoly
2018-01-25 13:00 ` Ananyev, Konstantin
2018-01-25 13:05 ` Burakov, Anatoly
2018-01-25 13:10 ` Burakov, Anatoly
2018-01-25 15:03 ` Ananyev, Konstantin
2018-01-25 16:22 ` Burakov, Anatoly
2018-01-25 17:10 ` Tan, Jianfeng
2018-01-25 18:02 ` Burakov, Anatoly
2018-01-25 12:19 ` Burakov, Anatoly
2018-01-25 12:22 ` Ananyev, Konstantin
2018-01-25 4:16 ` [dpdk-dev] [PATCH v3 3/3] vfio: use the generic multi-process channel Jianfeng Tan
2018-01-25 10:47 ` Thomas Monjalon
2018-01-25 10:52 ` Burakov, Anatoly
2018-01-25 10:57 ` Thomas Monjalon
2018-01-25 12:15 ` Burakov, Anatoly
2018-01-25 19:14 ` [dpdk-dev] [PATCH v4 0/2] generic channel for multi-process communication Jianfeng Tan
2018-01-25 19:14 ` [dpdk-dev] [PATCH v4 1/2] eal: add synchronous " Jianfeng Tan
2018-01-25 19:14 ` [dpdk-dev] [PATCH v4 2/2] vfio: use the generic multi-process channel Jianfeng Tan
2018-01-25 19:15 ` [dpdk-dev] [PATCH v4 0/2] generic channel for multi-process communication Tan, Jianfeng
2018-01-25 19:21 ` [dpdk-dev] [PATCH v5 " Jianfeng Tan
2018-01-25 19:21 ` [dpdk-dev] [PATCH v5 1/2] eal: add " Jianfeng Tan
2018-01-25 19:21 ` [dpdk-dev] [PATCH v5 2/2] eal: add synchronous " Jianfeng Tan
2018-01-25 21:23 ` [dpdk-dev] [PATCH v5 0/2] generic channel for " Thomas Monjalon
2018-01-26 3:41 ` [dpdk-dev] [PATCH v6 " Jianfeng Tan
2018-01-26 3:41 ` [dpdk-dev] [PATCH v6 1/2] eal: add " Jianfeng Tan
2018-01-26 10:25 ` Burakov, Anatoly
2018-01-29 6:37 ` Tan, Jianfeng
2018-01-29 9:37 ` Burakov, Anatoly
2018-01-26 3:41 ` [dpdk-dev] [PATCH v6 2/2] eal: add synchronous " Jianfeng Tan
2018-01-26 10:31 ` Burakov, Anatoly
2018-01-29 23:52 ` [dpdk-dev] [PATCH v6 0/2] generic channel for " Thomas Monjalon
2018-01-30 6:58 ` [dpdk-dev] [PATCH v7 " Jianfeng Tan
2018-01-30 6:58 ` [dpdk-dev] [PATCH v7 1/2] eal: add " Jianfeng Tan
2018-01-30 6:58 ` [dpdk-dev] [PATCH v7 2/2] eal: add synchronous " Jianfeng Tan
2018-01-30 14:46 ` [dpdk-dev] [PATCH v7 0/2] generic channel for " Thomas Monjalon
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1515643654-129489-4-git-send-email-jianfeng.tan@intel.com \
--to=jianfeng.tan@intel.com \
--cc=anatoly.burakov@intel.com \
--cc=bruce.richardson@intel.com \
--cc=dev@dpdk.org \
--cc=konstantin.ananyev@intel.com \
--cc=thomas@monjalon.net \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).