From mboxrd@z Thu Jan 1 00:00:00 1970
From: Anatoly Burakov
To: dev@dpdk.org
Cc: jianfeng.tan@intel.com, konstantin.ananyev@intel.com
Date: Tue, 27 Mar 2018 14:59:44 +0100
Message-Id: <5d94df0912159b96aa71da5249598bd8c744a198.1522159146.git.anatoly.burakov@intel.com>
X-Mailer: git-send-email 1.7.0.7
In-Reply-To: <338e432277f98a206df87e7808ec799de28535ce.1521895541.git.anatoly.burakov@intel.com>
References: <338e432277f98a206df87e7808ec799de28535ce.1521895541.git.anatoly.burakov@intel.com>
Subject: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API to DPDK IPC

This API is similar to the blocking API that is already present, but the
reply is received in a separate callback provided by the caller (the
callback is specified at the time of the request, rather than registered
in advance).

Under the hood, we create a separate thread to handle replies to
asynchronous requests; it simply waits to be notified by the main thread,
or wakes up on a timer.

Signed-off-by: Anatoly Burakov
Acked-by: Jianfeng Tan
---

Notes:
    v6:
    - address review comments from Jianfeng
    - do not add a dummy item to the queue by default

    v5:
    - addressed review comments from Jianfeng
    - split into two patches to avoid rename noise
    - do not mark an ignored message as processed

    v4:
    - rebase on top of the latest IPC improvements patchset [2]

    v3:
    - added support for MP_IGN messages introduced in the IPC
      improvements v5 patchset

    v2:
    - fixed deadlocks and race conditions by not calling callbacks while
      iterating over the sync request list
    - fixed use-after-free by making a copy of the request
    - changed the API to also give the user a copy of the original
      request, so that they know which message the callback is a reply to
    - fixed missing .map file entries

This patch depends on the previously published patchsets for IPC fixes [1]
and IPC improvements [2].

rte_mp_action_unregister and rte_mp_async_reply_unregister do the same
thing - should we perhaps make them a single function?
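For reference, a minimal usage sketch of the new API (not part of the patch):
the callback name, message name and one-second timeout below are made up for
illustration, and it is assumed the peer side has registered an action for the
same message name so that a reply is actually sent back.

    #include <stdio.h>
    #include <string.h>
    #include <time.h>
    #include <rte_eal.h>

    /* invoked from the IPC async thread once all replies have arrived,
     * or the timeout has expired
     */
    static int
    handle_replies(const struct rte_mp_msg *request,
            const struct rte_mp_reply *reply)
    {
        int i;

        printf("'%s': %d of %d peers replied\n", request->name,
                reply->nb_received, reply->nb_sent);
        for (i = 0; i < reply->nb_received; i++)
            printf("  reply: %s\n", reply->msgs[i].name);
        /* reply->msgs is freed by EAL after the callback returns */
        return 0;
    }

    static int
    send_example_request(void)
    {
        struct rte_mp_msg req;
        struct timespec ts = {.tv_sec = 1, .tv_nsec = 0};

        memset(&req, 0, sizeof(req));
        snprintf(req.name, sizeof(req.name), "example_request");

        /* returns immediately; handle_replies() is called later */
        return rte_mp_request_async(&req, &ts, handle_replies);
    }
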
[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/ [2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/ lib/librte_eal/common/eal_common_proc.c | 458 +++++++++++++++++++++++++++++++- lib/librte_eal/common/include/rte_eal.h | 36 +++ lib/librte_eal/rte_eal_version.map | 1 + 3 files changed, 482 insertions(+), 13 deletions(-) diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c index 52b6ab2..139b653 100644 --- a/lib/librte_eal/common/eal_common_proc.c +++ b/lib/librte_eal/common/eal_common_proc.c @@ -26,6 +26,7 @@ #include #include #include +#include #include "eal_private.h" #include "eal_filesystem.h" @@ -60,13 +61,32 @@ struct mp_msg_internal { struct rte_mp_msg msg; }; +struct async_request_param { + rte_mp_async_reply_t clb; + struct rte_mp_reply user_reply; + struct timespec end; + int n_responses_processed; +}; + struct pending_request { TAILQ_ENTRY(pending_request) next; - int reply_received; + enum { + REQUEST_TYPE_SYNC, + REQUEST_TYPE_ASYNC + } type; char dst[PATH_MAX]; struct rte_mp_msg *request; struct rte_mp_msg *reply; - pthread_cond_t cond; + int reply_received; + RTE_STD_C11 + union { + struct { + struct async_request_param *param; + } async; + struct { + pthread_cond_t cond; + } sync; + }; }; TAILQ_HEAD(pending_request_list, pending_request); @@ -74,9 +94,12 @@ TAILQ_HEAD(pending_request_list, pending_request); static struct { struct pending_request_list requests; pthread_mutex_t lock; + pthread_cond_t async_cond; } pending_requests = { .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests), - .lock = PTHREAD_MUTEX_INITIALIZER + .lock = PTHREAD_MUTEX_INITIALIZER, + .async_cond = PTHREAD_COND_INITIALIZER + /**< used in async requests only */ }; /* forward declarations */ @@ -273,7 +296,12 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) memcpy(sync_req->reply, msg, sizeof(*msg)); /* -1 indicates that we've been asked to ignore */ sync_req->reply_received = m->type == MP_REP ? 1 : -1; - pthread_cond_signal(&sync_req->cond); + + if (sync_req->type == REQUEST_TYPE_SYNC) + pthread_cond_signal(&sync_req->sync.cond); + else if (sync_req->type == REQUEST_TYPE_ASYNC) + pthread_cond_signal( + &pending_requests.async_cond); } else RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name); pthread_mutex_unlock(&pending_requests.lock); @@ -320,6 +348,189 @@ mp_handle(void *arg __rte_unused) } static int +timespec_cmp(const struct timespec *a, const struct timespec *b) +{ + if (a->tv_sec < b->tv_sec) + return -1; + if (a->tv_sec > b->tv_sec) + return 1; + if (a->tv_nsec < b->tv_nsec) + return -1; + if (a->tv_nsec > b->tv_nsec) + return 1; + return 0; +} + +enum async_action { + ACTION_NONE, /**< don't do anything */ + ACTION_FREE, /**< free the action entry, but don't trigger callback */ + ACTION_TRIGGER /**< trigger callback, then free action entry */ +}; + +static enum async_action +process_async_request(struct pending_request *sr, const struct timespec *now) +{ + struct async_request_param *param; + struct rte_mp_reply *reply; + bool timeout, received, last_msg; + + param = sr->async.param; + reply = ¶m->user_reply; + + /* did we timeout? */ + timeout = timespec_cmp(¶m->end, now) <= 0; + + /* did we receive a response? */ + received = sr->reply_received != 0; + + /* if we didn't time out, and we didn't receive a response, ignore */ + if (!timeout && !received) + return ACTION_NONE; + + /* if we received a response, adjust relevant data and copy mesasge. 
*/ + if (sr->reply_received == 1 && sr->reply) { + struct rte_mp_msg *msg, *user_msgs, *tmp; + + msg = sr->reply; + user_msgs = reply->msgs; + + tmp = realloc(user_msgs, sizeof(*msg) * + (reply->nb_received + 1)); + if (!tmp) { + RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n", + sr->dst, sr->request->name); + /* this entry is going to be removed and its message + * dropped, but we don't want to leak memory, so + * continue. + */ + } else { + user_msgs = tmp; + reply->msgs = user_msgs; + memcpy(&user_msgs[reply->nb_received], + msg, sizeof(*msg)); + reply->nb_received++; + } + + /* mark this request as processed */ + param->n_responses_processed++; + } else if (sr->reply_received == -1) { + /* we were asked to ignore this process */ + reply->nb_sent--; + } + free(sr->reply); + + last_msg = param->n_responses_processed == reply->nb_sent; + + return last_msg ? ACTION_TRIGGER : ACTION_FREE; +} + +static void +trigger_async_action(struct pending_request *sr) +{ + struct async_request_param *param; + struct rte_mp_reply *reply; + + param = sr->async.param; + reply = ¶m->user_reply; + + param->clb(sr->request, reply); + + /* clean up */ + free(sr->async.param->user_reply.msgs); + free(sr->async.param); + free(sr->request); +} + +static void * +async_reply_handle(void *arg __rte_unused) +{ + struct pending_request *sr; + struct timeval now; + struct timespec timeout, ts_now; + while (1) { + struct pending_request *trigger = NULL; + int ret; + bool nowait = false; + bool timedwait = false; + + pthread_mutex_lock(&pending_requests.lock); + + /* scan through the list and see if there are any timeouts that + * are earlier than our current timeout. + */ + TAILQ_FOREACH(sr, &pending_requests.requests, next) { + if (sr->type != REQUEST_TYPE_ASYNC) + continue; + if (!timedwait || timespec_cmp(&sr->async.param->end, + &timeout) < 0) { + memcpy(&timeout, &sr->async.param->end, + sizeof(timeout)); + timedwait = true; + } + + /* sometimes, we don't even wait */ + if (sr->reply_received) { + nowait = true; + break; + } + } + + if (nowait) + ret = 0; + else if (timedwait) + ret = pthread_cond_timedwait( + &pending_requests.async_cond, + &pending_requests.lock, &timeout); + else + ret = pthread_cond_wait(&pending_requests.async_cond, + &pending_requests.lock); + + if (gettimeofday(&now, NULL) < 0) { + RTE_LOG(ERR, EAL, "Cannot get current time\n"); + break; + } + ts_now.tv_nsec = now.tv_usec * 1000; + ts_now.tv_sec = now.tv_sec; + + if (ret == 0 || ret == ETIMEDOUT) { + struct pending_request *next; + /* we've either been woken up, or we timed out */ + + /* we have still the lock, check if anything needs + * processing. 
+ */ + TAILQ_FOREACH_SAFE(sr, &pending_requests.requests, next, + next) { + enum async_action action; + if (sr->type != REQUEST_TYPE_ASYNC) + continue; + + action = process_async_request(sr, &ts_now); + if (action == ACTION_FREE) { + TAILQ_REMOVE(&pending_requests.requests, + sr, next); + free(sr); + } else if (action == ACTION_TRIGGER && + trigger == NULL) { + TAILQ_REMOVE(&pending_requests.requests, + sr, next); + trigger = sr; + } + } + } + pthread_mutex_unlock(&pending_requests.lock); + if (trigger) { + trigger_async_action(trigger); + free(trigger); + } + }; + + RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n"); + + return NULL; +} + +static int open_socket_fd(void) { char peer_name[PATH_MAX] = {0}; @@ -382,7 +593,7 @@ rte_mp_channel_init(void) char thread_name[RTE_MAX_THREAD_NAME_LEN]; char path[PATH_MAX]; int dir_fd; - pthread_t tid; + pthread_t mp_handle_tid, async_reply_handle_tid; /* create filter path */ create_socket_path("*", path, sizeof(path)); @@ -419,7 +630,16 @@ rte_mp_channel_init(void) return -1; } - if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) { + if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) { + RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n", + strerror(errno)); + close(mp_fd); + mp_fd = -1; + return -1; + } + + if (pthread_create(&async_reply_handle_tid, NULL, + async_reply_handle, NULL) < 0) { RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n", strerror(errno)); close(mp_fd); @@ -430,7 +650,11 @@ rte_mp_channel_init(void) /* try best to set thread name */ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle"); - rte_thread_setname(tid, thread_name); + rte_thread_setname(mp_handle_tid, thread_name); + + /* try best to set thread name */ + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle"); + rte_thread_setname(async_reply_handle_tid, thread_name); /* unlock the directory */ flock(dir_fd, LOCK_UN); @@ -602,18 +826,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg) } static int -mp_request_one(const char *dst, struct rte_mp_msg *req, +mp_request_async(const char *dst, struct rte_mp_msg *req, + struct async_request_param *param) +{ + struct rte_mp_msg *reply_msg; + struct pending_request *sync_req, *exist; + int ret; + + sync_req = malloc(sizeof(*sync_req)); + reply_msg = malloc(sizeof(*reply_msg)); + if (sync_req == NULL || reply_msg == NULL) { + RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n"); + rte_errno = ENOMEM; + ret = -1; + goto fail; + } + + memset(sync_req, 0, sizeof(*sync_req)); + memset(reply_msg, 0, sizeof(*reply_msg)); + + sync_req->type = REQUEST_TYPE_ASYNC; + strcpy(sync_req->dst, dst); + sync_req->request = req; + sync_req->reply = reply_msg; + sync_req->async.param = param; + + /* queue already locked by caller */ + + exist = find_sync_request(dst, req->name); + if (!exist) { + TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next); + } else { + RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name); + rte_errno = EEXIST; + ret = -1; + goto fail; + } + + ret = send_msg(dst, req, MP_REQ); + if (ret < 0) { + RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n", + dst, req->name); + ret = -1; + goto fail; + } else if (ret == 0) { + ret = 0; + goto fail; + } + + param->user_reply.nb_sent++; + + return 0; +fail: + free(sync_req); + free(reply_msg); + return ret; +} + +static int +mp_request_sync(const char *dst, struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { int ret; struct rte_mp_msg msg, *tmp; struct pending_request sync_req, 
*exist; + sync_req.type = REQUEST_TYPE_SYNC; sync_req.reply_received = 0; strcpy(sync_req.dst, dst); sync_req.request = req; sync_req.reply = &msg; - pthread_cond_init(&sync_req.cond, NULL); + pthread_cond_init(&sync_req.sync.cond, NULL); pthread_mutex_lock(&pending_requests.lock); exist = find_sync_request(dst, req->name); @@ -637,7 +920,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, reply->nb_sent++; do { - ret = pthread_cond_timedwait(&sync_req.cond, + ret = pthread_cond_timedwait(&sync_req.sync.cond, &pending_requests.lock, ts); } while (ret != 0 && ret != ETIMEDOUT); @@ -703,7 +986,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, /* for secondary process, send request to the primary process only */ if (rte_eal_process_type() == RTE_PROC_SECONDARY) - return mp_request_one(eal_mp_socket_path(), req, reply, &end); + return mp_request_sync(eal_mp_socket_path(), req, reply, &end); /* for primary process, broadcast request, and collect reply 1 by 1 */ mp_dir = opendir(mp_dir_path); @@ -732,7 +1015,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, snprintf(path, sizeof(path), "%s/%s", mp_dir_path, ent->d_name); - if (mp_request_one(path, req, reply, &end)) + if (mp_request_sync(path, req, reply, &end)) ret = -1; } /* unlock the directory */ @@ -744,9 +1027,158 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, } int __rte_experimental -rte_mp_reply(struct rte_mp_msg *msg, const char *peer) +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts, + rte_mp_async_reply_t clb) { + struct rte_mp_msg *copy; + struct pending_request *dummy; + struct async_request_param *param; + struct rte_mp_reply *reply; + int dir_fd, ret = 0; + DIR *mp_dir; + struct dirent *ent; + struct timeval now; + struct timespec *end; + bool dummy_used = false; + + RTE_LOG(DEBUG, EAL, "request: %s\n", req->name); + + if (check_input(req) == false) + return -1; + if (gettimeofday(&now, NULL) < 0) { + RTE_LOG(ERR, EAL, "Faile to get current time\n"); + rte_errno = errno; + return -1; + } + copy = malloc(sizeof(*copy)); + dummy = malloc(sizeof(*dummy)); + param = malloc(sizeof(*param)); + if (copy == NULL || dummy == NULL || param == NULL) { + RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n"); + rte_errno = ENOMEM; + goto fail; + } + + memset(copy, 0, sizeof(*copy)); + memset(dummy, 0, sizeof(*dummy)); + memset(param, 0, sizeof(*param)); + + /* copy message */ + memcpy(copy, req, sizeof(*copy)); + + param->n_responses_processed = 0; + param->clb = clb; + end = ¶m->end; + reply = ¶m->user_reply; + + end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000; + end->tv_sec = now.tv_sec + ts->tv_sec + + (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000; + reply->nb_sent = 0; + reply->nb_received = 0; + reply->msgs = NULL; + + /* we have to lock the request queue here, as we will be adding a bunch + * of requests to the queue at once, and some of the replies may arrive + * before we add all of the requests to the queue. + */ + pthread_mutex_lock(&pending_requests.lock); + /* we have to ensure that callback gets triggered even if we don't send + * anything, therefore earlier we have allocated a dummy request. fill + * it, and put it on the queue if we don't send any requests. 
+ */ + dummy->type = REQUEST_TYPE_ASYNC; + dummy->request = copy; + dummy->reply = NULL; + dummy->async.param = param; + dummy->reply_received = 1; /* short-circuit the timeout */ + + /* for secondary process, send request to the primary process only */ + if (rte_eal_process_type() == RTE_PROC_SECONDARY) { + ret = mp_request_async(eal_mp_socket_path(), copy, param); + + /* if we didn't send anything, put dummy request on the queue */ + if (ret == 0 && reply->nb_sent == 0) { + TAILQ_INSERT_TAIL(&pending_requests.requests, dummy, + next); + dummy_used = true; + } + + pthread_mutex_unlock(&pending_requests.lock); + + /* if we couldn't send anything, clean up */ + if (ret != 0) + goto fail; + return 0; + } + + /* for primary process, broadcast request */ + mp_dir = opendir(mp_dir_path); + if (!mp_dir) { + RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path); + rte_errno = errno; + goto unlock_fail; + } + dir_fd = dirfd(mp_dir); + + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + rte_errno = errno; + goto closedir_fail; + } + + while ((ent = readdir(mp_dir))) { + char path[PATH_MAX]; + + if (fnmatch(mp_filter, ent->d_name, 0) != 0) + continue; + + snprintf(path, sizeof(path), "%s/%s", mp_dir_path, + ent->d_name); + + if (mp_request_async(path, copy, param)) + ret = -1; + } + /* if we didn't send anything, put dummy request on the queue */ + if (ret == 0 && reply->nb_sent == 0) { + TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next); + dummy_used = true; + } + + /* trigger async request thread wake up */ + pthread_cond_signal(&pending_requests.async_cond); + + /* finally, unlock the queue */ + pthread_mutex_unlock(&pending_requests.lock); + + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + + /* dir_fd automatically closed on closedir */ + closedir(mp_dir); + + /* if dummy was unused, free it */ + if (!dummy_used) + free(dummy); + + return ret; +closedir_fail: + closedir(mp_dir); +unlock_fail: + pthread_mutex_unlock(&pending_requests.lock); +fail: + free(dummy); + free(param); + free(copy); + return -1; +} + +int __rte_experimental +rte_mp_reply(struct rte_mp_msg *msg, const char *peer) +{ RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name); if (check_input(msg) == false) diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h index 044474e..87ebfd0 100644 --- a/lib/librte_eal/common/include/rte_eal.h +++ b/lib/librte_eal/common/include/rte_eal.h @@ -230,6 +230,16 @@ struct rte_mp_reply { typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer); /** + * Asynchronous reply function typedef used by other components. + * + * As we create socket channel for primary/secondary communication, use + * this function typedef to register action for coming responses to asynchronous + * requests. + */ +typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request, + const struct rte_mp_reply *reply); + +/** * @warning * @b EXPERIMENTAL: this API may change without prior notice * @@ -321,6 +331,32 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, * @warning * @b EXPERIMENTAL: this API may change without prior notice * + * Send a request to the peer process and expect a reply in a separate callback. + * + * This function sends a request message to the peer process, and will not + * block. Instead, reply will be received in a separate callback. 
+ * + * @param req + * The req argument contains the customized request message. + * + * @param ts + * The ts argument specifies how long we can wait for the peer(s) to reply. + * + * @param clb + * The callback to trigger when all responses for this request have arrived. + * + * @return + * - On success, return 0. + * - On failure, return -1, and the reason will be stored in rte_errno. + */ +int __rte_experimental +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts, + rte_mp_async_reply_t clb); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * * Send a reply to the peer process. * * This function will send a reply message in response to a request message diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map index d123602..328a0be 100644 --- a/lib/librte_eal/rte_eal_version.map +++ b/lib/librte_eal/rte_eal_version.map @@ -225,6 +225,7 @@ EXPERIMENTAL { rte_mp_action_unregister; rte_mp_sendmsg; rte_mp_request; + rte_mp_request_async; rte_mp_reply; rte_service_attr_get; rte_service_attr_reset_all; -- 2.7.4