From: Stefan Hajnoczi
To: dev@dpdk.org
Cc: maxime.coquelin@redhat.com, Yuanhan Liu, wei.w.wang@intel.com,
 mst@redhat.com, zhiyong.yang@intel.com, jasowang@redhat.com,
 Stefan Hajnoczi
Date: Fri, 19 Jan 2018 13:44:22 +0000
Message-Id: <20180119134444.24927-3-stefanha@redhat.com>
In-Reply-To: <20180119134444.24927-1-stefanha@redhat.com>
References: <20180119134444.24927-1-stefanha@redhat.com>
Subject: [dpdk-dev] [RFC 02/24] vhost: move AF_UNIX code from socket.c to trans_af_unix.c

The socket.c file serves two purposes:

1. librte_vhost public API entry points, e.g. rte_vhost_driver_register().
2. AF_UNIX socket management.

Move AF_UNIX socket code into trans_af_unix.c so that socket.c only
handles the librte_vhost public API entry points. This will make it
possible to support other transports besides AF_UNIX.

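For illustration only, here is a minimal sketch of the kind of dispatch this split enables. It assumes a per-transport table of function pointers. All names in it are hypothetical and are not librte_vhost API: struct transport_ops, af_unix_transport_ops, af_unix_calls and demo_driver_register(). The AF_UNIX callbacks are stubs that only log and count calls instead of creating sockets.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical per-transport operations table (not a librte_vhost type). */
struct transport_ops {
	const char *name;
	int (*socket_init)(const char *path, int is_server);
	int (*start_server)(const char *path);
	int (*start_client)(const char *path);
};

/* Counts stub invocations so the dispatch can be observed. */
int af_unix_calls;

/* Stub AF_UNIX callbacks: a real transport would create the socket and
 * bind()/listen() or connect() here. These only log and count. */
static int af_unix_socket_init(const char *path, int is_server)
{
	af_unix_calls++;
	printf("af_unix: create %s socket for %s\n",
	       is_server ? "server" : "client", path);
	return 0;
}

static int af_unix_start_server(const char *path)
{
	af_unix_calls++;
	printf("af_unix: listen on %s\n", path);
	return 0;
}

static int af_unix_start_client(const char *path)
{
	af_unix_calls++;
	printf("af_unix: connect to %s\n", path);
	return 0;
}

const struct transport_ops af_unix_transport_ops = {
	.name = "af_unix",
	.socket_init = af_unix_socket_init,
	.start_server = af_unix_start_server,
	.start_client = af_unix_start_client,
};

/* Transport-agnostic entry point, in the role described above for
 * socket.c: it only calls through the ops table, never AF_UNIX directly. */
int demo_driver_register(const struct transport_ops *ops, const char *path,
			 int is_server)
{
	assert(ops != NULL && ops->socket_init != NULL);
	if (ops->socket_init(path, is_server) < 0)
		return -1;
	return is_server ? ops->start_server(path) : ops->start_client(path);
}
```

A second transport would supply its own struct transport_ops instance, and demo_driver_register() itself would not need to change.
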
This patch is a preparatory step that simply moves code from socket.c to
trans_af_unix.c unmodified. The only changes are dropped 'static'
qualifiers, needed because socket.c and trans_af_unix.c now reference
each other's symbols.

Much of socket.c's state is now exposed in vhost.h. This is a temporary
measure and will be cleaned up in later patches. Moving the code
unmodified here makes the actual refactoring that follows easier to
review.

Signed-off-by: Stefan Hajnoczi
---
 lib/librte_vhost/vhost.h         |  65 +++++
 lib/librte_vhost/socket.c        | 501 +--------------------------------------
 lib/librte_vhost/trans_af_unix.c | 451 +++++++++++++++++++++++++++++++++++
 3 files changed, 517 insertions(+), 500 deletions(-)

diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 53811a8b1..8c6d6e524 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -5,6 +5,7 @@
 #ifndef _VHOST_NET_CDEV_H_
 #define _VHOST_NET_CDEV_H_
 #include
+#include
 #include
 #include
 #include
@@ -12,12 +13,15 @@
 #include
 #include
 #include
+#include /* TODO remove when trans_af_unix.c refactoring is done */
 #include
+#include
 
 #include
 #include
 #include
 
+#include "fd_man.h"
 #include "rte_vhost.h"
 
 /* Used to indicate that the device is running on a data core */
@@ -259,6 +263,67 @@ struct virtio_net {
 	int			slave_req_fd;
 } __rte_cache_aligned;
 
+/* The vhost_user, vhost_user_socket, vhost_user_connection, and reconnect
+ * declarations are temporary measures for moving AF_UNIX code into
+ * trans_af_unix.c. They will be cleaned up as socket.c is untangled from
+ * trans_af_unix.c.
+ */
+TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection);
+
+/*
+ * Every time rte_vhost_driver_register() is invoked, an associated
+ * vhost_user_socket struct will be created.
+ */
+struct vhost_user_socket {
+	struct vhost_user_connection_list conn_list;
+	pthread_mutex_t conn_mutex;
+	char *path;
+	int socket_fd;
+	struct sockaddr_un un;
+	bool is_server;
+	bool reconnect;
+	bool dequeue_zero_copy;
+	bool iommu_support;
+
+	/*
+	 * The "supported_features" indicates the feature bits the
+	 * vhost driver supports. The "features" indicates the feature
+	 * bits after the rte_vhost_driver_features_disable/enable().
+	 * It is also the final feature bits used for vhost-user
+	 * features negotiation.
+	 */
+	uint64_t supported_features;
+	uint64_t features;
+
+	struct vhost_device_ops const *notify_ops;
+};
+
+struct vhost_user_connection {
+	struct vhost_user_socket *vsocket;
+	int connfd;
+	int vid;
+
+	TAILQ_ENTRY(vhost_user_connection) next;
+};
+
+#define MAX_VHOST_SOCKET 1024
+struct vhost_user {
+	struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
+	struct fdset fdset;
+	int vsocket_cnt;
+	pthread_mutex_t mutex;
+};
+
+extern struct vhost_user vhost_user;
+
+int create_unix_socket(struct vhost_user_socket *vsocket);
+int vhost_user_start_server(struct vhost_user_socket *vsocket);
+int vhost_user_start_client(struct vhost_user_socket *vsocket);
+
+extern pthread_t reconn_tid;
+
+int vhost_user_reconnect_init(void);
+bool vhost_user_remove_reconnect(struct vhost_user_socket *vsocket);
 
 #define VHOST_LOG_PAGE 4096
 
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 6e3857e7a..d681f9cae 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -4,17 +4,14 @@
 
 #include
 #include
-#include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
-#include
 #include
 #include
 
@@ -23,61 +20,7 @@
 
 #include "vhost.h"
 #include "vhost_user.h"
-
-TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection);
-
-/*
- * Every time rte_vhost_driver_register() is invoked, an associated
- * vhost_user_socket struct will be created.
- */
-struct vhost_user_socket {
-	struct vhost_user_connection_list conn_list;
-	pthread_mutex_t conn_mutex;
-	char *path;
-	int socket_fd;
-	struct sockaddr_un un;
-	bool is_server;
-	bool reconnect;
-	bool dequeue_zero_copy;
-	bool iommu_support;
-
-	/*
-	 * The "supported_features" indicates the feature bits the
-	 * vhost driver supports. The "features" indicates the feature
-	 * bits after the rte_vhost_driver_features_disable/enable().
-	 * It is also the final feature bits used for vhost-user
-	 * features negotiation.
-	 */
-	uint64_t supported_features;
-	uint64_t features;
-
-	struct vhost_device_ops const *notify_ops;
-};
-
-struct vhost_user_connection {
-	struct vhost_user_socket *vsocket;
-	int connfd;
-	int vid;
-
-	TAILQ_ENTRY(vhost_user_connection) next;
-};
-
-#define MAX_VHOST_SOCKET 1024
-struct vhost_user {
-	struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
-	struct fdset fdset;
-	int vsocket_cnt;
-	pthread_mutex_t mutex;
-};
-
-#define MAX_VIRTIO_BACKLOG 128
-
-static void vhost_user_server_new_connection(int fd, void *data, int *remove);
-static void vhost_user_read_cb(int fd, void *dat, int *remove);
-static int create_unix_socket(struct vhost_user_socket *vsocket);
-static int vhost_user_start_client(struct vhost_user_socket *vsocket);
-
-static struct vhost_user vhost_user = {
+struct vhost_user vhost_user = {
 	.fdset = {
 		.fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} },
 		.fd_mutex = PTHREAD_MUTEX_INITIALIZER,
@@ -87,424 +30,6 @@ static struct vhost_user vhost_user = {
 	.mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
-/* return bytes# of read on success or negative val on failure. */
-int
-read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
-{
-	struct iovec iov;
-	struct msghdr msgh;
-	size_t fdsize = fd_num * sizeof(int);
-	char control[CMSG_SPACE(fdsize)];
-	struct cmsghdr *cmsg;
-	int ret;
-
-	memset(&msgh, 0, sizeof(msgh));
-	iov.iov_base = buf;
-	iov.iov_len = buflen;
-
-	msgh.msg_iov = &iov;
-	msgh.msg_iovlen = 1;
-	msgh.msg_control = control;
-	msgh.msg_controllen = sizeof(control);
-
-	ret = recvmsg(sockfd, &msgh, 0);
-	if (ret <= 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
-		return ret;
-	}
-
-	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
-		RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
-		return -1;
-	}
-
-	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
-		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
-		if ((cmsg->cmsg_level == SOL_SOCKET) &&
-			(cmsg->cmsg_type == SCM_RIGHTS)) {
-			memcpy(fds, CMSG_DATA(cmsg), fdsize);
-			break;
-		}
-	}
-
-	return ret;
-}
-
-int
-send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
-{
-
-	struct iovec iov;
-	struct msghdr msgh;
-	size_t fdsize = fd_num * sizeof(int);
-	char control[CMSG_SPACE(fdsize)];
-	struct cmsghdr *cmsg;
-	int ret;
-
-	memset(&msgh, 0, sizeof(msgh));
-	iov.iov_base = buf;
-	iov.iov_len = buflen;
-
-	msgh.msg_iov = &iov;
-	msgh.msg_iovlen = 1;
-
-	if (fds && fd_num > 0) {
-		msgh.msg_control = control;
-		msgh.msg_controllen = sizeof(control);
-		cmsg = CMSG_FIRSTHDR(&msgh);
-		cmsg->cmsg_len = CMSG_LEN(fdsize);
-		cmsg->cmsg_level = SOL_SOCKET;
-		cmsg->cmsg_type = SCM_RIGHTS;
-		memcpy(CMSG_DATA(cmsg), fds, fdsize);
-	} else {
-		msgh.msg_control = NULL;
-		msgh.msg_controllen = 0;
-	}
-
-	do {
-		ret = sendmsg(sockfd, &msgh, 0);
-	} while (ret < 0 && errno == EINTR);
-
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "sendmsg error\n");
-		return ret;
-	}
-
-	return ret;
-}
-
-static void
-vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
-{
-	int vid;
-	size_t size;
-	struct vhost_user_connection *conn;
-	int ret;
-
-	conn = malloc(sizeof(*conn));
-	if (conn == NULL) {
-		close(fd);
-		return;
-	}
-
-	vid = vhost_new_device();
-	if (vid == -1) {
-		goto err;
-	}
-
-	size = strnlen(vsocket->path, PATH_MAX);
-	vhost_set_ifname(vid, vsocket->path, size);
-
-	if (vsocket->dequeue_zero_copy)
-		vhost_enable_dequeue_zero_copy(vid);
-
-	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
-
-	if (vsocket->notify_ops->new_connection) {
-		ret = vsocket->notify_ops->new_connection(vid);
-		if (ret < 0) {
-			RTE_LOG(ERR, VHOST_CONFIG,
-				"failed to add vhost user connection with fd %d\n",
-				fd);
-			goto err;
-		}
-	}
-
-	conn->connfd = fd;
-	conn->vsocket = vsocket;
-	conn->vid = vid;
-	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
-			NULL, conn);
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to add fd %d into vhost server fdset\n",
-			fd);
-
-		if (vsocket->notify_ops->destroy_connection)
-			vsocket->notify_ops->destroy_connection(conn->vid);
-
-		goto err;
-	}
-
-	pthread_mutex_lock(&vsocket->conn_mutex);
-	TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next);
-	pthread_mutex_unlock(&vsocket->conn_mutex);
-	return;
-
-err:
-	free(conn);
-	close(fd);
-}
-
-/* call back when there is new vhost-user connection from client */
-static void
-vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
-{
-	struct vhost_user_socket *vsocket = dat;
-
-	fd = accept(fd, NULL, NULL);
-	if (fd < 0)
-		return;
-
-	RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
-	vhost_user_add_connection(fd, vsocket);
-}
-
-static void
-vhost_user_read_cb(int connfd, void *dat, int *remove)
-{
-	struct vhost_user_connection *conn = dat;
-	struct vhost_user_socket *vsocket = conn->vsocket;
-	int ret;
-
-	ret = vhost_user_msg_handler(conn->vid, connfd);
-	if (ret < 0) {
-		close(connfd);
-		*remove = 1;
-		vhost_destroy_device(conn->vid);
-
-		if (vsocket->notify_ops->destroy_connection)
-			vsocket->notify_ops->destroy_connection(conn->vid);
-
-		pthread_mutex_lock(&vsocket->conn_mutex);
-		TAILQ_REMOVE(&vsocket->conn_list, conn, next);
-		pthread_mutex_unlock(&vsocket->conn_mutex);
-
-		free(conn);
-
-		if (vsocket->reconnect) {
-			create_unix_socket(vsocket);
-			vhost_user_start_client(vsocket);
-		}
-	}
-}
-
-static int
-create_unix_socket(struct vhost_user_socket *vsocket)
-{
-	int fd;
-	struct sockaddr_un *un = &vsocket->un;
-
-	fd = socket(AF_UNIX, SOCK_STREAM, 0);
-	if (fd < 0)
-		return -1;
-	RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
-		vsocket->is_server ? "server" : "client", fd);
-
-	if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"vhost-user: can't set nonblocking mode for socket, fd: "
-			"%d (%s)\n", fd, strerror(errno));
-		close(fd);
-		return -1;
-	}
-
-	memset(un, 0, sizeof(*un));
-	un->sun_family = AF_UNIX;
-	strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
-	un->sun_path[sizeof(un->sun_path) - 1] = '\0';
-
-	vsocket->socket_fd = fd;
-	return 0;
-}
-
-static int
-vhost_user_start_server(struct vhost_user_socket *vsocket)
-{
-	int ret;
-	int fd = vsocket->socket_fd;
-	const char *path = vsocket->path;
-
-	ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to bind to %s: %s; remove it and try again\n",
-			path, strerror(errno));
-		goto err;
-	}
-	RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
-
-	ret = listen(fd, MAX_VIRTIO_BACKLOG);
-	if (ret < 0)
-		goto err;
-
-	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
-		  NULL, vsocket);
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to add listen fd %d to vhost server fdset\n",
-			fd);
-		goto err;
-	}
-
-	return 0;
-
-err:
-	close(fd);
-	return -1;
-}
-
-struct vhost_user_reconnect {
-	struct sockaddr_un un;
-	int fd;
-	struct vhost_user_socket *vsocket;
-
-	TAILQ_ENTRY(vhost_user_reconnect) next;
-};
-
-TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
-struct vhost_user_reconnect_list {
-	struct vhost_user_reconnect_tailq_list head;
-	pthread_mutex_t mutex;
-};
-
-static struct vhost_user_reconnect_list reconn_list;
-static pthread_t reconn_tid;
-
-static int
-vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
-{
-	int ret, flags;
-
-	ret = connect(fd, un, sz);
-	if (ret < 0 && errno != EISCONN)
-		return -1;
-
-	flags = fcntl(fd, F_GETFL, 0);
-	if (flags < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"can't get flags for connfd %d\n", fd);
-		return -2;
-	}
-	if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-				"can't disable nonblocking on fd %d\n", fd);
-		return -2;
-	}
-	return 0;
-}
-
-static void *
-vhost_user_client_reconnect(void *arg __rte_unused)
-{
-	int ret;
-	struct vhost_user_reconnect *reconn, *next;
-
-	while (1) {
-		pthread_mutex_lock(&reconn_list.mutex);
-
-		/*
-		 * An equal implementation of TAILQ_FOREACH_SAFE,
-		 * which does not exist on all platforms.
-		 */
-		for (reconn = TAILQ_FIRST(&reconn_list.head);
-		     reconn != NULL; reconn = next) {
-			next = TAILQ_NEXT(reconn, next);
-
-			ret = vhost_user_connect_nonblock(reconn->fd,
-						(struct sockaddr *)&reconn->un,
-						sizeof(reconn->un));
-			if (ret == -2) {
-				close(reconn->fd);
-				RTE_LOG(ERR, VHOST_CONFIG,
-					"reconnection for fd %d failed\n",
-					reconn->fd);
-				goto remove_fd;
-			}
-			if (ret == -1)
-				continue;
-
-			RTE_LOG(INFO, VHOST_CONFIG,
-				"%s: connected\n", reconn->vsocket->path);
-			vhost_user_add_connection(reconn->fd, reconn->vsocket);
-remove_fd:
-			TAILQ_REMOVE(&reconn_list.head, reconn, next);
-			free(reconn);
-		}
-
-		pthread_mutex_unlock(&reconn_list.mutex);
-		sleep(1);
-	}
-
-	return NULL;
-}
-
-static int
-vhost_user_reconnect_init(void)
-{
-	int ret;
-	char thread_name[RTE_MAX_THREAD_NAME_LEN];
-
-	ret = pthread_mutex_init(&reconn_list.mutex, NULL);
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
-		return ret;
-	}
-	TAILQ_INIT(&reconn_list.head);
-
-	ret = pthread_create(&reconn_tid, NULL,
-			     vhost_user_client_reconnect, NULL);
-	if (ret != 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
-		if (pthread_mutex_destroy(&reconn_list.mutex)) {
-			RTE_LOG(ERR, VHOST_CONFIG,
-				"failed to destroy reconnect mutex");
-		}
-	} else {
-		snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
-			 "vhost-reconn");
-
-		if (rte_thread_setname(reconn_tid, thread_name)) {
-			RTE_LOG(DEBUG, VHOST_CONFIG,
-				"failed to set reconnect thread name");
-		}
-	}
-
-	return ret;
-}
-
-static int
-vhost_user_start_client(struct vhost_user_socket *vsocket)
-{
-	int ret;
-	int fd = vsocket->socket_fd;
-	const char *path = vsocket->path;
-	struct vhost_user_reconnect *reconn;
-
-	ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
-					  sizeof(vsocket->un));
-	if (ret == 0) {
-		vhost_user_add_connection(fd, vsocket);
-		return 0;
-	}
-
-	RTE_LOG(WARNING, VHOST_CONFIG,
-		"failed to connect to %s: %s\n",
-		path, strerror(errno));
-
-	if (ret == -2 || !vsocket->reconnect) {
-		close(fd);
-		return -1;
-	}
-
-	RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
-	reconn = malloc(sizeof(*reconn));
-	if (reconn == NULL) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to allocate memory for reconnect\n");
-		close(fd);
-		return -1;
-	}
-	reconn->un = vsocket->un;
-	reconn->fd = fd;
-	reconn->vsocket = vsocket;
-	pthread_mutex_lock(&reconn_list.mutex);
-	TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
-	pthread_mutex_unlock(&reconn_list.mutex);
-
-	return 0;
-}
-
 static struct vhost_user_socket *
 find_vhost_user_socket(const char *path)
 {
@@ -688,30 +213,6 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
 	return ret;
 }
 
-static bool
-vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
-{
-	int found = false;
-	struct vhost_user_reconnect *reconn, *next;
-
-	pthread_mutex_lock(&reconn_list.mutex);
-
-	for (reconn = TAILQ_FIRST(&reconn_list.head);
-	     reconn != NULL; reconn = next) {
-		next = TAILQ_NEXT(reconn, next);
-
-		if (reconn->vsocket == vsocket) {
-			TAILQ_REMOVE(&reconn_list.head, reconn, next);
-			close(reconn->fd);
-			free(reconn);
-			found = true;
-			break;
-		}
-	}
-	pthread_mutex_unlock(&reconn_list.mutex);
-	return found;
-}
-
 /**
  * Unregister the specified vhost socket
  */
diff --git a/lib/librte_vhost/trans_af_unix.c b/lib/librte_vhost/trans_af_unix.c
index 9ed04b7eb..636f69916 100644
--- a/lib/librte_vhost/trans_af_unix.c
+++ b/lib/librte_vhost/trans_af_unix.c
@@ -33,7 +33,458 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include
+
+#include
+
 #include "vhost.h"
+#include "vhost_user.h"
+
+#define MAX_VIRTIO_BACKLOG 128
+
+static void vhost_user_read_cb(int connfd, void *dat, int *remove);
+
+/* return bytes# of read on success or negative val on failure. */
+int
+read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
+{
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fd_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int ret;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	ret = recvmsg(sockfd, &msgh, 0);
+	if (ret <= 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
+		return ret;
+	}
+
+	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+		RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
+		return -1;
+	}
+
+	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+		if ((cmsg->cmsg_level == SOL_SOCKET) &&
+			(cmsg->cmsg_type == SCM_RIGHTS)) {
+			memcpy(fds, CMSG_DATA(cmsg), fdsize);
+			break;
+		}
+	}
+
+	return ret;
+}
+
+int
+send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
+{
+
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fd_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int ret;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+
+	if (fds && fd_num > 0) {
+		msgh.msg_control = control;
+		msgh.msg_controllen = sizeof(control);
+		cmsg = CMSG_FIRSTHDR(&msgh);
+		cmsg->cmsg_len = CMSG_LEN(fdsize);
+		cmsg->cmsg_level = SOL_SOCKET;
+		cmsg->cmsg_type = SCM_RIGHTS;
+		memcpy(CMSG_DATA(cmsg), fds, fdsize);
+	} else {
+		msgh.msg_control = NULL;
+		msgh.msg_controllen = 0;
+	}
+
+	do {
+		ret = sendmsg(sockfd, &msgh, 0);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "sendmsg error\n");
+		return ret;
+	}
+
+	return ret;
+}
+
+static void
+vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
+{
+	int vid;
+	size_t size;
+	struct vhost_user_connection *conn;
+	int ret;
+
+	conn = malloc(sizeof(*conn));
+	if (conn == NULL) {
+		close(fd);
+		return;
+	}
+
+	vid = vhost_new_device();
+	if (vid == -1) {
+		goto err;
+	}
+
+	size = strnlen(vsocket->path, PATH_MAX);
+	vhost_set_ifname(vid, vsocket->path, size);
+
+	if (vsocket->dequeue_zero_copy)
+		vhost_enable_dequeue_zero_copy(vid);
+
+	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
+
+	if (vsocket->notify_ops->new_connection) {
+		ret = vsocket->notify_ops->new_connection(vid);
+		if (ret < 0) {
+			RTE_LOG(ERR, VHOST_CONFIG,
+				"failed to add vhost user connection with fd %d\n",
+				fd);
+			goto err;
+		}
+	}
+
+	conn->connfd = fd;
+	conn->vsocket = vsocket;
+	conn->vid = vid;
+	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
+			NULL, conn);
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to add fd %d into vhost server fdset\n",
+			fd);
+
+		if (vsocket->notify_ops->destroy_connection)
+			vsocket->notify_ops->destroy_connection(conn->vid);
+
+		goto err;
+	}
+
+	pthread_mutex_lock(&vsocket->conn_mutex);
+	TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next);
+	pthread_mutex_unlock(&vsocket->conn_mutex);
+	return;
+
+err:
+	free(conn);
+	close(fd);
+}
+
+/* call back when there is new vhost-user connection from client */
+static void
+vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
+{
+	struct vhost_user_socket *vsocket = dat;
+
+	fd = accept(fd, NULL, NULL);
+	if (fd < 0)
+		return;
+
+	RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
+	vhost_user_add_connection(fd, vsocket);
+}
+
+static void
+vhost_user_read_cb(int connfd, void *dat, int *remove)
+{
+	struct vhost_user_connection *conn = dat;
+	struct vhost_user_socket *vsocket = conn->vsocket;
+	int ret;
+
+	ret = vhost_user_msg_handler(conn->vid, connfd);
+	if (ret < 0) {
+		close(connfd);
+		*remove = 1;
+		vhost_destroy_device(conn->vid);
+
+		if (vsocket->notify_ops->destroy_connection)
+			vsocket->notify_ops->destroy_connection(conn->vid);
+
+		pthread_mutex_lock(&vsocket->conn_mutex);
+		TAILQ_REMOVE(&vsocket->conn_list, conn, next);
+		pthread_mutex_unlock(&vsocket->conn_mutex);
+
+		free(conn);
+
+		if (vsocket->reconnect) {
+			create_unix_socket(vsocket);
+			vhost_user_start_client(vsocket);
+		}
+	}
+}
+
+int
+create_unix_socket(struct vhost_user_socket *vsocket)
+{
+	int fd;
+	struct sockaddr_un *un = &vsocket->un;
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0)
+		return -1;
+	RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
+		vsocket->is_server ? "server" : "client", fd);
+
+	if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"vhost-user: can't set nonblocking mode for socket, fd: "
+			"%d (%s)\n", fd, strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	memset(un, 0, sizeof(*un));
+	un->sun_family = AF_UNIX;
+	strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
+	un->sun_path[sizeof(un->sun_path) - 1] = '\0';
+
+	vsocket->socket_fd = fd;
+	return 0;
+}
+
+int
+vhost_user_start_server(struct vhost_user_socket *vsocket)
+{
+	int ret;
+	int fd = vsocket->socket_fd;
+	const char *path = vsocket->path;
+
+	ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to bind to %s: %s; remove it and try again\n",
+			path, strerror(errno));
+		goto err;
+	}
+	RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
+
+	ret = listen(fd, MAX_VIRTIO_BACKLOG);
+	if (ret < 0)
+		goto err;
+
+	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
+		  NULL, vsocket);
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to add listen fd %d to vhost server fdset\n",
+			fd);
+		goto err;
+	}
+
+	return 0;
+
+err:
+	close(fd);
+	return -1;
+}
+
+struct vhost_user_reconnect {
+	struct sockaddr_un un;
+	int fd;
+	struct vhost_user_socket *vsocket;
+
+	TAILQ_ENTRY(vhost_user_reconnect) next;
+};
+
+TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
+struct vhost_user_reconnect_list {
+	struct vhost_user_reconnect_tailq_list head;
+	pthread_mutex_t mutex;
+};
+
+static struct vhost_user_reconnect_list reconn_list;
+pthread_t reconn_tid;
+
+static int
+vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
+{
+	int ret, flags;
+
+	ret = connect(fd, un, sz);
+	if (ret < 0 && errno != EISCONN)
+		return -1;
+
+	flags = fcntl(fd, F_GETFL, 0);
+	if (flags < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"can't get flags for connfd %d\n", fd);
+		return -2;
+	}
+	if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+				"can't disable nonblocking on fd %d\n", fd);
+		return -2;
+	}
+	return 0;
+}
+
+static void *
+vhost_user_client_reconnect(void *arg __rte_unused)
+{
+	int ret;
+	struct vhost_user_reconnect *reconn, *next;
+
+	while (1) {
+		pthread_mutex_lock(&reconn_list.mutex);
+
+		/*
+		 * An equal implementation of TAILQ_FOREACH_SAFE,
+		 * which does not exist on all platforms.
+		 */
+		for (reconn = TAILQ_FIRST(&reconn_list.head);
+		     reconn != NULL; reconn = next) {
+			next = TAILQ_NEXT(reconn, next);
+
+			ret = vhost_user_connect_nonblock(reconn->fd,
+						(struct sockaddr *)&reconn->un,
+						sizeof(reconn->un));
+			if (ret == -2) {
+				close(reconn->fd);
+				RTE_LOG(ERR, VHOST_CONFIG,
+					"reconnection for fd %d failed\n",
+					reconn->fd);
+				goto remove_fd;
+			}
+			if (ret == -1)
+				continue;
+
+			RTE_LOG(INFO, VHOST_CONFIG,
+				"%s: connected\n", reconn->vsocket->path);
+			vhost_user_add_connection(reconn->fd, reconn->vsocket);
+remove_fd:
+			TAILQ_REMOVE(&reconn_list.head, reconn, next);
+			free(reconn);
+		}
+
+		pthread_mutex_unlock(&reconn_list.mutex);
+		sleep(1);
+	}
+
+	return NULL;
+}
+
+int
+vhost_user_reconnect_init(void)
+{
+	int ret;
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	ret = pthread_mutex_init(&reconn_list.mutex, NULL);
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
+		return ret;
+	}
+	TAILQ_INIT(&reconn_list.head);
+
+	ret = pthread_create(&reconn_tid, NULL,
+			     vhost_user_client_reconnect, NULL);
+	if (ret != 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
+		if (pthread_mutex_destroy(&reconn_list.mutex)) {
+			RTE_LOG(ERR, VHOST_CONFIG,
+				"failed to destroy reconnect mutex");
+		}
+	} else {
+		snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+			 "vhost-reconn");
+
+		if (rte_thread_setname(reconn_tid, thread_name)) {
+			RTE_LOG(DEBUG, VHOST_CONFIG,
+				"failed to set reconnect thread name");
+		}
+	}
+
+	return ret;
+}
+
+int
+vhost_user_start_client(struct vhost_user_socket *vsocket)
+{
+	int ret;
+	int fd = vsocket->socket_fd;
+	const char *path = vsocket->path;
+	struct vhost_user_reconnect *reconn;
+
+	ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
+					  sizeof(vsocket->un));
+	if (ret == 0) {
+		vhost_user_add_connection(fd, vsocket);
+		return 0;
+	}
+
+	RTE_LOG(WARNING, VHOST_CONFIG,
+		"failed to connect to %s: %s\n",
+		path, strerror(errno));
+
+	if (ret == -2 || !vsocket->reconnect) {
+		close(fd);
+		return -1;
+	}
+
+	RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
+	reconn = malloc(sizeof(*reconn));
+	if (reconn == NULL) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to allocate memory for reconnect\n");
+		close(fd);
+		return -1;
+	}
+	reconn->un = vsocket->un;
+	reconn->fd = fd;
+	reconn->vsocket = vsocket;
+	pthread_mutex_lock(&reconn_list.mutex);
+	TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
+	pthread_mutex_unlock(&reconn_list.mutex);
+
+	return 0;
+}
+
+bool
+vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
+{
+	int found = false;
+	struct vhost_user_reconnect *reconn, *next;
+
+	pthread_mutex_lock(&reconn_list.mutex);
+
+	for (reconn = TAILQ_FIRST(&reconn_list.head);
+	     reconn != NULL; reconn = next) {
+		next = TAILQ_NEXT(reconn, next);
+
+		if (reconn->vsocket == vsocket) {
+			TAILQ_REMOVE(&reconn_list.head, reconn, next);
+			close(reconn->fd);
+			free(reconn);
+			found = true;
+			break;
+		}
+	}
+	pthread_mutex_unlock(&reconn_list.mutex);
+	return found;
+}
 
 static int
 af_unix_vring_call(struct virtio_net *dev __rte_unused,
-- 
2.14.3
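
read_fd_message() and send_fd_message() are moved above unchanged. They pass file descriptors across the AF_UNIX connection as SCM_RIGHTS ancillary data, which is how vhost-user shares guest memory regions and eventfds with the backend. Below is a minimal standalone sketch of the same technique. It assumes Linux/glibc and uses socketpair() instead of a named socket. Its buffer is sized for exactly one descriptor. demo_send_fd(), demo_recv_fd() and demo_roundtrip() are hypothetical helpers, not librte_vhost functions.

```c
#define _GNU_SOURCE /* exposes the CMSG_* macros on glibc in strict modes */
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Control buffer sized and aligned for exactly one passed descriptor. */
union one_fd_cmsg {
	char buf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr align;
};

/* Send buflen bytes and attach fd as SCM_RIGHTS ancillary data.
 * Returns the number of bytes sent, or -1 on error. */
int demo_send_fd(int sock, const char *buf, size_t buflen, int fd)
{
	struct iovec iov = { .iov_base = (void *)buf, .iov_len = buflen };
	union one_fd_cmsg control;
	struct msghdr msgh;
	struct cmsghdr *cmsg;

	memset(&msgh, 0, sizeof(msgh));
	memset(&control, 0, sizeof(control));
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control.buf;
	msgh.msg_controllen = sizeof(control.buf);

	cmsg = CMSG_FIRSTHDR(&msgh);
	assert(cmsg != NULL); /* the buffer always has room for one header */
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

	return (int)sendmsg(sock, &msgh, 0);
}

/* Receive up to buflen bytes. *fd is set to the passed descriptor, or -1
 * if none arrived. Returns bytes received, or -1 on error/truncation. */
int demo_recv_fd(int sock, char *buf, size_t buflen, int *fd)
{
	struct iovec iov = { .iov_base = buf, .iov_len = buflen };
	union one_fd_cmsg control;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	ssize_t ret;

	memset(&msgh, 0, sizeof(msgh));
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control.buf;
	msgh.msg_controllen = sizeof(control.buf);

	*fd = -1;
	ret = recvmsg(sock, &msgh, 0);
	if (ret <= 0 || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)))
		return -1;

	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
	     cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if (cmsg->cmsg_level == SOL_SOCKET &&
		    cmsg->cmsg_type == SCM_RIGHTS) {
			memcpy(fd, CMSG_DATA(cmsg), sizeof(int));
			break;
		}
	}
	return (int)ret;
}

/* Pass a pipe's write end over a socketpair, write through the received
 * copy, and read the byte back from the original read end. */
int demo_roundtrip(void)
{
	int sv[2], pipefd[2], passed = -1, ok = 0;
	char msg[8] = { 0 }, c = 0;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;
	if (pipe(pipefd) < 0) {
		close(sv[0]);
		close(sv[1]);
		return -1;
	}

	if (demo_send_fd(sv[0], "fd", 2, pipefd[1]) == 2 &&
	    demo_recv_fd(sv[1], msg, sizeof(msg) - 1, &passed) == 2 &&
	    passed >= 0 && write(passed, "x", 1) == 1 &&
	    read(pipefd[0], &c, 1) == 1)
		ok = (c == 'x' && strcmp(msg, "fd") == 0);

	if (passed >= 0)
		close(passed);
	close(pipefd[0]);
	close(pipefd[1]);
	close(sv[0]);
	close(sv[1]);
	return ok ? 0 : -1;
}
```

The received descriptor is a new number in the receiving process that refers to the same open pipe. That is why the byte written through it shows up on the original read end. The moved code generalizes this to fd_num descriptors by sizing the control buffer with CMSG_SPACE(fd_num * sizeof(int)).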