From: "Ananyev, Konstantin"
To: "Tan, Jianfeng", "dev@dpdk.org"
CC: "Burakov, Anatoly", "Richardson, Bruce", "thomas@monjalon.net"
Date: Tue, 16 Jan 2018 00:00:43 +0000
Message-ID: <2601191342CEEE43887BDE71AB9772588627E0E5@irsmsx105.ger.corp.intel.com>
In-Reply-To: <1515643654-129489-4-git-send-email-jianfeng.tan@intel.com>
Subject: Re: [dpdk-dev] [PATCH v2 3/4] eal: add synchronous multi-process communication

>
> We need the synchronous way for multi-process communication,
> i.e., blockingly waiting for reply message when we send a request
> to the peer process.
>
> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> such use case. By invoking rte_eal_mp_request(), a request message
> is sent out, and then it waits there for a reply message. The
> timeout is hard-coded 5 Sec. And the replied message will be copied
> in the parameters of this API so that the caller can decide how
> to translate those information (including params and fds). Note
> if a primary process owns multiple secondary processes, this API
> will fail.
>
> The API rte_eal_mp_reply() is always called by an mp action handler.
> Here we add another parameter for rte_eal_mp_t so that the action
> handler knows which peer address to reply.
>
> We use mutex in rte_eal_mp_request() to guarantee that only one
> request is on the fly for one pair of processes.

You don't need to do things in such a strange and restrictive way.
Instead, you can do something like this:

1) Introduce a new struct, a list of them, and a mutex:

struct sync_request {
	int reply_received;
	char dst[PATH_MAX];
	char reply[...];
	LIST_ENTRY(sync_request) next;
};

static struct {
	LIST_HEAD(sync_request_list, sync_request) list;
	pthread_mutex_t lock;
	pthread_cond_t cond;
} sync_requests;

2) Then, in request():
 - grab sync_requests.lock;
 - check whether we already have a pending request for that destination;
   if yes, release the lock and return an error;
 - allocate and init a new sync_request struct, set reply_received = 0,
   and add it to sync_requests.list;
 - do send_msg();
 - then, in a loop:
       pthread_cond_timedwait(&sync_requests.cond, &sync_requests.lock, &ts);
   (ts being the absolute deadline); on return, check whether
   reply_received == 1; if not, check whether the timeout has expired and
   either return a failure or go back to the start of the loop.

3) In mp_handle(), if a REPLY is received:
 - grab sync_requests.lock;
 - search through sync_requests.list for dst[];
 - if found, set its reply_received = 1, copy the received message into
   reply, and call pthread_cond_broadcast(&sync_requests.cond).

>
> Suggested-by: Anatoly Burakov
> Signed-off-by: Jianfeng Tan
> ---
>  lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
>  lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
>  lib/librte_eal/rte_eal_version.map      |   2 +
>  3 files changed, 206 insertions(+), 13 deletions(-)
>
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 70519cc..f194a52 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -32,6 +32,7 @@
>  static int mp_fd = -1;
>  static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
>  static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
>
>  struct action_entry {
>  	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
> @@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
>
>  struct mp_msghdr {
>  	char action_name[MAX_ACTION_NAME_LEN];
> +#define MP_MSG 0 /* Share message with peers, will not block */
> +#define MP_REQ 1 /* Request for information, Will block for a reply */
> +#define MP_REP 2 /* Reply to previously-received request */

As a nit: please use an enum {} instead of the above macros.

> +	int type;
>  	int fds_num;
>  	int len_params;
>  	char params[0];
> @@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
>  }
>
>  static int
> -read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
> +read_msg(int fd, char *buf, int buflen,
> +		int *fds, int fds_num, struct sockaddr_un *s)
>  {
>  	int ret;
>  	struct iovec iov;
> @@ -151,6 +157,8 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
>  	iov.iov_base = buf;
>  	iov.iov_len = buflen;
>
> +	msgh.msg_name = s;
> +	msgh.msg_namelen = sizeof(*s);
>  	msgh.msg_iov = &iov;
>  	msgh.msg_iovlen = 1;
>  	msgh.msg_control = control;
> @@ -181,7 +189,7 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
>  }
>
>  static int
> -process_msg(struct mp_msghdr *hdr, int len, int fds[])
> +process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
>  {
>  	int ret;
>  	int params_len;
> @@ -199,10 +207,10 @@ process_msg(struct mp_msghdr *hdr, int len, int fds[])
>  	}
>
>  	params_len = len - sizeof(struct mp_msghdr);
> -	ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +	ret = entry->action(hdr->params, params_len,
> +			fds, hdr->fds_num, s->sun_path);
>  	pthread_mutex_unlock(&mp_mutex_action);
>  	return ret;
> -
>  }
>
>  static void *
> @@ -211,11 +219,12 @@ mp_handle(void *arg __rte_unused)
>  	int len;
>  	int fds[SCM_MAX_FD];
>  	char buf[MAX_MSG_LENGTH];
> +	struct sockaddr_un sa;
>
>  	while (1) {
> -		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
> +		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
>  		if (len > 0)
> -			process_msg((struct mp_msghdr *)buf, len, fds);
> +			process_msg((struct mp_msghdr *)buf, len, fds, &sa);
>  	}
>
>  	return NULL;
> @@ -255,7 +264,8 @@ static int
>  mp_primary_proc(const void *params,
>  		int len __rte_unused,
>  		int fds[] __rte_unused,
> -		int fds_num __rte_unused)
> +		int fds_num __rte_unused,
> +		const void *peer __rte_unused)
>  {
>  	const struct proc_request *r = (const struct proc_request *)params;
>
> @@ -362,7 +372,8 @@ rte_eal_mp_channel_init(void)
>  }
>
>  static inline struct mp_msghdr *
> -format_msg(const char *act_name, const void *p, int len_params, int fds_num)
> +format_msg(const char *act_name, const void *p,
> +		int len_params, int fds_num, int type)
>  {
>  	int len_msg;
>  	struct mp_msghdr *msg;
> @@ -384,6 +395,7 @@ format_msg(const char *act_name, const void *p, int len_params, int fds_num)
>  	strcpy(msg->action_name, act_name);
>  	msg->fds_num = fds_num;
>  	msg->len_params = len_params;
> +	msg->type = type;
>  	memcpy(msg->params, p, len_params);
>  	return msg;
>  }
> @@ -455,7 +467,9 @@ mp_send(const char *action_name,
>  		const void *params,
>  		int len_params,
>  		int fds[],
> -		int fds_num)
> +		int fds_num,
> +		int type,
> +		const void *peer)
>  {
>  	int i;
>  	int n = 0;
> @@ -468,7 +482,7 @@ mp_send(const char *action_name,
>  		return 0;
>  	}
>
> -	msg = format_msg(action_name, params, len_params, fds_num);
> +	msg = format_msg(action_name, params, len_params, fds_num, type);
>  	if (msg == NULL)
>  		return 0;
>
> @@ -477,6 +491,11 @@ mp_send(const char *action_name,
>  		return 0;
>  	}
>
> +	if (peer) {
> +		n += send_msg(sockfd, peer, msg, fds);
> +		goto ret;
> +	}
> +
>  	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
>  		/* broadcast to all secondaries */
>  		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> @@ -488,6 +507,7 @@ mp_send(const char *action_name,
>  	} else
>  		n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
>
> +ret:
>  	free(msg);
>  	close(sockfd);
>  	return n;
> @@ -501,5 +521,107 @@ rte_eal_mp_sendmsg(const char *action_name,
>  		int fds_num)
>  {
>  	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
> -	return mp_send(action_name, params, len_params, fds, fds_num);
> +	return mp_send(action_name, params, len_params,
> +			fds, fds_num, MP_MSG, NULL);
> +}
> +
> +int
> +rte_eal_mp_request(const char *action_name,
> +		   void *params,
> +		   int len_p,
> +		   int fds[],
> +		   int fds_in,
> +		   int fds_out)
> +{
> +	int i, j;
> +	int sockfd;
> +	int nprocs;
> +	int ret = 0;
> +	struct mp_msghdr *req;
> +	struct timeval tv;
> +	char buf[MAX_MSG_LENGTH];
> +	struct mp_msghdr *hdr;
> +
> +	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
> +
> +	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
> +		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
> +		rte_errno = -E2BIG;
> +		return 0;
> +	}
> +
> +	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
> +	if (req == NULL)
> +		return 0;
> +
> +	if ((sockfd = open_unix_fd(0)) < 0) {
> +		free(req);
> +		return 0;
> +	}
> +
> +	tv.tv_sec = 5;  /* 5 Secs Timeout */
> +	tv.tv_usec = 0;
> +	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
> +			(const void *)&tv, sizeof(struct timeval)) < 0)
> +		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
> +
> +	/* Only allow one req at a time */
> +	pthread_mutex_lock(&mp_mutex_request);
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		nprocs = 0;
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			if (!mp_sec_sockets[i]) {
> +				j = i;
> +				nprocs++;
> +			}
> +
> +		if (nprocs > 1) {
> +			RTE_LOG(ERR, EAL,
> +				"multi secondary processes not supported\n");
> +			goto free_and_ret;
> +		}
> +
> +		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
> +	} else
> +		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
> +
> +	if (ret == 0) {
> +		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
> +		ret = -1;
> +		goto free_and_ret;
> +	}
> +
> +	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
> +	if (ret > 0) {
> +		hdr = (struct mp_msghdr *)buf;
> +		if (hdr->len_params == len_p)
> +			memcpy(params, hdr->params, len_p);
> +		else {
> +			RTE_LOG(ERR, EAL, "invalid reply\n");
> +			ret = 0;
> +		}
> +	}
> +
> +free_and_ret:
> +	free(req);
> +	close(sockfd);
> +	pthread_mutex_unlock(&mp_mutex_request);
> +	return ret;
> +}
> +
> +int
> +rte_eal_mp_reply(const char *action_name,
> +		 const void *params,
> +		 int len_p,
> +		 int fds[],
> +		 int fds_in,
> +		 const void *peer)
> +{
> +	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
> +	if (peer == NULL) {
> +		RTE_LOG(ERR, EAL, "peer is not specified\n");
> +		return 0;
> +	}
> +	return mp_send(action_name, params, len_p, fds, fds_in, MP_REP, peer);
>  }
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 9884c0b..2690a77 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -192,7 +192,7 @@ int rte_eal_primary_proc_alive(const char *config_file_path);
>   * this function typedef to register action for coming messages.
>   */
>  typedef int (*rte_eal_mp_t)(const void *params, int len,
> -		int fds[], int fds_num);
> +		int fds[], int fds_num, const void *peer);
>
>  /**
>   * Register an action function for primary/secondary communication.
> @@ -245,7 +245,7 @@ void rte_eal_mp_action_unregister(const char *name);
>   *   The fds argument is an array of fds sent with sendmsg.
>   *
>   * @param fds_num
> - *   The fds_num argument is number of fds to be sent with sendmsg.
> + *   The number of fds to be sent with sendmsg.
>   *
>   * @return
>   *  - Returns the number of messages being sent successfully.
> @@ -255,6 +255,75 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params,
>  		int len_params, int fds[], int fds_num);
>
>  /**
> + * Send a request to the peer process and expect a reply.
> + *
> + * This function sends a request message to the peer process, and will
> + * block until receiving reply message from the peer process. Note:
> + * this does not work for the primary process sending requests to its
> + * multiple (>1) secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message; as the reply is
> + *   received, the replied params will be copied to this pointer.
> + *
> + * @param len_p
> + *   The length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg; as the reply
> + *   is received, the replied fds will be copied into this array.
> + *
> + * @param fds_in
> + *   The number of fds to be sent.
> + *
> + * @param fds_out
> + *   The number of fds to be received.
> + *
> + * @return
> + *  - (1) on success;
> + *  - (0) on sending request successfully but no valid reply received.
> + *  - (<0) on failing to sending request.
> + */
> +int
> +rte_eal_mp_request(const char *action_name, void *params,
> +		int len_p, int fds[], int fds_in, int fds_out);
> +
> +/**
> + * Send a reply to the peer process.
> + *
> + * This function will send a reply message in response to a request message
> + * received previously.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message.
> + *
> + * @param len_p
> + *   The length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg.
> + *
> + * @param fds_in
> + *   The number of fds to be sent with sendmsg.
> + *
> + * @param peer
> + *   The fds_num argument is number of fds to be sent with sendmsg.
> + *
> + * @return
> + *  - (1) on success;
> + *  - (0) on failure.
> + */
> +int
> +rte_eal_mp_reply(const char *action_name, const void *params,
> +		int len_p, int fds[], int fds_in, const void *peer);
> +
>  /**
>   * Usage function typedef used by the application usage function.
>   *
>   * Use this function typedef to define and call rte_set_application_usage_hook()
> diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
> index 5dacde5..068ac0b 100644
> --- a/lib/librte_eal/rte_eal_version.map
> +++ b/lib/librte_eal/rte_eal_version.map
> @@ -243,5 +243,7 @@ DPDK_18.02 {
>  	rte_eal_mp_action_register;
>  	rte_eal_mp_action_unregister;
>  	rte_eal_mp_sendmsg;
> +	rte_eal_mp_request;
> +	rte_eal_mp_reply;
>
>  } DPDK_17.11;
> --
> 2.7.4
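For illustration only, here is a minimal, self-contained sketch of the pending-request scheme suggested above (steps 1-3). It uses plain pthreads instead of the EAL socket code and is not DPDK code. It makes several assumptions:
- The names sync_request_begin(), sync_request_wait(), sync_request_on_reply(), DST_MAX and REPLY_MAX are hypothetical.
- The actual send_msg() call is left to the caller.
- A hand-rolled singly linked list stands in for the LIST_* macros.
- request() is split into a "register" step and a "wait" step, so the lock is dropped while the message is sent.

Dropping the lock during the send is still safe. The waiter checks reply_received under the lock before it ever blocks, so a reply that arrives early is not lost.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define DST_MAX   108  /* hypothetical; a real version would use PATH_MAX */
#define REPLY_MAX 256  /* hypothetical; a real version would use MAX_MSG_LENGTH */

/* Step 1: one pending synchronous request per destination. */
struct sync_request {
	int reply_received;
	size_t reply_len;
	char dst[DST_MAX];
	char reply[REPLY_MAX];
	struct sync_request *next;  /* stands in for LIST_ENTRY() */
};

static struct {
	struct sync_request *head;  /* pending requests */
	pthread_mutex_t lock;       /* protects the list and its entries */
	pthread_cond_t cond;        /* broadcast on every received reply */
} sync_requests = { NULL, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

/* Caller must hold sync_requests.lock. */
static struct sync_request *find_request(const char *dst)
{
	struct sync_request *r = sync_requests.head;
	while (r != NULL && strcmp(r->dst, dst) != 0)
		r = r->next;
	return r;
}

/* Step 2a: register a request for dst; NULL if one is already pending. */
struct sync_request *sync_request_begin(const char *dst)
{
	struct sync_request *req = NULL;
	if (strlen(dst) >= DST_MAX)
		return NULL;
	pthread_mutex_lock(&sync_requests.lock);
	if (find_request(dst) == NULL && (req = calloc(1, sizeof(*req))) != NULL) {
		strcpy(req->dst, dst);  /* calloc leaves reply_received == 0 */
		req->next = sync_requests.head;
		sync_requests.head = req;
	}
	pthread_mutex_unlock(&sync_requests.lock);
	return req;  /* the caller then does send_msg() to dst */
}

/* Step 3: called by the receive loop when a reply arrives from dst. */
int sync_request_on_reply(const char *dst, const void *msg, size_t len)
{
	struct sync_request *req;
	int ret = -ENOENT;  /* nothing pending: late or unsolicited reply */
	if (len > REPLY_MAX)
		return -E2BIG;
	pthread_mutex_lock(&sync_requests.lock);
	req = find_request(dst);
	if (req != NULL && !req->reply_received) {
		memcpy(req->reply, msg, len);
		req->reply_len = len;
		req->reply_received = 1;
		pthread_cond_broadcast(&sync_requests.cond);
		ret = 0;
	}
	pthread_mutex_unlock(&sync_requests.lock);
	return ret;
}

/* Step 2b: wait up to timeout_ms for the reply; returns bytes copied, or -errno.
 * req must come from sync_request_begin() and is unlinked and freed here. */
long sync_request_wait(struct sync_request *req, long timeout_ms,
		       void *out, size_t out_len)
{
	struct sync_request **pp = &sync_requests.head;
	struct timespec ts;  /* absolute CLOCK_REALTIME deadline */
	long ret;
	int rc = 0;
	assert(req != NULL);
	clock_gettime(CLOCK_REALTIME, &ts);
	ts.tv_nsec += (timeout_ms % 1000) * 1000000L;
	ts.tv_sec += timeout_ms / 1000 + ts.tv_nsec / 1000000000L;
	ts.tv_nsec %= 1000000000L;
	pthread_mutex_lock(&sync_requests.lock);
	/* Re-check the flag after every (possibly spurious or foreign) wakeup. */
	while (!req->reply_received && rc == 0)
		rc = pthread_cond_timedwait(&sync_requests.cond, &sync_requests.lock, &ts);
	if (req->reply_received) {
		ret = (long)(req->reply_len < out_len ? req->reply_len : out_len);
		memcpy(out, req->reply, (size_t)ret);
	} else
		ret = -rc;  /* normally -ETIMEDOUT */
	while (*pp != req)  /* unlink, so a late reply gets -ENOENT */
		pp = &(*pp)->next;
	*pp = req->next;
	pthread_mutex_unlock(&sync_requests.lock);
	free(req);
	return ret;
}
```

In the EAL this would roughly map to the following:
- rte_eal_mp_request() calls sync_request_begin(), then send_msg(), then sync_request_wait() with the 5-second timeout the patch already uses.
- mp_handle() passes every MP_REP message, together with the sender's sun_path, to sync_request_on_reply().

Pending requests are tracked per destination, so a primary can have requests outstanding to several secondaries at once. They are no longer serialized by a single global mutex.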