DPDK patches and discussions
 help / color / mirror / Atom feed
From: Anatoly Burakov <anatoly.burakov@intel.com>
To: dev@dpdk.org
Cc: konstantin.ananyev@intel.com, thomas@monjalon.net,
	bruce.richardson@intel.com, qi.z.zhang@intel.com,
	Jianfeng Tan <jianfeng.tan@intel.com>
Subject: [dpdk-dev] [PATCH v2 6/7] ipc: remove IPC thread for async requests
Date: Tue, 26 Jun 2018 11:53:17 +0100	[thread overview]
Message-ID: <1f85bec5ac54c3880046e0937ed3ba49ce6ea5fe.1530009564.git.anatoly.burakov@intel.com> (raw)
In-Reply-To: <cover.1530009564.git.anatoly.burakov@intel.com>
In-Reply-To: <cover.1530009564.git.anatoly.burakov@intel.com>

Previously, we were using two IPC threads - one to handle messages
and synchronous requests, and another to handle asynchronous requests.
To handle replies for an async request, rte_mp_handle woke up the
rte_mp_handle_async thread via a pthread_cond variable so that it
could process them.

Change it to handle asynchronous messages within the main IPC thread.
To handle timeouts, we set an alarm for each async request that is
sent. If the reply is received before the timeout, we cancel the
alarm when handling the reply; otherwise, the alarm invokes
async_reply_handle() as its callback.

Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
Suggested-by: Thomas Monjalon <thomas@monjalon.net>
---

Notes:
    RFC->RFCv2:
    - Rebased on latest code
    - Implemented comments to the original RFC

 lib/librte_eal/common/eal_common_proc.c | 201 +++++++++---------------
 1 file changed, 70 insertions(+), 131 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 707d8ab30..6f3366403 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -20,6 +20,7 @@
 #include <sys/un.h>
 #include <unistd.h>
 
+#include <rte_alarm.h>
 #include <rte_common.h>
 #include <rte_cycles.h>
 #include <rte_eal.h>
@@ -94,11 +95,9 @@ TAILQ_HEAD(pending_request_list, pending_request);
 static struct {
 	struct pending_request_list requests;
 	pthread_mutex_t lock;
-	pthread_cond_t async_cond;
 } pending_requests = {
 	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
 	.lock = PTHREAD_MUTEX_INITIALIZER,
-	.async_cond = PTHREAD_COND_INITIALIZER
 	/**< used in async requests only */
 };
 
@@ -106,6 +105,16 @@ static struct {
 static int
 mp_send(struct rte_mp_msg *msg, const char *peer, int type);
 
+/* for use with alarm callback */
+static void
+async_reply_handle(void *arg);
+
+/* for use with process_msg */
+static struct pending_request *
+async_reply_handle_thread_unsafe(void *arg);
+
+static void
+trigger_async_action(struct pending_request *req);
 
 static struct pending_request *
 find_pending_request(const char *dst, const char *act_name)
@@ -290,6 +299,8 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
 
 	if (m->type == MP_REP || m->type == MP_IGN) {
+		struct pending_request *req = NULL;
+
 		pthread_mutex_lock(&pending_requests.lock);
 		pending_req = find_pending_request(s->sun_path, msg->name);
 		if (pending_req) {
@@ -301,11 +312,14 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 			if (pending_req->type == REQUEST_TYPE_SYNC)
 				pthread_cond_signal(&pending_req->sync.cond);
 			else if (pending_req->type == REQUEST_TYPE_ASYNC)
-				pthread_cond_signal(
-					&pending_requests.async_cond);
+				req = async_reply_handle_thread_unsafe(
+						pending_req);
 		} else
 			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
 		pthread_mutex_unlock(&pending_requests.lock);
+
+		if (req != NULL)
+			trigger_async_action(req);
 		return;
 	}
 
@@ -365,7 +379,6 @@ timespec_cmp(const struct timespec *a, const struct timespec *b)
 }
 
 enum async_action {
-	ACTION_NONE, /**< don't do anything */
 	ACTION_FREE, /**< free the action entry, but don't trigger callback */
 	ACTION_TRIGGER /**< trigger callback, then free action entry */
 };
