1 /* 2 * Sharing QEMU devices via vhost-user protocol 3 * 4 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>. 5 * Copyright (c) 2020 Red Hat, Inc. 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2 or 8 * later. See the COPYING file in the top-level directory. 9 */ 10 #include "qemu/osdep.h" 11 #include "qemu/main-loop.h" 12 #include "qemu/vhost-user-server.h" 13 #include "block/aio-wait.h" 14 15 /* 16 * Theory of operation: 17 * 18 * VuServer is started and stopped by vhost_user_server_start() and 19 * vhost_user_server_stop() from the main loop thread. Starting the server 20 * opens a vhost-user UNIX domain socket and listens for incoming connections. 21 * Only one connection is allowed at a time. 22 * 23 * The connection is handled by the vu_client_trip() coroutine in the 24 * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop 25 * where libvhost-user calls vu_message_read() to receive the next vhost-user 26 * protocol messages over the UNIX domain socket. 27 * 28 * When virtqueues are set up libvhost-user calls set_watch() to monitor kick 29 * fds. These fds are also handled in the VuServer->ctx AioContext. 30 * 31 * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down 32 * the socket connection. Shutting down the socket connection causes 33 * vu_message_read() to fail since no more data can be received from the socket. 34 * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop 35 * libvhost-user before terminating the coroutine. vu_deinit() calls 36 * remove_watch() to stop monitoring kick fds and this stops virtqueue 37 * processing. 38 * 39 * When vu_client_trip() has finished cleaning up it schedules a BH in the main 40 * loop thread to accept the next client connection. 41 * 42 * When libvhost-user detects an error it calls panic_cb() and sets the 43 * dev->broken flag. Both vu_client_trip() and kick fd processing stop when 44 * the dev->broken flag is set. 45 * 46 * It is possible to switch AioContexts using 47 * vhost_user_server_detach_aio_context() and 48 * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old 49 * AioContext and resume monitoring in the new AioContext. The vu_client_trip() 50 * coroutine remains in a yielded state during the switch. This is made 51 * possible by QIOChannel's support for spurious coroutine re-entry in 52 * qio_channel_yield(). The coroutine will restart I/O when re-entered from the 53 * new AioContext. 54 */ 55 56 static void vmsg_close_fds(VhostUserMsg *vmsg) 57 { 58 int i; 59 for (i = 0; i < vmsg->fd_num; i++) { 60 close(vmsg->fds[i]); 61 } 62 } 63 64 static void vmsg_unblock_fds(VhostUserMsg *vmsg) 65 { 66 int i; 67 for (i = 0; i < vmsg->fd_num; i++) { 68 qemu_set_nonblock(vmsg->fds[i]); 69 } 70 } 71 72 static void panic_cb(VuDev *vu_dev, const char *buf) 73 { 74 error_report("vu_panic: %s", buf); 75 } 76 77 static bool coroutine_fn 78 vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg) 79 { 80 struct iovec iov = { 81 .iov_base = (char *)vmsg, 82 .iov_len = VHOST_USER_HDR_SIZE, 83 }; 84 int rc, read_bytes = 0; 85 Error *local_err = NULL; 86 const size_t max_fds = G_N_ELEMENTS(vmsg->fds); 87 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 88 QIOChannel *ioc = server->ioc; 89 90 vmsg->fd_num = 0; 91 if (!ioc) { 92 error_report_err(local_err); 93 goto fail; 94 } 95 96 assert(qemu_in_coroutine()); 97 do { 98 size_t nfds = 0; 99 int *fds = NULL; 100 101 /* 102 * qio_channel_readv_full may have short reads, keeping calling it 103 * until getting VHOST_USER_HDR_SIZE or 0 bytes in total 104 */ 105 rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, &local_err); 106 if (rc < 0) { 107 if (rc == QIO_CHANNEL_ERR_BLOCK) { 108 assert(local_err == NULL); 109 qio_channel_yield(ioc, G_IO_IN); 110 continue; 111 } else { 112 error_report_err(local_err); 113 goto fail; 114 } 115 } 116 117 if (nfds > 0) { 118 if (vmsg->fd_num + nfds > max_fds) { 119 error_report("A maximum of %zu fds are allowed, " 120 "however got %zu fds now", 121 max_fds, vmsg->fd_num + nfds); 122 g_free(fds); 123 goto fail; 124 } 125 memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0])); 126 vmsg->fd_num += nfds; 127 g_free(fds); 128 } 129 130 if (rc == 0) { /* socket closed */ 131 goto fail; 132 } 133 134 iov.iov_base += rc; 135 iov.iov_len -= rc; 136 read_bytes += rc; 137 } while (read_bytes != VHOST_USER_HDR_SIZE); 138 139 /* qio_channel_readv_full will make socket fds blocking, unblock them */ 140 vmsg_unblock_fds(vmsg); 141 if (vmsg->size > sizeof(vmsg->payload)) { 142 error_report("Error: too big message request: %d, " 143 "size: vmsg->size: %u, " 144 "while sizeof(vmsg->payload) = %zu", 145 vmsg->request, vmsg->size, sizeof(vmsg->payload)); 146 goto fail; 147 } 148 149 struct iovec iov_payload = { 150 .iov_base = (char *)&vmsg->payload, 151 .iov_len = vmsg->size, 152 }; 153 if (vmsg->size) { 154 rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err); 155 if (rc != 1) { 156 if (local_err) { 157 error_report_err(local_err); 158 } 159 goto fail; 160 } 161 } 162 163 return true; 164 165 fail: 166 vmsg_close_fds(vmsg); 167 168 return false; 169 } 170 171 static coroutine_fn void vu_client_trip(void *opaque) 172 { 173 VuServer *server = opaque; 174 VuDev *vu_dev = &server->vu_dev; 175 176 while (!vu_dev->broken && vu_dispatch(vu_dev)) { 177 /* Keep running */ 178 } 179 180 vu_deinit(vu_dev); 181 182 /* vu_deinit() should have called remove_watch() */ 183 assert(QTAILQ_EMPTY(&server->vu_fd_watches)); 184 185 object_unref(OBJECT(server->sioc)); 186 server->sioc = NULL; 187 188 object_unref(OBJECT(server->ioc)); 189 server->ioc = NULL; 190 191 server->co_trip = NULL; 192 if (server->restart_listener_bh) { 193 qemu_bh_schedule(server->restart_listener_bh); 194 } 195 aio_wait_kick(); 196 } 197 198 /* 199 * a wrapper for vu_kick_cb 200 * 201 * since aio_dispatch can only pass one user data pointer to the 202 * callback function, pack VuDev and pvt into a struct. Then unpack it 203 * and pass them to vu_kick_cb 204 */ 205 static void kick_handler(void *opaque) 206 { 207 VuFdWatch *vu_fd_watch = opaque; 208 VuDev *vu_dev = vu_fd_watch->vu_dev; 209 210 vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt); 211 212 /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */ 213 if (vu_dev->broken) { 214 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 215 216 qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 217 } 218 } 219 220 static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd) 221 { 222 223 VuFdWatch *vu_fd_watch, *next; 224 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) { 225 if (vu_fd_watch->fd == fd) { 226 return vu_fd_watch; 227 } 228 } 229 return NULL; 230 } 231 232 static void 233 set_watch(VuDev *vu_dev, int fd, int vu_evt, 234 vu_watch_cb cb, void *pvt) 235 { 236 237 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 238 g_assert(vu_dev); 239 g_assert(fd >= 0); 240 g_assert(cb); 241 242 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); 243 244 if (!vu_fd_watch) { 245 VuFdWatch *vu_fd_watch = g_new0(VuFdWatch, 1); 246 247 QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next); 248 249 vu_fd_watch->fd = fd; 250 vu_fd_watch->cb = cb; 251 qemu_set_nonblock(fd); 252 aio_set_fd_handler(server->ioc->ctx, fd, true, kick_handler, 253 NULL, NULL, vu_fd_watch); 254 vu_fd_watch->vu_dev = vu_dev; 255 vu_fd_watch->pvt = pvt; 256 } 257 } 258 259 260 static void remove_watch(VuDev *vu_dev, int fd) 261 { 262 VuServer *server; 263 g_assert(vu_dev); 264 g_assert(fd >= 0); 265 266 server = container_of(vu_dev, VuServer, vu_dev); 267 268 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); 269 270 if (!vu_fd_watch) { 271 return; 272 } 273 aio_set_fd_handler(server->ioc->ctx, fd, true, NULL, NULL, NULL, NULL); 274 275 QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next); 276 g_free(vu_fd_watch); 277 } 278 279 280 static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc, 281 gpointer opaque) 282 { 283 VuServer *server = opaque; 284 285 if (server->sioc) { 286 warn_report("Only one vhost-user client is allowed to " 287 "connect the server one time"); 288 return; 289 } 290 291 if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb, 292 vu_message_read, set_watch, remove_watch, server->vu_iface)) { 293 error_report("Failed to initialize libvhost-user"); 294 return; 295 } 296 297 /* 298 * Unset the callback function for network listener to make another 299 * vhost-user client keeping waiting until this client disconnects 300 */ 301 qio_net_listener_set_client_func(server->listener, 302 NULL, 303 NULL, 304 NULL); 305 server->sioc = sioc; 306 /* 307 * Increase the object reference, so sioc will not freed by 308 * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc)) 309 */ 310 object_ref(OBJECT(server->sioc)); 311 qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client"); 312 server->ioc = QIO_CHANNEL(sioc); 313 object_ref(OBJECT(server->ioc)); 314 315 /* TODO vu_message_write() spins if non-blocking! */ 316 qio_channel_set_blocking(server->ioc, false, NULL); 317 318 server->co_trip = qemu_coroutine_create(vu_client_trip, server); 319 320 aio_context_acquire(server->ctx); 321 vhost_user_server_attach_aio_context(server, server->ctx); 322 aio_context_release(server->ctx); 323 } 324 325 void vhost_user_server_stop(VuServer *server) 326 { 327 aio_context_acquire(server->ctx); 328 329 qemu_bh_delete(server->restart_listener_bh); 330 server->restart_listener_bh = NULL; 331 332 if (server->sioc) { 333 VuFdWatch *vu_fd_watch; 334 335 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 336 aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true, 337 NULL, NULL, NULL, vu_fd_watch); 338 } 339 340 qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 341 342 AIO_WAIT_WHILE(server->ctx, server->co_trip); 343 } 344 345 aio_context_release(server->ctx); 346 347 if (server->listener) { 348 qio_net_listener_disconnect(server->listener); 349 object_unref(OBJECT(server->listener)); 350 } 351 } 352 353 /* 354 * Allow the next client to connect to the server. Called from a BH in the main 355 * loop. 356 */ 357 static void restart_listener_bh(void *opaque) 358 { 359 VuServer *server = opaque; 360 361 qio_net_listener_set_client_func(server->listener, vu_accept, server, 362 NULL); 363 } 364 365 /* Called with ctx acquired */ 366 void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx) 367 { 368 VuFdWatch *vu_fd_watch; 369 370 server->ctx = ctx; 371 372 if (!server->sioc) { 373 return; 374 } 375 376 qio_channel_attach_aio_context(server->ioc, ctx); 377 378 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 379 aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL, 380 NULL, vu_fd_watch); 381 } 382 383 aio_co_schedule(ctx, server->co_trip); 384 } 385 386 /* Called with server->ctx acquired */ 387 void vhost_user_server_detach_aio_context(VuServer *server) 388 { 389 if (server->sioc) { 390 VuFdWatch *vu_fd_watch; 391 392 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 393 aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true, 394 NULL, NULL, NULL, vu_fd_watch); 395 } 396 397 qio_channel_detach_aio_context(server->ioc); 398 } 399 400 server->ctx = NULL; 401 } 402 403 bool vhost_user_server_start(VuServer *server, 404 SocketAddress *socket_addr, 405 AioContext *ctx, 406 uint16_t max_queues, 407 const VuDevIface *vu_iface, 408 Error **errp) 409 { 410 QEMUBH *bh; 411 QIONetListener *listener; 412 413 if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX && 414 socket_addr->type != SOCKET_ADDRESS_TYPE_FD) { 415 error_setg(errp, "Only socket address types 'unix' and 'fd' are supported"); 416 return false; 417 } 418 419 listener = qio_net_listener_new(); 420 if (qio_net_listener_open_sync(listener, socket_addr, 1, 421 errp) < 0) { 422 object_unref(OBJECT(listener)); 423 return false; 424 } 425 426 bh = qemu_bh_new(restart_listener_bh, server); 427 428 /* zero out unspecified fields */ 429 *server = (VuServer) { 430 .listener = listener, 431 .restart_listener_bh = bh, 432 .vu_iface = vu_iface, 433 .max_queues = max_queues, 434 .ctx = ctx, 435 }; 436 437 qio_net_listener_set_name(server->listener, "vhost-user-backend-listener"); 438 439 qio_net_listener_set_client_func(server->listener, 440 vu_accept, 441 server, 442 NULL); 443 444 QTAILQ_INIT(&server->vu_fd_watches); 445 return true; 446 } 447