/*
 * Sharing QEMU devices via vhost-user protocol
 *
 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/vhost-user-server.h"
#include "block/aio-wait.h"

/*
 * Theory of operation:
 *
 * VuServer is started and stopped by vhost_user_server_start() and
 * vhost_user_server_stop() from the main loop thread. Starting the server
 * opens a vhost-user UNIX domain socket and listens for incoming connections.
 * Only one connection is allowed at a time.
 *
 * The connection is handled by the vu_client_trip() coroutine in the
 * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
 * where libvhost-user calls vu_message_read() to receive the next vhost-user
 * protocol message over the UNIX domain socket.
 *
 * When virtqueues are set up, libvhost-user calls set_watch() to monitor kick
 * fds. These fds are also handled in the VuServer->ctx AioContext.
 *
 * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
 * the socket connection. Shutting down the socket connection causes
 * vu_message_read() to fail since no more data can be received from the
 * socket. After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to
 * stop libvhost-user before terminating the coroutine. vu_deinit() calls
 * remove_watch() to stop monitoring kick fds and this stops virtqueue
 * processing.
 *
 * When vu_client_trip() has finished cleaning up it schedules a BH in the main
 * loop thread to accept the next client connection.
 *
 * When libvhost-user detects an error it calls panic_cb() and sets the
 * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
 * the dev->broken flag is set.
 *
 * It is possible to switch AioContexts using
 * vhost_user_server_detach_aio_context() and
 * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
 * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
 * coroutine remains in a yielded state during the switch. This is made
 * possible by QIOChannel's support for spurious coroutine re-entry in
 * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
 * new AioContext.
 */
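
/*
 * Call sequence seen by an export using this server, summarizing the theory
 * of operation above (a sketch; error handling and the concrete export are
 * placeholders, not part of this file):
 *
 *     vhost_user_server_start()        <- main loop thread
 *         vu_accept()                  <- runs when a client connects
 *             vu_client_trip()         <- services the connection in ctx
 *     vhost_user_server_stop()         <- main loop thread
 */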

static void vmsg_close_fds(VhostUserMsg *vmsg)
{
    int i;
    for (i = 0; i < vmsg->fd_num; i++) {
        close(vmsg->fds[i]);
    }
}

static bool vmsg_unblock_fds(VhostUserMsg *vmsg, Error **errp)
{
    int i;

    /*
     * These messages carry fds used to map memory, not fds used to
     * send/receive messages, so this operation does not apply to them. In
     * addition, on some systems it can fail (e.g. on macOS, setting an fd
     * returned by shm_open() non-blocking fails with errno = ENOTTY).
     */
    if (vmsg->request == VHOST_USER_ADD_MEM_REG ||
        vmsg->request == VHOST_USER_SET_MEM_TABLE) {
        return true;
    }

    for (i = 0; i < vmsg->fd_num; i++) {
        if (!qemu_set_blocking(vmsg->fds[i], false, errp)) {
            return false;
        }
    }

    return true;
}

static void panic_cb(VuDev *vu_dev, const char *buf)
{
    error_report("vu_panic: %s", buf);
}

void vhost_user_server_inc_in_flight(VuServer *server)
{
    assert(!server->wait_idle);
    qatomic_inc(&server->in_flight);
}

void vhost_user_server_dec_in_flight(VuServer *server)
{
    if (qatomic_fetch_dec(&server->in_flight) == 1) {
        if (server->wait_idle) {
            aio_co_wake(server->co_trip);
        }
    }
}

bool vhost_user_server_has_in_flight(VuServer *server)
{
    return qatomic_load_acquire(&server->in_flight) > 0;
}
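
/*
 * Device implementations bracket request processing with the in-flight
 * counters above so that vu_client_trip() can wait for all requests to drain
 * before unmapping guest memory. A minimal sketch (handle_request() is a
 * hypothetical device callback, not part of this file):
 *
 *     static void coroutine_fn handle_request(VuServer *server)
 *     {
 *         vhost_user_server_inc_in_flight(server);
 *         ... do I/O, possibly yielding ...
 *         vhost_user_server_dec_in_flight(server);
 *     }
 */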

static bool coroutine_fn
vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
{
    struct iovec iov = {
        .iov_base = (char *)vmsg,
        .iov_len = VHOST_USER_HDR_SIZE,
    };
    int rc, read_bytes = 0;
    Error *local_err = NULL;
    const size_t max_fds = G_N_ELEMENTS(vmsg->fds);
    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    QIOChannel *ioc = server->ioc;

    vmsg->fd_num = 0;
    if (!ioc) {
        goto fail;
    }

    assert(qemu_in_coroutine());
    do {
        size_t nfds = 0;
        int *fds = NULL;

        /*
         * qio_channel_readv_full() may return short reads, so keep calling
         * it until a total of VHOST_USER_HDR_SIZE bytes have been received,
         * or 0 bytes if the socket was closed.
         */
        rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err);
        if (rc < 0) {
            if (rc == QIO_CHANNEL_ERR_BLOCK) {
                assert(local_err == NULL);
                if (server->ctx) {
                    server->in_qio_channel_yield = true;
                    qio_channel_yield(ioc, G_IO_IN);
                    server->in_qio_channel_yield = false;
                } else {
                    return false;
                }
                continue;
            } else {
                error_report_err(local_err);
                goto fail;
            }
        }

        if (nfds > 0) {
            if (vmsg->fd_num + nfds > max_fds) {
                error_report("A maximum of %zu fds are allowed, "
                             "but %zu fds were received",
                             max_fds, vmsg->fd_num + nfds);
                g_free(fds);
                goto fail;
            }
            memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0]));
            vmsg->fd_num += nfds;
            g_free(fds);
        }

        if (rc == 0) { /* socket closed */
            goto fail;
        }

        iov.iov_base += rc;
        iov.iov_len -= rc;
        read_bytes += rc;
    } while (read_bytes != VHOST_USER_HDR_SIZE);

    /* qio_channel_readv_full() makes the received fds blocking; unblock them */
    if (!vmsg_unblock_fds(vmsg, &local_err)) {
        error_report_err(local_err);
        goto fail;
    }
    if (vmsg->size > sizeof(vmsg->payload)) {
        error_report("Message request %d is too big: vmsg->size is %u "
                     "but sizeof(vmsg->payload) is only %zu",
                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
        goto fail;
    }

    struct iovec iov_payload = {
        .iov_base = (char *)&vmsg->payload,
        .iov_len = vmsg->size,
    };
    if (vmsg->size) {
        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
        if (rc != 1) {
            if (local_err) {
                error_report_err(local_err);
            }
            goto fail;
        }
    }

    return true;

fail:
    vmsg_close_fds(vmsg);

    return false;
}

static coroutine_fn void vu_client_trip(void *opaque)
{
    VuServer *server = opaque;
    VuDev *vu_dev = &server->vu_dev;

    while (!vu_dev->broken) {
        if (server->quiescing) {
            server->co_trip = NULL;
            aio_wait_kick();
            return;
        }
        /* vu_dispatch() returns false if server->ctx went away */
        if (!vu_dispatch(vu_dev) && server->ctx) {
            break;
        }
    }

    if (vhost_user_server_has_in_flight(server)) {
        /* Wait for requests to complete before we can unmap the memory */
        server->wait_idle = true;
        qemu_coroutine_yield();
        server->wait_idle = false;
    }
    assert(!vhost_user_server_has_in_flight(server));

    vu_deinit(vu_dev);

    /* vu_deinit() should have called remove_watch() */
    assert(QTAILQ_EMPTY(&server->vu_fd_watches));

    object_unref(OBJECT(server->sioc));
    server->sioc = NULL;

    object_unref(OBJECT(server->ioc));
    server->ioc = NULL;

    server->co_trip = NULL;
    if (server->restart_listener_bh) {
        qemu_bh_schedule(server->restart_listener_bh);
    }
    aio_wait_kick();
}

/*
 * A wrapper for vu_kick_cb.
 *
 * Since aio_dispatch() can only pass one user data pointer to the callback
 * function, pack VuDev and pvt into a struct, then unpack it and pass them
 * to vu_kick_cb.
 */
static void kick_handler(void *opaque)
{
    VuFdWatch *vu_fd_watch = opaque;
    VuDev *vu_dev = vu_fd_watch->vu_dev;

    vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);

    /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
    if (vu_dev->broken) {
        VuServer *server = container_of(vu_dev, VuServer, vu_dev);

        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    }
}

static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
{
    VuFdWatch *vu_fd_watch, *next;
    QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
        if (vu_fd_watch->fd == fd) {
            return vu_fd_watch;
        }
    }
    return NULL;
}

static void
set_watch(VuDev *vu_dev, int fd, int vu_evt,
          vu_watch_cb cb, void *pvt)
{
    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    g_assert(vu_dev);
    g_assert(fd >= 0);
    g_assert(cb);

    VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);

    if (!vu_fd_watch) {
        vu_fd_watch = g_new0(VuFdWatch, 1);

        QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next);

        vu_fd_watch->fd = fd;
        vu_fd_watch->cb = cb;
        /* TODO: handle error more gracefully than aborting */
        qemu_set_blocking(fd, false, &error_abort);
        aio_set_fd_handler(server->ctx, fd, kick_handler,
                           NULL, NULL, NULL, vu_fd_watch);
        vu_fd_watch->vu_dev = vu_dev;
        vu_fd_watch->pvt = pvt;
    }
}

static void remove_watch(VuDev *vu_dev, int fd)
{
    VuServer *server;
    g_assert(vu_dev);
    g_assert(fd >= 0);

    server = container_of(vu_dev, VuServer, vu_dev);

    VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);

    if (!vu_fd_watch) {
        return;
    }
    aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);

    QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
    g_free(vu_fd_watch);
}
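
/*
 * libvhost-user invokes set_watch() and remove_watch() through the callbacks
 * registered in vu_init() below, typically while handling
 * VHOST_USER_SET_VRING_KICK messages and during vu_deinit(). Kick fd events
 * then reach the device's vu_watch_cb via kick_handler().
 */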

static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
                      gpointer opaque)
{
    VuServer *server = opaque;
    Error *local_err = NULL;

    if (server->sioc) {
        warn_report("Only one vhost-user client is allowed to "
                    "connect to the server at a time");
        return;
    }

    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
        error_report("Failed to initialize libvhost-user");
        return;
    }

    /*
     * Unset the network listener's callback function so that any further
     * vhost-user clients keep waiting until this client disconnects
     */
    qio_net_listener_set_client_func(server->listener,
                                     NULL,
                                     NULL,
                                     NULL);
    server->sioc = sioc;
    /*
     * Increase the object reference so that sioc is not freed by
     * qio_net_listener_channel_func(), which calls object_unref(OBJECT(sioc))
     */
    object_ref(OBJECT(server->sioc));
    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
    server->ioc = QIO_CHANNEL(sioc);
    object_ref(OBJECT(server->ioc));

    /* TODO vu_message_write() spins if non-blocking! */
    if (!qio_channel_set_blocking(server->ioc, false, &local_err)) {
        error_report_err(local_err);
        vu_deinit(&server->vu_dev);
        return;
    }

    qio_channel_set_follow_coroutine_ctx(server->ioc, true);

    vhost_user_server_attach_aio_context(server, server->ctx);
}

/* server->ctx acquired by caller */
void vhost_user_server_stop(VuServer *server)
{
    qemu_bh_delete(server->restart_listener_bh);
    server->restart_listener_bh = NULL;

    if (server->sioc) {
        VuFdWatch *vu_fd_watch;

        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
            aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
                               NULL, NULL, NULL, NULL, vu_fd_watch);
        }

        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);

        AIO_WAIT_WHILE(server->ctx, server->co_trip);
    }

    if (server->listener) {
        qio_net_listener_disconnect(server->listener);
        object_unref(OBJECT(server->listener));
    }
}

/*
 * Allow the next client to connect to the server. Called from a BH in the main
 * loop.
 */
static void restart_listener_bh(void *opaque)
{
    VuServer *server = opaque;

    qio_net_listener_set_client_func(server->listener, vu_accept, server,
                                     NULL);
}
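
/*
 * A sketch of how an export might move the server to another AioContext
 * using the two functions below (how new_ctx is obtained is up to the
 * caller; both calls run with the relevant context acquired):
 *
 *     vhost_user_server_detach_aio_context(&server);
 *     vhost_user_server_attach_aio_context(&server, new_ctx);
 */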

/* Called with ctx acquired */
void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
{
    VuFdWatch *vu_fd_watch;

    server->ctx = ctx;

    if (!server->sioc) {
        return;
    }

    QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
        aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
                           NULL, NULL, vu_fd_watch);
    }

    if (server->co_trip) {
        /*
         * The caller didn't fully shut down co_trip (this can happen on
         * non-polling drains like in bdrv_graph_wrlock()). This is okay as
         * long as it no longer tries to shut it down and we're guaranteed to
         * still be in the same AioContext as before.
         *
         * co_ctx can still be NULL if we get multiple calls and have only
         * just scheduled a new coroutine in the else branch.
         */
        AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip);

        assert(!server->quiescing);
        assert(!co_ctx || co_ctx == ctx);
    } else {
        server->co_trip = qemu_coroutine_create(vu_client_trip, server);
        assert(!server->in_qio_channel_yield);
        aio_co_schedule(ctx, server->co_trip);
    }
}

/* Called with server->ctx acquired */
void vhost_user_server_detach_aio_context(VuServer *server)
{
    if (server->sioc) {
        VuFdWatch *vu_fd_watch;

        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
            aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
                               NULL, NULL, NULL, NULL, vu_fd_watch);
        }
    }

    server->ctx = NULL;

    if (server->ioc) {
        if (server->in_qio_channel_yield) {
            /* Stop receiving the next vhost-user message */
            qio_channel_wake_read(server->ioc);
        }
    }
}

bool vhost_user_server_start(VuServer *server,
                             SocketAddress *socket_addr,
                             AioContext *ctx,
                             uint16_t max_queues,
                             const VuDevIface *vu_iface,
                             Error **errp)
{
    QEMUBH *bh;
    QIONetListener *listener;

    if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX &&
        socket_addr->type != SOCKET_ADDRESS_TYPE_FD) {
        error_setg(errp, "Only socket address types 'unix' and 'fd' are supported");
        return false;
    }

    listener = qio_net_listener_new();
    if (qio_net_listener_open_sync(listener, socket_addr, 1,
                                   errp) < 0) {
        object_unref(OBJECT(listener));
        return false;
    }

    bh = qemu_bh_new(restart_listener_bh, server);

    /* zero out unspecified fields */
    *server = (VuServer) {
        .listener            = listener,
        .restart_listener_bh = bh,
        .vu_iface            = vu_iface,
        .max_queues          = max_queues,
        .ctx                 = ctx,
    };

    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");

    qio_net_listener_set_client_func(server->listener,
                                     vu_accept,
                                     server,
                                     NULL);

    QTAILQ_INIT(&server->vu_fd_watches);
    return true;
}
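
/*
 * Example of starting the server on a UNIX domain socket (a sketch; the
 * socket path, queue count, and "iface" device callback table are
 * placeholders, not part of this file):
 *
 *     SocketAddress addr = {
 *         .type = SOCKET_ADDRESS_TYPE_UNIX,
 *         .u.q_unix.path = (char *)"/tmp/vhost-user.sock",
 *     };
 *     Error *local_err = NULL;
 *
 *     if (!vhost_user_server_start(&server, &addr, qemu_get_aio_context(),
 *                                  1, &iface, &local_err)) {
 *         error_report_err(local_err);
 *     }
 */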