@@ -375,7 +388,7 @@ process_async_request(struct pending_request *sr, const struct timespec *now)
 {
 	struct async_request_param *param;
 	struct rte_mp_reply *reply;
-	bool timeout, received, last_msg;
+	bool timeout, last_msg;
 
 	param = sr->async.param;
 	reply = &param->user_reply;
@@ -383,13 +396,6 @@ process_async_request(struct pending_request *sr, const struct timespec *now)
 	/* did we timeout? */
 	timeout = timespec_cmp(&param->end, now) <= 0;
 
-	/* did we receive a response? */
-	received = sr->reply_received != 0;
-
-	/* if we didn't time out, and we didn't receive a response, ignore */
-	if (!timeout && !received)
-		return ACTION_NONE;
-
 	/* if we received a response, adjust relevant data and copy mesasge. */
 	if (sr->reply_received == 1 && sr->reply) {
 		struct rte_mp_msg *msg, *user_msgs, *tmp;
@@ -448,120 +454,60 @@ trigger_async_action(struct pending_request *sr)
 	free(sr->async.param->user_reply.msgs);
 	free(sr->async.param);
 	free(sr->request);
+	free(sr);
 }
 
 static struct pending_request *
-check_trigger(struct timespec *ts)
+async_reply_handle_thread_unsafe(void *arg)
 {
-	struct pending_request *next, *cur, *trigger = NULL;
-
-	TAILQ_FOREACH_SAFE(cur, &pending_requests.requests, next, next) {
-		enum async_action action;
-		if (cur->type != REQUEST_TYPE_ASYNC)
-			continue;
-
-		action = process_async_request(cur, ts);
-		if (action == ACTION_FREE) {
-			TAILQ_REMOVE(&pending_requests.requests, cur, next);
-			free(cur);
-		} else if (action == ACTION_TRIGGER) {
-			TAILQ_REMOVE(&pending_requests.requests, cur, next);
-			trigger = cur;
-			break;
-		}
-	}
-	return trigger;
-}
-
-static void
-wait_for_async_messages(void)
-{
-	struct pending_request *sr;
-	struct timespec timeout;
-	bool timedwait = false;
-	bool nowait = false;
-	int ret;
-
-	/* scan through the list and see if there are any timeouts that
-	 * are earlier than our current timeout.
-	 */
-	TAILQ_FOREACH(sr, &pending_requests.requests, next) {
-		if (sr->type != REQUEST_TYPE_ASYNC)
-			continue;
-		if (!timedwait || timespec_cmp(&sr->async.param->end,
-				&timeout) < 0) {
-			memcpy(&timeout, &sr->async.param->end,
-				sizeof(timeout));
-			timedwait = true;
-		}
-
-		/* sometimes, we don't even wait */
-		if (sr->reply_received) {
-			nowait = true;
-			break;
-		}
-	}
-
-	if (nowait)
-		return;
-
-	do {
-		ret = timedwait ?
-			pthread_cond_timedwait(
-				&pending_requests.async_cond,
-				&pending_requests.lock,
-				&timeout) :
-			pthread_cond_wait(
-				&pending_requests.async_cond,
-				&pending_requests.lock);
-	} while (ret != 0 && ret != ETIMEDOUT);
-
-	/* we've been woken up or timed out */
-}
-
-static void *
-async_reply_handle(void *arg __rte_unused)
-{
-	struct timeval now;
+	struct pending_request *req = (struct pending_request *)arg;
+	enum async_action action;
 	struct timespec ts_now;
-	while (1) {
-		struct pending_request *trigger = NULL;
+	struct timeval now;
 
-		pthread_mutex_lock(&pending_requests.lock);
+	if (gettimeofday(&now, NULL) < 0) {
+		RTE_LOG(ERR, EAL, "Cannot get current time\n");
+		goto no_trigger;
+	}
+	ts_now.tv_nsec = now.tv_usec * 1000;
+	ts_now.tv_sec = now.tv_sec;
 
-		/* we exit this function holding the lock */
-		wait_for_async_messages();
+	action = process_async_request(req, &ts_now);
 
-		if (gettimeofday(&now, NULL) < 0) {
-			pthread_mutex_unlock(&pending_requests.lock);
-			RTE_LOG(ERR, EAL, "Cannot get current time\n");
-			break;
+	TAILQ_REMOVE(&pending_requests.requests, req, next);
+
+	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
+		/* if we failed to cancel the alarm because it's already in
+		 * progress, don't proceed because otherwise we will end up
+		 * handling the same message twice.
+		 */
+		if (rte_errno == EINPROGRESS) {
+			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
+			goto no_trigger;
 		}
-		ts_now.tv_nsec = now.tv_usec * 1000;
-		ts_now.tv_sec = now.tv_sec;
-
-		do {
-			trigger = check_trigger(&ts_now);
-			/* unlock request list */
-			pthread_mutex_unlock(&pending_requests.lock);
-
-			if (trigger) {
-				trigger_async_action(trigger);
-				free(trigger);
-
-				/* we've triggered a callback, but there may be
-				 * more, so lock the list and check again.
-				 */
-				pthread_mutex_lock(&pending_requests.lock);
-			}
-		} while (trigger);
+		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
 	}
 
-	RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
-
+	if (action == ACTION_TRIGGER)
+		return req;
+no_trigger:
+	free(req);
 	return NULL;
 }
 
+static void
+async_reply_handle(void *arg)
+{
+	struct pending_request *req;
+
+	pthread_mutex_lock(&pending_requests.lock);
+	req = async_reply_handle_thread_unsafe(arg);
+	pthread_mutex_unlock(&pending_requests.lock);
+
+	if (req != NULL)
+		trigger_async_action(req);
+}
+
 static int
 open_socket_fd(void)
 {
@@ -624,7 +570,7 @@ rte_mp_channel_init(void)
 {
 	char path[PATH_MAX];
 	int dir_fd;
-	pthread_t mp_handle_tid, async_reply_handle_tid;
+	pthread_t mp_handle_tid;
 
 	/* create filter path */
 	create_socket_path("*", path, sizeof(path));
@@ -671,17 +617,6 @@ rte_mp_channel_init(void)
 		return -1;
 	}
 
-	if (rte_ctrl_thread_create(&async_reply_handle_tid,
-			"rte_mp_async", NULL,
-			async_reply_handle, NULL) < 0) {
-		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
-			strerror(errno));
-		close(mp_fd);
-		close(dir_fd);
-		mp_fd = -1;
-		return -1;
-	}
-
 	/* unlock the directory */
 	flock(dir_fd, LOCK_UN);
 	close(dir_fd);
@@ -853,7 +788,7 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
 
 static int
 mp_request_async(const char *dst, struct rte_mp_msg *req,
-		struct async_request_param *param)
+		struct async_request_param *param, const struct timespec *ts)
 {
 	struct rte_mp_msg *reply_msg;
 	struct pending_request *pending_req, *exist;
@@ -898,6 +833,13 @@ mp_request_async(const char *dst, struct rte_mp_msg *req,
 
 	param->user_reply.nb_sent++;
 
+	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
+			      async_reply_handle, pending_req) < 0) {
+		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
+			dst, req->name);
+		rte_panic("Fix the above shit to properly free all memory\n");
+	}
+
 	return 0;
 fail:
 	free(pending_req);
