From: Maxime Coquelin <maxime.coquelin@redhat.com>
To: dev@dpdk.org, david.marchand@redhat.com, chenbox@nvidia.com
Cc: Maxime Coquelin <maxime.coquelin@redhat.com>
Subject: [PATCH v2 4/5] vhost: improve fdset initialization
Date: Wed, 3 Apr 2024 11:24:47 +0200 [thread overview]
Message-ID: <20240403092448.1361820-5-maxime.coquelin@redhat.com> (raw)
In-Reply-To: <20240403092448.1361820-1-maxime.coquelin@redhat.com>
This patch heavily reworks fdset initialization:
- fdsets are now dynamically allocated, and registered by name, by the FD manager
- the event dispatcher thread is now created by the FD manager
- struct fdset is now opaque to VDUSE and Vhost
Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
---
lib/vhost/fd_man.c | 177 +++++++++++--
lib/vhost/fd_man.c.orig | 538 ++++++++++++++++++++++++++++++++++++++++
lib/vhost/fd_man.h | 39 +--
lib/vhost/socket.c | 24 +-
lib/vhost/vduse.c | 29 +--
5 files changed, 715 insertions(+), 92 deletions(-)
create mode 100644 lib/vhost/fd_man.c.orig
diff --git a/lib/vhost/fd_man.c b/lib/vhost/fd_man.c
index 0ae481b785..8b47c97d45 100644
--- a/lib/vhost/fd_man.c
+++ b/lib/vhost/fd_man.c
@@ -3,12 +3,16 @@
*/
#include <errno.h>
+#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <rte_common.h>
#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_string_fns.h>
+#include <rte_thread.h>
#include "fd_man.h"
@@ -19,6 +23,79 @@ RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO);
#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
+/* One registered file descriptor with its callbacks and context. */
+struct fdentry {
+ int fd; /* -1 indicates this entry is empty */
+ fd_cb rcb; /* callback when this fd is readable. */
+ fd_cb wcb; /* callback when this fd is writeable.*/
+ void *dat; /* fd context */
+ int busy; /* whether this entry is being used in cb. */
+};
+
+/*
+ * A named set of fds. Instances are allocated by fdset_init(), stored in
+ * the fdsets[] registry and each one gets its own event dispatcher thread.
+ */
+struct fdset {
+ char name[RTE_THREAD_NAME_SIZE]; /* registry key, also the dispatcher thread name */
+ struct pollfd rwfds[MAX_FDS]; /* pollfd entries of the registered fds */
+ struct fdentry fd[MAX_FDS];
+ rte_thread_t tid; /* event dispatcher thread */
+ pthread_mutex_t fd_mutex;
+ pthread_mutex_t fd_polling_mutex;
+ int num; /* current fd number of this fdset */
+
+ /*
+ * Internal pipe; readfd is registered with fdset_pipe_read_cb().
+ * The anonymous structs alias pipefd[0]/pipefd[1] as readfd/writefd,
+ * so the member order must not change.
+ */
+ union pipefds {
+ struct {
+ int pipefd[2];
+ };
+ struct {
+ int readfd;
+ int writefd;
+ };
+ } u;
+
+ pthread_mutex_t sync_mutex;
+ pthread_cond_t sync_cond;
+ bool sync;
+ /*
+ * Set to make the dispatcher thread leave its loop.
+ * NOTE(review): plain bool read by the dispatcher without a lock;
+ * consider an atomic.
+ */
+ bool destroy;
+};
+
+static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat);
+static uint32_t fdset_event_dispatch(void *arg);
+
+/* Maximum number of named fdsets, each served by its own dispatcher thread. */
+#define MAX_FDSETS 8
+
+/*
+ * Registry of the fdsets created by fdset_init(), looked up by name.
+ * Both objects are private to the FD manager; fdsets_mutex serializes
+ * lookup and insertion.
+ */
+static struct fdset *fdsets[MAX_FDSETS];
+static pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Return the registered fdset whose name matches @name, or NULL.
+ * Must be called with fdsets_mutex held.
+ *
+ * fdset_init() stores at most RTE_THREAD_NAME_SIZE - 1 characters of the
+ * name, so only that many characters are compared. Otherwise a name that
+ * was truncated on creation would never be found again, and each
+ * fdset_init() call with it would create another fdset and thread.
+ */
+static struct fdset *
+fdset_lookup(const char *name)
+{
+ int i;
+
+ for (i = 0; i < MAX_FDSETS; i++) {
+ struct fdset *fdset = fdsets[i];
+
+ if (fdset == NULL)
+ continue;
+
+ if (strncmp(fdset->name, name, RTE_THREAD_NAME_SIZE - 1) == 0)
+ return fdset;
+ }
+
+ return NULL;
+}
+
+/*
+ * Store @fdset in the first free slot of the fdsets[] registry.
+ * Returns 0 on success, -1 when all MAX_FDSETS slots are taken.
+ * Must be called with fdsets_mutex held.
+ */
+static int
+fdset_insert(struct fdset *fdset)
+{
+ int slot = 0;
+
+ while (slot < MAX_FDSETS && fdsets[slot] != NULL)
+ slot++;
+
+ if (slot == MAX_FDSETS)
+ return -1;
+
+ fdsets[slot] = fdset;
+ return 0;
+}
+
static void
fdset_pipe_read_cb(int readfd, void *dat,
int *remove __rte_unused)
@@ -63,7 +140,7 @@ fdset_pipe_init(struct fdset *fdset)
return -1;
}
- ret = fdset_add(fdset, fdset->u.readfd,
+ ret = fdset_add_no_sync(fdset, fdset->u.readfd,
fdset_pipe_read_cb, NULL, fdset);
if (ret < 0) {
VHOST_FDMAN_LOG(ERR,
@@ -179,37 +256,82 @@ fdset_add_fd(struct fdset *pfdset, int idx, int fd,
pfd->revents = 0;
}
-void
-fdset_uninit(struct fdset *pfdset)
-{
- fdset_pipe_uninit(pfdset);
-}
-
-int
-fdset_init(struct fdset *pfdset)
+/*
+ * Get or create the fdset named @name.
+ *
+ * An fdset already registered under this name is returned as-is.
+ * Otherwise a new one is allocated, its internal pipe is set up
+ * (fdset_pipe_init()), its event dispatcher thread, named after the
+ * fdset, is started and it is added to the registry. Lookup and
+ * creation both run under fdsets_mutex.
+ *
+ * Returns the fdset, or NULL on error.
+ */
+struct fdset *
+fdset_init(const char *name)
{
+ struct fdset *fdset;
+ uint32_t val;
int i;
- if (pfdset == NULL)
- return -1;
+ if (name == NULL) {
+ VHOST_FDMAN_LOG(ERR, "Invalid name");
+ goto err;
+ }
- pthread_mutex_init(&pfdset->fd_mutex, NULL);
- pthread_mutex_init(&pfdset->fd_polling_mutex, NULL);
+ /* Serialize lookup and creation so that one name maps to one fdset. */
+ pthread_mutex_lock(&fdsets_mutex);
+ fdset = fdset_lookup(name);
+ if (fdset) {
+ pthread_mutex_unlock(&fdsets_mutex);
+ return fdset;
+ }
+
+ fdset = rte_zmalloc(NULL, sizeof(*fdset), 0);
+ if (!fdset) {
+ VHOST_FDMAN_LOG(ERR, "Failed to alloc fdset %s", name);
+ goto err_unlock;
+ }
+
+ /* Names that do not fit are truncated; the return value is ignored. */
+ rte_strscpy(fdset->name, name, RTE_THREAD_NAME_SIZE);
+
+ pthread_mutex_init(&fdset->fd_mutex, NULL);
+ pthread_mutex_init(&fdset->fd_polling_mutex, NULL);
for (i = 0; i < MAX_FDS; i++) {
- pfdset->fd[i].fd = -1;
- pfdset->fd[i].dat = NULL;
+ fdset->fd[i].fd = -1;
+ fdset->fd[i].dat = NULL;
}
- pfdset->num = 0;
+ fdset->num = 0;
- return fdset_pipe_init(pfdset);
+ if (fdset_pipe_init(fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to init pipe for %s", name);
+ goto err_free;
+ }
+
+ /* The dispatcher loops until fdset->destroy is set (see err_thread). */
+ if (rte_thread_create_internal_control(&fdset->tid, fdset->name,
+ fdset_event_dispatch, fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread",
+ fdset->name);
+ goto err_pipe;
+ }
+
+ if (fdset_insert(fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to insert fdset %s", name);
+ goto err_thread;
+ }
+
+ pthread_mutex_unlock(&fdsets_mutex);
+
+ return fdset;
+
+ /*
+ * Error unwinding, in reverse order of setup.
+ * NOTE(review): fd_mutex/fd_polling_mutex are not destroyed on these
+ * paths.
+ */
+err_thread:
+ /*
+ * The dispatcher checks destroy at the end of each loop iteration;
+ * fdset_sync() presumably forces an iteration so the thread exits
+ * promptly and can be joined.
+ */
+ fdset->destroy = true;
+ fdset_sync(fdset);
+ rte_thread_join(fdset->tid, &val);
+err_pipe:
+ fdset_pipe_uninit(fdset);
+err_free:
+ rte_free(fdset);
+err_unlock:
+ pthread_mutex_unlock(&fdsets_mutex);
+err:
+ return NULL;
}
/**
* Register the fd in the fdset with read/write handler and context.
*/
-int
-fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+static int
+fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
{
int i;
@@ -232,6 +354,18 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
pthread_mutex_unlock(&pfdset->fd_mutex);
+ return 0;
+}
+
+int
+fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+{
+ int ret;
+
+ ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat);
+ if (ret < 0)
+ return ret;
+
fdset_sync(pfdset);
return 0;
@@ -315,7 +449,7 @@ fdset_try_del(struct fdset *pfdset, int fd)
* will wait until the flag is reset to zero(which indicates the callback is
* finished), then it could free the context after fdset_del.
*/
-uint32_t
+static uint32_t
fdset_event_dispatch(void *arg)
{
int i;
@@ -404,6 +538,9 @@ fdset_event_dispatch(void *arg)
if (need_shrink)
fdset_shrink(pfdset);
+
+ if (pfdset->destroy)
+ break;
}
return 0;
diff --git a/lib/vhost/fd_man.c.orig b/lib/vhost/fd_man.c.orig
new file mode 100644
index 0000000000..c0149fbf4e
--- /dev/null
+++ b/lib/vhost/fd_man.c.orig
@@ -0,0 +1,538 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2010-2014 Intel Corporation
+ */
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/eventfd.h>
+#include <unistd.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_thread.h>
+
+#include "fd_man.h"
+
+RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO);
+#define RTE_LOGTYPE_VHOST_FDMAN vhost_fdset_logtype
+#define VHOST_FDMAN_LOG(level, ...) \
+ RTE_LOG_LINE(level, VHOST_FDMAN, "" __VA_ARGS__)
+
+#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
+
+/* One registered file descriptor with its callbacks and context. */
+struct fdentry {
+ int fd; /* -1 indicates this entry is empty */
+ fd_cb rcb; /* callback when this fd is readable. */
+ fd_cb wcb; /* callback when this fd is writeable.*/
+ void *dat; /* fd context */
+ int busy; /* whether this entry is being used in cb. */
+};
+
+/* A named set of fds served by its own event dispatcher thread. */
+struct fdset {
+ char name[RTE_THREAD_NAME_SIZE]; /* registry key, also the dispatcher thread name */
+ struct pollfd rwfds[MAX_FDS]; /* array passed to poll(), index-aligned with fd[] */
+ struct fdentry fd[MAX_FDS];
+ rte_thread_t tid; /* event dispatcher thread */
+ pthread_mutex_t fd_mutex; /* guards fd[] entries and num */
+ pthread_mutex_t fd_polling_mutex; /* held by the dispatcher around poll() */
+ int num; /* current fd number of this fdset */
+
+ int sync_fd; /* eventfd written by fdset_sync() */
+ pthread_mutex_t sync_mutex;
+ pthread_cond_t sync_cond;
+ bool sync; /* set by fdset_sync_read_cb() when a sync request was handled */
+
+ bool destroy; /* makes the dispatcher thread leave its loop */
+};
+
+static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat);
+static uint32_t fdset_event_dispatch(void *arg);
+
+/* Maximum number of named fdsets, each served by its own dispatcher thread. */
+#define MAX_FDSETS 8
+
+/*
+ * Registry of the fdsets created by fdset_init(), looked up by name.
+ * Both objects are private to the FD manager; fdsets_mutex serializes
+ * lookup and insertion.
+ */
+static struct fdset *fdsets[MAX_FDSETS];
+static pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Return the registered fdset whose name matches @name, or NULL.
+ * Must be called with fdsets_mutex held.
+ *
+ * fdset_init() stores at most RTE_THREAD_NAME_SIZE - 1 characters of the
+ * name, so only that many characters are compared. Otherwise a name that
+ * was truncated on creation would never be found again, and each
+ * fdset_init() call with it would create another fdset and thread.
+ */
+static struct fdset *
+fdset_lookup(const char *name)
+{
+ int i;
+
+ for (i = 0; i < MAX_FDSETS; i++) {
+ struct fdset *fdset = fdsets[i];
+
+ if (fdset == NULL)
+ continue;
+
+ if (strncmp(fdset->name, name, RTE_THREAD_NAME_SIZE - 1) == 0)
+ return fdset;
+ }
+
+ return NULL;
+}
+
+/*
+ * Store @fdset in the first free slot of the fdsets[] registry.
+ * Returns 0 on success, -1 when all MAX_FDSETS slots are taken.
+ * Must be called with fdsets_mutex held.
+ */
+static int
+fdset_insert(struct fdset *fdset)
+{
+ int slot = 0;
+
+ while (slot < MAX_FDSETS && fdsets[slot] != NULL)
+ slot++;
+
+ if (slot == MAX_FDSETS)
+ return -1;
+
+ fdsets[slot] = fdset;
+ return 0;
+}
+
+/*
+ * Read callback of the sync eventfd, run by the dispatcher thread.
+ * Drains the eventfd counter, then sets fdset->sync and wakes every
+ * thread blocked in fdset_sync().
+ */
+static void
+fdset_sync_read_cb(int sync_fd, void *dat, int *remove __rte_unused)
+{
+ eventfd_t val;
+ struct fdset *fdset = dat;
+ int r = eventfd_read(sync_fd, &val);
+ /*
+ * Just an optimization, we don't care if read() failed
+ * so ignore explicitly its return value to make the
+ * compiler happy
+ */
+ RTE_SET_USED(r);
+
+ pthread_mutex_lock(&fdset->sync_mutex);
+ fdset->sync = true;
+ pthread_cond_broadcast(&fdset->sync_cond);
+ pthread_mutex_unlock(&fdset->sync_mutex);
+}
+
+/*
+ * Unregister and close the sync eventfd.
+ *
+ * Only used on fdset_init() error paths, where the dispatcher thread has
+ * either not been created yet or has already been joined. It therefore
+ * must not go through fdset_del(): its final fdset_sync() would wait
+ * forever for a dispatcher that never answers. The entry is cleared
+ * directly instead.
+ */
+static void
+fdset_sync_uninit(struct fdset *fdset)
+{
+ int i;
+
+ pthread_mutex_lock(&fdset->fd_mutex);
+ for (i = 0; i < fdset->num; i++) {
+ if (fdset->fd[i].fd == fdset->sync_fd) {
+ fdset->fd[i].fd = -1;
+ fdset->fd[i].rcb = fdset->fd[i].wcb = NULL;
+ fdset->fd[i].dat = NULL;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&fdset->fd_mutex);
+
+ close(fdset->sync_fd);
+ fdset->sync_fd = -1;
+}
+
+/*
+ * Create the sync eventfd and register it in @fdset.
+ *
+ * Called by fdset_init() before the dispatcher thread exists, so the
+ * eventfd is registered with fdset_add_no_sync(). fdset_add() would call
+ * fdset_sync() and block forever waiting for the not-yet-started
+ * dispatcher.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int
+fdset_sync_init(struct fdset *fdset)
+{
+ int ret;
+
+ pthread_mutex_init(&fdset->sync_mutex, NULL);
+ pthread_cond_init(&fdset->sync_cond, NULL);
+
+ fdset->sync_fd = eventfd(0, 0);
+ if (fdset->sync_fd < 0) {
+ VHOST_FDMAN_LOG(ERR, "failed to create eventfd for %s fdset", fdset->name);
+ return -1;
+ }
+
+ ret = fdset_add_no_sync(fdset, fdset->sync_fd, fdset_sync_read_cb, NULL, fdset);
+ if (ret < 0) {
+ VHOST_FDMAN_LOG(ERR, "failed to add eventfd %d to %s fdset",
+ fdset->sync_fd, fdset->name);
+
+ fdset_sync_uninit(fdset);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Wait until the dispatcher thread has handled a sync request.
+ *
+ * Writes to the sync eventfd so that fdset_sync_read_cb() runs in the
+ * dispatcher, then blocks until that callback sets fdset->sync.
+ * fdset_add(), fdset_del() and fdset_try_del() call this after modifying
+ * the set. If the eventfd write fails, the error is logged and the
+ * function returns without waiting.
+ */
+static void
+fdset_sync(struct fdset *fdset)
+{
+ int ret;
+
+ pthread_mutex_lock(&fdset->sync_mutex);
+
+ fdset->sync = false;
+ ret = eventfd_write(fdset->sync_fd, (eventfd_t)1);
+ if (ret < 0) {
+ VHOST_FDMAN_LOG(ERR, "Failed to write sync eventfd for %s fdset: %s",
+ fdset->name, strerror(errno));
+ goto out_unlock;
+ }
+
+ while (!fdset->sync)
+ pthread_cond_wait(&fdset->sync_cond, &fdset->sync_mutex);
+
+out_unlock:
+ pthread_mutex_unlock(&fdset->sync_mutex);
+}
+
+/*
+ * Walk backwards from slot @last_valid_idx to the closest slot that holds
+ * an fd. Returns that index, or -1 when every slot down to 0 is empty.
+ */
+static int
+get_last_valid_idx(struct fdset *pfdset, int last_valid_idx)
+{
+ int idx = last_valid_idx;
+
+ while (idx >= 0) {
+ if (pfdset->fd[idx].fd != -1)
+ break;
+ idx--;
+ }
+
+ return idx;
+}
+
+/* Copy slot @src (fd entry and pollfd) over slot @dst. */
+static void
+fdset_move(struct fdset *pfdset, int dst, int src)
+{
+ struct fdentry *entries = pfdset->fd;
+ struct pollfd *polls = pfdset->rwfds;
+
+ entries[dst] = entries[src];
+ polls[dst] = polls[src];
+}
+
+/*
+ * Compact the fdset in place. Each empty slot (fd == -1) below the last
+ * used slot is filled with the entry of the highest used slot, and num is
+ * then reduced to cover only used slots. Slot order is not preserved.
+ * Moved-from slots are not cleared; they simply end up beyond num.
+ * Callers in this file hold fd_mutex.
+ */
+static void
+fdset_shrink_nolock(struct fdset *pfdset)
+{
+ int i;
+ int last_valid_idx = get_last_valid_idx(pfdset, pfdset->num - 1);
+
+ for (i = 0; i < last_valid_idx; i++) {
+ if (pfdset->fd[i].fd != -1)
+ continue;
+
+ /* Fill the hole with the highest used slot, then find the next one. */
+ fdset_move(pfdset, i, last_valid_idx);
+ last_valid_idx = get_last_valid_idx(pfdset, last_valid_idx - 1);
+ }
+ pfdset->num = last_valid_idx + 1;
+}
+
+/*
+ * Find deleted fd entries and remove them, taking fd_mutex around
+ * fdset_shrink_nolock().
+ */
+static void
+fdset_shrink(struct fdset *pfdset)
+{
+ pthread_mutex_lock(&pfdset->fd_mutex);
+ fdset_shrink_nolock(pfdset);
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+}
+
+/**
+ * Look up @fd among the first num slots of the fdset.
+ * @return
+ *  the slot index holding fd, or -1 when fd is not registered.
+ */
+static int
+fdset_find_fd(struct fdset *pfdset, int fd)
+{
+ int idx;
+
+ for (idx = 0; idx < pfdset->num; idx++) {
+ if (pfdset->fd[idx].fd == fd)
+ return idx;
+ }
+
+ return -1;
+}
+
+/*
+ * Fill slot @idx with @fd, its callbacks and context. The pollfd waits
+ * for POLLIN when a read callback is given and for POLLOUT when a write
+ * callback is given. The busy flag is left untouched.
+ */
+static void
+fdset_add_fd(struct fdset *pfdset, int idx, int fd,
+ fd_cb rcb, fd_cb wcb, void *dat)
+{
+ struct fdentry *entry = &pfdset->fd[idx];
+ struct pollfd *pfd = &pfdset->rwfds[idx];
+ short events = 0;
+
+ if (rcb != NULL)
+ events |= POLLIN;
+ if (wcb != NULL)
+ events |= POLLOUT;
+
+ entry->fd = fd;
+ entry->rcb = rcb;
+ entry->wcb = wcb;
+ entry->dat = dat;
+
+ pfd->fd = fd;
+ pfd->events = events;
+ pfd->revents = 0;
+}
+
+/*
+ * Get or create the fdset named @name.
+ *
+ * An fdset already registered under this name is returned as-is.
+ * Otherwise a new one is allocated, its sync eventfd is set up, its event
+ * dispatcher thread (named after the fdset) is started and it is added to
+ * the fdsets[] registry. All of this happens under fdsets_mutex.
+ *
+ * Returns the fdset, or NULL on error.
+ */
+struct fdset *
+fdset_init(const char *name)
+{
+ struct fdset *fdset;
+ uint32_t val;
+ int i;
+
+ if (name == NULL) {
+ VHOST_FDMAN_LOG(ERR, "Invalid name");
+ goto err;
+ }
+
+ pthread_mutex_lock(&fdsets_mutex);
+ fdset = fdset_lookup(name);
+ if (fdset) {
+ pthread_mutex_unlock(&fdsets_mutex);
+ return fdset;
+ }
+
+ fdset = rte_zmalloc(NULL, sizeof(*fdset), 0);
+ if (!fdset) {
+ VHOST_FDMAN_LOG(ERR, "Failed to alloc fdset %s", name);
+ goto err_unlock;
+ }
+
+ /* Always NUL-terminated; names that do not fit are truncated. */
+ snprintf(fdset->name, sizeof(fdset->name), "%s", name);
+
+ pthread_mutex_init(&fdset->fd_mutex, NULL);
+ pthread_mutex_init(&fdset->fd_polling_mutex, NULL);
+
+ for (i = 0; i < MAX_FDS; i++) {
+ fdset->fd[i].fd = -1;
+ fdset->fd[i].dat = NULL;
+ }
+ fdset->num = 0;
+
+ if (fdset_sync_init(fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to init sync for %s", name);
+ goto err_free;
+ }
+
+ /* The dispatcher loops until fdset->destroy is set (see err_thread). */
+ if (rte_thread_create_internal_control(&fdset->tid, fdset->name,
+ fdset_event_dispatch, fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread",
+ fdset->name);
+ goto err_sync;
+ }
+
+ if (fdset_insert(fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to insert fdset %s", name);
+ goto err_thread;
+ }
+
+ pthread_mutex_unlock(&fdsets_mutex);
+
+ return fdset;
+
+err_thread:
+ /* Force a dispatcher iteration so it sees destroy, then reap it. */
+ fdset->destroy = true;
+ fdset_sync(fdset);
+ rte_thread_join(fdset->tid, &val);
+err_sync:
+ fdset_sync_uninit(fdset);
+err_free:
+ rte_free(fdset);
+err_unlock:
+ pthread_mutex_unlock(&fdsets_mutex);
+err:
+ return NULL;
+}
+
+/**
+ * Register @fd in the fdset with read/write handlers and context, without
+ * waiting for the dispatcher thread.
+ * Returns 0 on success, -1 on invalid arguments, -2 when no slot is free.
+ */
+static int
+fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+{
+ int slot = -1;
+
+ if (pfdset == NULL || fd == -1)
+ return -1;
+
+ pthread_mutex_lock(&pfdset->fd_mutex);
+
+ if (pfdset->num >= MAX_FDS) {
+ /*
+ * Table looks full: reclaim deleted slots. The polling mutex is
+ * taken because the dispatcher holds it while poll() reads rwfds.
+ */
+ pthread_mutex_lock(&pfdset->fd_polling_mutex);
+ fdset_shrink_nolock(pfdset);
+ pthread_mutex_unlock(&pfdset->fd_polling_mutex);
+ }
+
+ if (pfdset->num < MAX_FDS)
+ slot = pfdset->num++;
+
+ if (slot == -1) {
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ return -2;
+ }
+
+ fdset_add_fd(pfdset, slot, fd, rcb, wcb, dat);
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+
+ return 0;
+}
+
+/**
+ * Register @fd in the fdset, then wait for the dispatcher thread through
+ * fdset_sync().
+ * Returns 0 on success, or the negative fdset_add_no_sync() error code
+ * (in which case no sync is performed).
+ */
+int
+fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+{
+ int ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat);
+
+ if (ret >= 0)
+ fdset_sync(pfdset);
+
+ return ret < 0 ? ret : 0;
+}
+
+/**
+ * Unregister @fd from the fdset.
+ *
+ * If a callback is currently running on fd (entry flagged busy), this
+ * spins, retaking fd_mutex, until it finishes. Afterwards it waits for
+ * the dispatcher through fdset_sync().
+ * Returns the context registered with fd, or NULL if fd was not found or
+ * the arguments are invalid.
+ */
+void *
+fdset_del(struct fdset *pfdset, int fd)
+{
+ void *dat = NULL;
+ int idx;
+
+ if (pfdset == NULL || fd == -1)
+ return NULL;
+
+ for (;;) {
+ bool retry = false;
+
+ pthread_mutex_lock(&pfdset->fd_mutex);
+ idx = fdset_find_fd(pfdset, fd);
+ if (idx != -1) {
+ struct fdentry *entry = &pfdset->fd[idx];
+
+ if (entry->busy) {
+ /* A callback is running on this fd: try again. */
+ retry = true;
+ } else {
+ dat = entry->dat;
+ entry->fd = -1;
+ entry->rcb = entry->wcb = NULL;
+ entry->dat = NULL;
+ }
+ }
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+
+ if (!retry)
+ break;
+ }
+
+ fdset_sync(pfdset);
+
+ return dat;
+}
+
+/**
+ * Unregister @fd from the fdset without waiting for a running callback.
+ *
+ * Returns -2 on invalid arguments, and -1 (leaving the set unchanged) if
+ * a callback is currently running on fd. Otherwise returns 0, including
+ * when fd is not registered, after waiting for the dispatcher through
+ * fdset_sync().
+ */
+int
+fdset_try_del(struct fdset *pfdset, int fd)
+{
+ struct fdentry *entry;
+ int idx;
+
+ if (pfdset == NULL || fd == -1)
+ return -2;
+
+ pthread_mutex_lock(&pfdset->fd_mutex);
+ idx = fdset_find_fd(pfdset, fd);
+ if (idx != -1) {
+ entry = &pfdset->fd[idx];
+ if (entry->busy) {
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ return -1;
+ }
+ entry->fd = -1;
+ entry->rcb = entry->wcb = NULL;
+ entry->dat = NULL;
+ }
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+
+ fdset_sync(pfdset);
+
+ return 0;
+}
+
+/**
+ * Event dispatcher thread body, started by fdset_init().
+ *
+ * Repeatedly calls poll() on the registered fds (1000 ms timeout) and runs
+ * the read/write callbacks of fds that reported events. The loop exits
+ * at the end of an iteration in which pfdset->destroy is found set.
+ *
+ * Before a callback is called, its entry is flagged busy. fdset_del()
+ * spins until the flag is cleared, so once fdset_del() returns, the caller
+ * may free the context. fdset_try_del() instead returns -1 while busy.
+ */
+static uint32_t
+fdset_event_dispatch(void *arg)
+{
+ int i;
+ struct pollfd *pfd;
+ struct fdentry *pfdentry;
+ fd_cb rcb, wcb;
+ void *dat;
+ int fd, numfds;
+ int remove1, remove2;
+ int need_shrink;
+ struct fdset *pfdset = arg;
+ int val;
+
+ if (pfdset == NULL)
+ return 0;
+
+ while (1) {
+
+ /*
+ * When poll is blocked, other threads might unregister
+ * listenfds from and register new listenfds into fdset.
+ * When poll returns, the entries for listenfds in the fdset
+ * might have been updated. It is ok if there is unwanted call
+ * for new listenfds.
+ */
+ pthread_mutex_lock(&pfdset->fd_mutex);
+ numfds = pfdset->num;
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+
+ /* Held during poll() so fdset_add_no_sync() cannot shrink rwfds underneath. */
+ pthread_mutex_lock(&pfdset->fd_polling_mutex);
+ val = poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
+ pthread_mutex_unlock(&pfdset->fd_polling_mutex);
+ if (val < 0)
+ continue;
+
+ need_shrink = 0;
+ for (i = 0; i < numfds; i++) {
+ pthread_mutex_lock(&pfdset->fd_mutex);
+
+ pfdentry = &pfdset->fd[i];
+ fd = pfdentry->fd;
+ pfd = &pfdset->rwfds[i];
+
+ /* Deleted entry: compact the set after this pass. */
+ if (fd < 0) {
+ need_shrink = 1;
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ continue;
+ }
+
+ if (!pfd->revents) {
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ continue;
+ }
+
+ remove1 = remove2 = 0;
+
+ /* Snapshot the handlers and mark busy before dropping the lock. */
+ rcb = pfdentry->rcb;
+ wcb = pfdentry->wcb;
+ dat = pfdentry->dat;
+ pfdentry->busy = 1;
+
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+
+ if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
+ rcb(fd, dat, &remove1);
+ if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
+ wcb(fd, dat, &remove2);
+ /*
+ * NOTE(review): busy (and fd below) are written without fd_mutex
+ * while fdset_del()/fdset_try_del() read them under it; confirm
+ * this is acceptable or take the lock.
+ */
+ pfdentry->busy = 0;
+ /*
+ * fdset_del needs to check busy flag.
+ * We don't allow fdset_del to be called in callback
+ * directly.
+ */
+ /*
+ * When we are to clean up the fd from fdset,
+ * because the fd is closed in the cb,
+ * the old fd val could be reused by when creates new
+ * listen fd in another thread, we couldn't call
+ * fdset_del.
+ */
+ if (remove1 || remove2) {
+ pfdentry->fd = -1;
+ need_shrink = 1;
+ }
+ }
+
+ if (need_shrink)
+ fdset_shrink(pfdset);
+
+ /* Set by fdset_init() error handling to stop this thread. */
+ if (pfdset->destroy)
+ break;
+ }
+
+ return 0;
+}
diff --git a/lib/vhost/fd_man.h b/lib/vhost/fd_man.h
index c18e3a435c..079fa0155f 100644
--- a/lib/vhost/fd_man.h
+++ b/lib/vhost/fd_man.h
@@ -8,50 +8,19 @@
#include <poll.h>
#include <stdbool.h>
+struct fdset;
+
#define MAX_FDS 1024
typedef void (*fd_cb)(int fd, void *dat, int *remove);
-struct fdentry {
- int fd; /* -1 indicates this entry is empty */
- fd_cb rcb; /* callback when this fd is readable. */
- fd_cb wcb; /* callback when this fd is writeable.*/
- void *dat; /* fd context */
- int busy; /* whether this entry is being used in cb. */
-};
-
-struct fdset {
- struct pollfd rwfds[MAX_FDS];
- struct fdentry fd[MAX_FDS];
- pthread_mutex_t fd_mutex;
- pthread_mutex_t fd_polling_mutex;
- int num; /* current fd number of this fdset */
-
- union pipefds {
- struct {
- int pipefd[2];
- };
- struct {
- int readfd;
- int writefd;
- };
- } u;
-
- pthread_mutex_t sync_mutex;
- pthread_cond_t sync_cond;
- bool sync;
-};
-
-void fdset_uninit(struct fdset *pfdset);
-
-int fdset_init(struct fdset *pfdset);
+struct fdset *fdset_init(const char *name);
int fdset_add(struct fdset *pfdset, int fd,
fd_cb rcb, fd_cb wcb, void *dat);
void *fdset_del(struct fdset *pfdset, int fd);
-int fdset_try_del(struct fdset *pfdset, int fd);
-uint32_t fdset_event_dispatch(void *arg);
+int fdset_try_del(struct fdset *pfdset, int fd);
#endif
diff --git a/lib/vhost/socket.c b/lib/vhost/socket.c
index 5afb952a21..c68e9bd5a8 100644
--- a/lib/vhost/socket.c
+++ b/lib/vhost/socket.c
@@ -76,7 +76,7 @@ struct vhost_user_connection {
#define MAX_VHOST_SOCKET 1024
struct vhost_user {
struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
- struct fdset fdset;
+ struct fdset *fdset;
int vsocket_cnt;
pthread_mutex_t mutex;
};
@@ -261,7 +261,7 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
conn->connfd = fd;
conn->vsocket = vsocket;
conn->vid = vid;
- ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
+ ret = fdset_add(vhost_user.fdset, fd, vhost_user_read_cb,
NULL, conn);
if (ret < 0) {
VHOST_CONFIG_LOG(vsocket->path, ERR,
@@ -394,7 +394,7 @@ vhost_user_start_server(struct vhost_user_socket *vsocket)
if (ret < 0)
goto err;
- ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
+ ret = fdset_add(vhost_user.fdset, fd, vhost_user_server_new_connection,
NULL, vsocket);
if (ret < 0) {
VHOST_CONFIG_LOG(path, ERR, "failed to add listen fd %d to vhost server fdset",
@@ -1079,7 +1079,7 @@ rte_vhost_driver_unregister(const char *path)
* mutex lock, and try again since the r/wcb
* may use the mutex lock.
*/
- if (fdset_try_del(&vhost_user.fdset, vsocket->socket_fd) == -1) {
+ if (fdset_try_del(vhost_user.fdset, vsocket->socket_fd) == -1) {
pthread_mutex_unlock(&vhost_user.mutex);
goto again;
}
@@ -1099,7 +1099,7 @@ rte_vhost_driver_unregister(const char *path)
* try again since the r/wcb may use the
* conn_mutex and mutex locks.
*/
- if (fdset_try_del(&vhost_user.fdset,
+ if (fdset_try_del(vhost_user.fdset,
conn->connfd) == -1) {
pthread_mutex_unlock(&vsocket->conn_mutex);
pthread_mutex_unlock(&vhost_user.mutex);
@@ -1167,7 +1167,6 @@ int
rte_vhost_driver_start(const char *path)
{
struct vhost_user_socket *vsocket;
- static rte_thread_t fdset_tid;
pthread_mutex_lock(&vhost_user.mutex);
vsocket = find_vhost_user_socket(path);
@@ -1179,19 +1178,12 @@ rte_vhost_driver_start(const char *path)
if (vsocket->is_vduse)
return vduse_device_create(path, vsocket->net_compliant_ol_flags);
- if (fdset_tid.opaque_id == 0) {
- if (fdset_init(&vhost_user.fdset) < 0) {
+ if (vhost_user.fdset == NULL) {
+ vhost_user.fdset = fdset_init("vhost-evt");
+ if (vhost_user.fdset == NULL) {
VHOST_CONFIG_LOG(path, ERR, "failed to init Vhost-user fdset");
return -1;
}
-
- int ret = rte_thread_create_internal_control(&fdset_tid,
- "vhost-evt", fdset_event_dispatch, &vhost_user.fdset);
- if (ret != 0) {
- VHOST_CONFIG_LOG(path, ERR, "failed to create fdset handling thread");
- fdset_uninit(&vhost_user.fdset);
- return -1;
- }
}
if (vsocket->is_server)
diff --git a/lib/vhost/vduse.c b/lib/vhost/vduse.c
index d87fc500d4..c66602905c 100644
--- a/lib/vhost/vduse.c
+++ b/lib/vhost/vduse.c
@@ -28,13 +28,11 @@
#define VDUSE_CTRL_PATH "/dev/vduse/control"
struct vduse {
- struct fdset fdset;
+ struct fdset *fdset;
};
static struct vduse vduse;
-static bool vduse_events_thread;
-
static const char * const vduse_reqs_str[] = {
"VDUSE_GET_VQ_STATE",
"VDUSE_SET_STATUS",
@@ -215,7 +213,7 @@ vduse_vring_setup(struct virtio_net *dev, unsigned int index)
}
if (vq == dev->cvq) {
- ret = fdset_add(&vduse.fdset, vq->kickfd, vduse_control_queue_event, NULL, dev);
+ ret = fdset_add(vduse.fdset, vq->kickfd, vduse_control_queue_event, NULL, dev);
if (ret) {
VHOST_CONFIG_LOG(dev->ifname, ERR,
"Failed to setup kickfd handler for VQ %u: %s",
@@ -238,7 +236,7 @@ vduse_vring_cleanup(struct virtio_net *dev, unsigned int index)
int ret;
if (vq == dev->cvq && vq->kickfd >= 0)
- fdset_del(&vduse.fdset, vq->kickfd);
+ fdset_del(vduse.fdset, vq->kickfd);
vq_efd.index = index;
vq_efd.fd = VDUSE_EVENTFD_DEASSIGN;
@@ -413,7 +411,6 @@ int
vduse_device_create(const char *path, bool compliant_ol_flags)
{
int control_fd, dev_fd, vid, ret;
- rte_thread_t fdset_tid;
uint32_t i, max_queue_pairs, total_queues;
struct virtio_net *dev;
struct virtio_net_config vnet_config = {{ 0 }};
@@ -422,22 +419,12 @@ vduse_device_create(const char *path, bool compliant_ol_flags)
struct vduse_dev_config *dev_config = NULL;
const char *name = path + strlen("/dev/vduse/");
- /* If first device, create events dispatcher thread */
- if (vduse_events_thread == false) {
- if (fdset_init(&vduse.fdset) < 0) {
+ if (vduse.fdset == NULL) {
+ vduse.fdset = fdset_init("vduse-evt");
+ if (vduse.fdset == NULL) {
VHOST_CONFIG_LOG(path, ERR, "failed to init VDUSE fdset");
return -1;
}
-
- ret = rte_thread_create_internal_control(&fdset_tid, "vduse-evt",
- fdset_event_dispatch, &vduse.fdset);
- if (ret != 0) {
- VHOST_CONFIG_LOG(path, ERR, "failed to create vduse fdset handling thread");
- fdset_uninit(&vduse.fdset);
- return -1;
- }
-
- vduse_events_thread = true;
}
control_fd = open(VDUSE_CTRL_PATH, O_RDWR);
@@ -555,7 +542,7 @@ vduse_device_create(const char *path, bool compliant_ol_flags)
dev->cvq = dev->virtqueue[max_queue_pairs * 2];
- ret = fdset_add(&vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, NULL, dev);
+ ret = fdset_add(vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, NULL, dev);
if (ret) {
VHOST_CONFIG_LOG(name, ERR, "Failed to add fd %d to vduse fdset",
dev->vduse_dev_fd);
@@ -602,7 +589,7 @@ vduse_device_destroy(const char *path)
vduse_device_stop(dev);
- fdset_del(&vduse.fdset, dev->vduse_dev_fd);
+ fdset_del(vduse.fdset, dev->vduse_dev_fd);
if (dev->vduse_dev_fd >= 0) {
close(dev->vduse_dev_fd);
--
2.44.0
next prev parent reply other threads:[~2024-04-03 9:25 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-04-03 9:24 [PATCH v2 0/5] vhost: FD manager improvements Maxime Coquelin
2024-04-03 9:24 ` [PATCH v2 1/5] vhost: rename polling mutex Maxime Coquelin
2024-04-03 9:24 ` [PATCH v2 2/5] vhost: make use of FD manager init function Maxime Coquelin
2024-04-03 9:24 ` [PATCH v2 3/5] vhost: hide synchronization within FD manager Maxime Coquelin
2024-04-03 9:24 ` Maxime Coquelin [this message]
2024-04-03 9:24 ` [PATCH v2 5/5] vhost: manage FD with epoll Maxime Coquelin
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240403092448.1361820-5-maxime.coquelin@redhat.com \
--to=maxime.coquelin@redhat.com \
--cc=chenbox@nvidia.com \
--cc=david.marchand@redhat.com \
--cc=dev@dpdk.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).