@@ -1119,7 +1061,7 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
 
 	/* for secondary process, send request to the primary process only */
 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
-		ret = mp_request_async(eal_mp_socket_path(), copy, param);
+		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);
 
 		/* if we didn't send anything, put dummy request on the queue */
 		if (ret == 0 && reply->nb_sent == 0) {
@@ -1162,7 +1104,7 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
 			 ent->d_name);
 
-		if (mp_request_async(path, copy, param))
+		if (mp_request_async(path, copy, param, ts))
 			ret = -1;
 	}
 	/* if we didn't send anything, put dummy request on the queue */
@@ -1171,9 +1113,6 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
 		dummy_used = true;
 	}
 
-	/* trigger async request thread wake up */
-	pthread_cond_signal(&pending_requests.async_cond);
-
 	/* finally, unlock the queue */
 	pthread_mutex_unlock(&pending_requests.lock);
 
-- 
2.17.1

  parent reply	other threads:[~2018-06-26 10:53 UTC|newest]

Thread overview: 25+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-06-15 14:25 [dpdk-dev] [PATCH 0/8] Remove IPC threads Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 1/8] eal/linux: use glibc malloc in alarm Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 2/8] eal/linux: use glibc malloc in interrupt handling Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 3/8] ipc: remove IPC thread for async requests Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 4/8] eal: bring forward init of interrupt handling Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 5/8] eal: add IPC type for interrupt thread Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 6/8] eal/bsdapp: add " Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 7/8] eal/bsdapp: add alarm support Anatoly Burakov
2018-06-15 14:25 ` [dpdk-dev] [PATCH 8/8] ipc: remove main IPC thread Anatoly Burakov
2018-06-26  1:19 ` [dpdk-dev] [PATCH 0/8] Remove IPC threads Zhang, Qi Z
2018-06-26  7:03 ` Zhang, Qi Z
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 0/7] Remove asynchronous IPC thread Anatoly Burakov
2018-07-13 10:44   ` Thomas Monjalon
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 1/7] eal/linux: use glibc malloc in alarm Anatoly Burakov
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 2/7] eal/linux: use glibc malloc in interrupt handling Anatoly Burakov
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 3/7] eal/bsdapp: add interrupt thread Anatoly Burakov
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 4/7] eal/bsdapp: add alarm support Anatoly Burakov
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 5/7] eal: bring forward init of interrupt handling Anatoly Burakov
2018-07-12 22:36   ` Thomas Monjalon
2018-07-13  7:41     ` Burakov, Anatoly
2018-07-13  8:09     ` David Marchand
2018-07-13  9:10       ` Tiwei Bie
2018-07-13 11:28         ` David Marchand
2018-06-26 10:53 ` Anatoly Burakov [this message]
2018-06-26 10:53 ` [dpdk-dev] [PATCH v2 7/7] doc: document IPC callback limitations Anatoly Burakov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1f85bec5ac54c3880046e0937ed3ba49ce6ea5fe.1530009564.git.anatoly.burakov@intel.com \
    --to=anatoly.burakov@intel.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=jianfeng.tan@intel.com \
    --cc=konstantin.ananyev@intel.com \
    --cc=qi.z.zhang@intel.com \
    --cc=thomas@monjalon.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).